From e454732508449fbc1abf3951678c793e4a160061 Mon Sep 17 00:00:00 2001 From: chentao1596 Date: Wed, 4 Jan 2017 20:14:08 +0800 Subject: [PATCH 1/2] check queue's status before enqueue --- core/pkg/task/queue.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/pkg/task/queue.go b/core/pkg/task/queue.go index e7fd0a848..d29f3c2c9 100644 --- a/core/pkg/task/queue.go +++ b/core/pkg/task/queue.go @@ -51,6 +51,11 @@ func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) { // Enqueue enqueues ns/name of the given api object in the task queue. func (t *Queue) Enqueue(obj interface{}) { + if t.IsShuttingDown() { + glog.Errorf("queue has been shutdown, failed to enqueue: %v", obj) + return + } + glog.V(3).Infof("queuing item %v", obj) key, err := t.fn(obj) if err != nil { From 28b610da65aaf02f397a56fd4c303edddc7a6df4 Mon Sep 17 00:00:00 2001 From: chentao1596 Date: Sat, 7 Jan 2017 23:30:34 +0800 Subject: [PATCH 2/2] add unit test case for task.Queue --- core/pkg/task/queue_test.go | 132 ++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 core/pkg/task/queue_test.go diff --git a/core/pkg/task/queue_test.go b/core/pkg/task/queue_test.go new file mode 100644 index 000000000..9bd7f6237 --- /dev/null +++ b/core/pkg/task/queue_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package task + +import ( + "fmt" + "testing" + "time" +) + +var sr int = 0 + +type mockEnqueueObj struct { + k string + v string +} + +func mockSynFn(interface{}) error { + // sr will be plus one times after enqueue + sr++ + return nil +} + +func mockKeyFn(interface{}) (interface{}, error) { + return mockEnqueueObj{ + k: "static_key", + v: "static_value", + }, nil +} + +func mockErrorKeyFn(interface{}) (interface{}, error) { + return nil, fmt.Errorf("failed to get key") +} + +func TestShutdown(t *testing.T) { + q := NewTaskQueue(mockSynFn) + stopCh := make(chan struct{}) + // run queue + go q.Run(10*time.Second, stopCh) + q.Shutdown() + s := q.IsShuttingDown() + if !s { + t.Fatalf("queue shoule be shutdown") + } +} + +func TestEnqueueSuccess(t *testing.T) { + // initialize result + sr = 0 + q := NewCustomTaskQueue(mockSynFn, mockKeyFn) + stopCh := make(chan struct{}) + // run queue + go q.Run(10*time.Second, stopCh) + // mock object whichi will be enqueue + mo := mockEnqueueObj{ + k: "testKey", + v: "testValue", + } + q.Enqueue(mo) + // wait for 'mockSynFn' + time.Sleep(time.Millisecond * 10) + if sr != 1 { + t.Errorf("sr should be 1, but is %d", sr) + } + + // shutdown queue before exit + q.Shutdown() +} + +func TestEnqueueFailed(t *testing.T) { + // initialize result + sr = 0 + q := NewCustomTaskQueue(mockSynFn, mockKeyFn) + stopCh := make(chan struct{}) + // run queue + go q.Run(10*time.Second, stopCh) + // mock object whichi will be enqueue + mo := mockEnqueueObj{ + k: "testKey", + v: "testValue", + } + + // shutdown queue before enqueue + q.Shutdown() + // wait for shutdown + time.Sleep(time.Millisecond * 10) + q.Enqueue(mo) + // wait for 'mockSynFn' + time.Sleep(time.Millisecond * 10) + // queue is shutdown, so mockSynFn should not be executed, so the result should be 0 + if sr != 0 { + t.Errorf("queue has been shutdown, so sr should be 0, but is %d", sr) + } +} + +func TestEnqueueKeyError(t *testing.T) { + // initialize result + sr = 0 + q := NewCustomTaskQueue(mockSynFn, mockErrorKeyFn) + stopCh := make(chan struct{}) + // run queue + go q.Run(10*time.Second, stopCh) + // mock object whichi will be enqueue + mo := mockEnqueueObj{ + k: "testKey", + v: "testValue", + } + + q.Enqueue(mo) + // wait for 'mockSynFn' + time.Sleep(time.Millisecond * 10) + // key error, so the result should be 0 + if sr != 0 { + t.Errorf("error occurs while get key, so sr should be 0, but is %d", sr) + } + // shutdown queue before exit + q.Shutdown() +}