Process queue items by time window

This commit is contained in:
Manuel de Brito Fontes 2017-09-25 18:53:03 -03:00
parent 37262bda76
commit 768cbb89d6
4 changed files with 59 additions and 22 deletions

View file

@ -113,8 +113,6 @@ type GenericController struct {
runningConfig *ingress.Configuration runningConfig *ingress.Configuration
forceReload int32 forceReload int32
initialSyncDone int32
} }
// Configuration contains all the settings required by an Ingress controller // Configuration contains all the settings required by an Ingress controller
@ -1224,8 +1222,6 @@ func (ic *GenericController) Start() {
createDefaultSSLCertificate() createDefaultSSLCertificate()
ic.setInitialSyncDone()
go ic.syncQueue.Run(time.Second, ic.stopCh) go ic.syncQueue.Run(time.Second, ic.stopCh)
if ic.syncStatus != nil { if ic.syncStatus != nil {
@ -1251,14 +1247,6 @@ func (ic *GenericController) setForceReload(shouldReload bool) {
} }
} }
func (ic *GenericController) isInitialSyncDone() bool {
return atomic.LoadInt32(&ic.initialSyncDone) != 0
}
func (ic *GenericController) setInitialSyncDone() {
atomic.StoreInt32(&ic.initialSyncDone, 1)
}
func createDefaultSSLCertificate() { func createDefaultSSLCertificate() {
defCert, defKey := ssl.GetFakeSSLCert() defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})

View file

@ -45,10 +45,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
return return
} }
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.syncQueue.Enqueue(obj)
if ic.isInitialSyncDone() {
ic.syncQueue.Enqueue(obj)
}
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress) delIng, ok := obj.(*extensions.Ingress)

View file

@ -31,8 +31,10 @@ var (
keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
) )
// Queue manages a work queue through an independent worker that // Queue manages a time work queue through an independent worker that invokes the
// invokes the given sync function for every work item inserted. // given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct { type Queue struct {
// queue is the work queue the worker polls // queue is the work queue the worker polls
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
@ -42,6 +44,13 @@ type Queue struct {
workerDone chan bool workerDone chan bool
fn func(obj interface{}) (interface{}, error) fn func(obj interface{}) (interface{}, error)
lastSync int64
}
type element struct {
Key interface{}
Timestamp int64
} }
// Run ... // Run ...
@ -56,13 +65,17 @@ func (t *Queue) Enqueue(obj interface{}) {
return return
} }
ts := time.Now().UnixNano()
glog.V(3).Infof("queuing item %v", obj) glog.V(3).Infof("queuing item %v", obj)
key, err := t.fn(obj) key, err := t.fn(obj)
if err != nil { if err != nil {
glog.Errorf("%v", err) glog.Errorf("%v", err)
return return
} }
t.queue.Add(key) t.queue.Add(element{
Key: key,
Timestamp: ts,
})
} }
func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) { func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) {
@ -84,13 +97,26 @@ func (t *Queue) worker() {
} }
return return
} }
ts := time.Now().UnixNano()
glog.V(3).Infof("syncing %v", key) item := key.(element)
if t.lastSync > item.Timestamp {
glog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
t.queue.Forget(key)
t.queue.Done(key)
continue
}
glog.V(3).Infof("syncing %v", item.Key)
if err := t.sync(key); err != nil { if err := t.sync(key); err != nil {
glog.Warningf("requeuing %v, err %v", key, err) glog.Warningf("requeuing %v, err %v", item.Key, err)
t.queue.AddRateLimited(key) t.queue.AddRateLimited(element{
Key: item.Key,
Timestamp: time.Now().UnixNano(),
})
} else { } else {
t.queue.Forget(key) t.queue.Forget(key)
t.lastSync = ts
} }
t.queue.Done(key) t.queue.Done(key)

View file

@ -131,3 +131,29 @@ func TestEnqueueKeyError(t *testing.T) {
// shutdown queue before exit // shutdown queue before exit
q.Shutdown() q.Shutdown()
} }
func TestSkipEnqueue(t *testing.T) {
// initialize result
atomic.StoreUint32(&sr, 0)
q := NewCustomTaskQueue(mockSynFn, mockKeyFn)
stopCh := make(chan struct{})
// run queue
go q.Run(time.Second, stopCh)
// mock object whichi will be enqueue
mo := mockEnqueueObj{
k: "testKey",
v: "testValue",
}
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
q.Enqueue(mo)
// wait for 'mockSynFn'
time.Sleep(time.Millisecond * 10)
if atomic.LoadUint32(&sr) != 1 {
t.Errorf("sr should be 1, but is %d", sr)
}
// shutdown queue before exit
q.Shutdown()
}