From 5db8389fb34b6e4664c9a479a6b51c8dbbee1d5e Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Mon, 27 Jun 2016 20:06:35 -0700 Subject: [PATCH] Rate limit requeues on error --- controllers/gce/controller/controller.go | 40 +++++++++--------------- controllers/gce/controller/utils.go | 20 ++++++------ 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 8c03337cf..3b3524ddb 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -260,31 +260,27 @@ func (lbc *LoadBalancerController) storesSynced() bool { } // sync manages Ingress create/updates/deletes. -func (lbc *LoadBalancerController) sync(key string) { +func (lbc *LoadBalancerController) sync(key string) (err error) { if !lbc.hasSynced() { time.Sleep(storeSyncPollPeriod) - lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync")) - return + return fmt.Errorf("Waiting for stores to sync") } glog.V(3).Infof("Syncing %v", key) paths, err := lbc.ingLister.List() if err != nil { - lbc.ingQueue.requeue(key, err) - return + return err } nodePorts := lbc.tr.toNodePorts(&paths) lbNames := lbc.ingLister.Store.ListKeys() lbs, _ := lbc.ListRuntimeInfo() nodeNames, err := lbc.getReadyNodeNames() if err != nil { - lbc.ingQueue.requeue(key, err) - return + return err } obj, ingExists, err := lbc.ingLister.Store.GetByKey(key) if err != nil { - lbc.ingQueue.requeue(key, err) - return + return err } // This performs a 2 phase checkpoint with the cloud: @@ -299,8 +295,8 @@ func (lbc *LoadBalancerController) sync(key string) { // don't have an associated Kubernetes Ingress/Service/Endpoint. defer func() { - if err := lbc.CloudClusterManager.GC(lbNames, nodePorts); err != nil { - lbc.ingQueue.requeue(key, err) + if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil { + err = fmt.Errorf("Error during sync %v, error during GC %v", err, deferErr) } glog.V(3).Infof("Finished syncing %v", key) }() @@ -323,16 +319,12 @@ func (lbc *LoadBalancerController) sync(key string) { } if !ingExists { - if syncError != nil { - lbc.ingQueue.requeue(key, err) - } - return + return syncError } // Update the UrlMap of the single loadbalancer that came through the watch. l7, err := lbc.CloudClusterManager.l7Pool.Get(key) if err != nil { - lbc.ingQueue.requeue(key, fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)) - return + return fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err) } ing := *obj.(*extensions.Ingress) @@ -345,10 +337,7 @@ func (lbc *LoadBalancerController) sync(key string) { lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } - if syncError != nil { - lbc.ingQueue.requeue(key, syncError) - } - return + return syncError } // updateIngressStatus updates the IP and annotations of a loadbalancer. @@ -421,16 +410,15 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run // syncNodes manages the syncing of kubernetes nodes to gce instance groups. // The instancegroups are referenced by loadbalancer backends. -func (lbc *LoadBalancerController) syncNodes(key string) { +func (lbc *LoadBalancerController) syncNodes(key string) error { nodeNames, err := lbc.getReadyNodeNames() if err != nil { - lbc.nodeQueue.requeue(key, err) - return + return err } if err := lbc.CloudClusterManager.instancePool.Sync(nodeNames); err != nil { - lbc.nodeQueue.requeue(key, err) + return err } - return + return nil } func nodeReady(node api.Node) bool { diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 9eb52a3ad..b7d78e088 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -85,9 +85,9 @@ func (e errorNodePortNotFound) Error() string { // invokes the given sync function for every work item inserted. type taskQueue struct { // queue is the work queue the worker polls - queue *workqueue.Type + queue workqueue.RateLimitingInterface // sync is called for each item in the queue - sync func(string) + sync func(string) error // workerDone is closed when the worker exits workerDone chan struct{} } @@ -106,11 +106,6 @@ func (t *taskQueue) enqueue(obj interface{}) { t.queue.Add(key) } -func (t *taskQueue) requeue(key string, err error) { - glog.Errorf("Requeuing %v, err %v", key, err) - t.queue.Add(key) -} - // worker processes work in the queue through sync. func (t *taskQueue) worker() { for { @@ -120,7 +115,12 @@ func (t *taskQueue) worker() { return } glog.V(3).Infof("Syncing %v", key) - t.sync(key.(string)) + if err := t.sync(key.(string)); err != nil { + glog.Errorf("Requeuing %v, err %v", key, err) + t.queue.AddRateLimited(key) + } else { + t.queue.Forget(key) + } t.queue.Done(key) } } @@ -133,9 +133,9 @@ func (t *taskQueue) shutdown() { // NewTaskQueue creates a new task queue with the given sync function. // The sync function is called for every element inserted into the queue. -func NewTaskQueue(syncFn func(string)) *taskQueue { +func NewTaskQueue(syncFn func(string) error) *taskQueue { return &taskQueue{ - queue: workqueue.New(), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan struct{}), }