From 768cbb89d698b5ff52f3cd7ebd17c448dc2869a9 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Mon, 25 Sep 2017 18:53:03 -0300 Subject: [PATCH] Process queue items by time window --- core/pkg/ingress/controller/controller.go | 12 ------- core/pkg/ingress/controller/listers.go | 5 +-- core/pkg/task/queue.go | 38 +++++++++++++++++++---- core/pkg/task/queue_test.go | 26 ++++++++++++++++ 4 files changed, 59 insertions(+), 22 deletions(-) diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index 3d96aea61..4a1efbe7f 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -113,8 +113,6 @@ type GenericController struct { runningConfig *ingress.Configuration forceReload int32 - - initialSyncDone int32 } // Configuration contains all the settings required by an Ingress controller @@ -1224,8 +1222,6 @@ func (ic *GenericController) Start() { createDefaultSSLCertificate() - ic.setInitialSyncDone() - go ic.syncQueue.Run(time.Second, ic.stopCh) if ic.syncStatus != nil { @@ -1251,14 +1247,6 @@ func (ic *GenericController) setForceReload(shouldReload bool) { } } -func (ic *GenericController) isInitialSyncDone() bool { - return atomic.LoadInt32(&ic.initialSyncDone) != 0 -} - -func (ic *GenericController) setInitialSyncDone() { - atomic.StoreInt32(&ic.initialSyncDone, 1) -} - func createDefaultSSLCertificate() { defCert, defKey := ssl.GetFakeSSLCert() c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) diff --git a/core/pkg/ingress/controller/listers.go b/core/pkg/ingress/controller/listers.go index 657c679ef..907cdd485 100644 --- a/core/pkg/ingress/controller/listers.go +++ b/core/pkg/ingress/controller/listers.go @@ -45,10 +45,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) { return } ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - - if ic.isInitialSyncDone() { - ic.syncQueue.Enqueue(obj) - } + ic.syncQueue.Enqueue(obj) }, DeleteFunc: func(obj interface{}) { delIng, ok := obj.(*extensions.Ingress) diff --git a/core/pkg/task/queue.go b/core/pkg/task/queue.go index 34913573e..977fdc2f6 100644 --- a/core/pkg/task/queue.go +++ b/core/pkg/task/queue.go @@ -31,8 +31,10 @@ var ( keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc ) -// Queue manages a work queue through an independent worker that -// invokes the given sync function for every work item inserted. +// Queue manages a time work queue through an independent worker that invokes the +// given sync function for every work item inserted. +// The queue uses an internal timestamp that allows the removal of certain elements +// which timestamp is older than the last successful get operation. type Queue struct { // queue is the work queue the worker polls queue workqueue.RateLimitingInterface @@ -42,6 +44,13 @@ type Queue struct { workerDone chan bool fn func(obj interface{}) (interface{}, error) + + lastSync int64 +} + +type element struct { + Key interface{} + Timestamp int64 } // Run ... @@ -56,13 +65,17 @@ func (t *Queue) Enqueue(obj interface{}) { return } + ts := time.Now().UnixNano() glog.V(3).Infof("queuing item %v", obj) key, err := t.fn(obj) if err != nil { glog.Errorf("%v", err) return } - t.queue.Add(key) + t.queue.Add(element{ + Key: key, + Timestamp: ts, + }) } func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) { @@ -84,13 +97,26 @@ func (t *Queue) worker() { } return } + ts := time.Now().UnixNano() - glog.V(3).Infof("syncing %v", key) + item := key.(element) + if t.lastSync > item.Timestamp { + glog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp) + t.queue.Forget(key) + t.queue.Done(key) + continue + } + + glog.V(3).Infof("syncing %v", item.Key) if err := t.sync(key); err != nil { - glog.Warningf("requeuing %v, err %v", key, err) - t.queue.AddRateLimited(key) + glog.Warningf("requeuing %v, err %v", item.Key, err) + t.queue.AddRateLimited(element{ + Key: item.Key, + Timestamp: time.Now().UnixNano(), + }) } else { t.queue.Forget(key) + t.lastSync = ts } t.queue.Done(key) diff --git a/core/pkg/task/queue_test.go b/core/pkg/task/queue_test.go index e9b1fab86..cbfd9a49b 100644 --- a/core/pkg/task/queue_test.go +++ b/core/pkg/task/queue_test.go @@ -131,3 +131,29 @@ func TestEnqueueKeyError(t *testing.T) { // shutdown queue before exit q.Shutdown() } + +func TestSkipEnqueue(t *testing.T) { + // initialize result + atomic.StoreUint32(&sr, 0) + q := NewCustomTaskQueue(mockSynFn, mockKeyFn) + stopCh := make(chan struct{}) + // run queue + go q.Run(time.Second, stopCh) + // mock object whichi will be enqueue + mo := mockEnqueueObj{ + k: "testKey", + v: "testValue", + } + q.Enqueue(mo) + q.Enqueue(mo) + q.Enqueue(mo) + q.Enqueue(mo) + // wait for 'mockSynFn' + time.Sleep(time.Millisecond * 10) + if atomic.LoadUint32(&sr) != 1 { + t.Errorf("sr should be 1, but is %d", sr) + } + + // shutdown queue before exit + q.Shutdown() +}