From a153187ce712232618f208dd8e3a47a1993406d4 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Wed, 22 Jun 2016 17:48:13 -0400 Subject: [PATCH] Use delayed queue --- controllers/nginx/controller.go | 25 ++++++++++++------------- controllers/nginx/nginx/command.go | 13 ++++++++----- controllers/nginx/utils.go | 21 +++++++++++++-------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/controllers/nginx/controller.go b/controllers/nginx/controller.go index 823bd2861..0e039def9 100644 --- a/controllers/nginx/controller.go +++ b/controllers/nginx/controller.go @@ -414,11 +414,10 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[stri return namedPorts, nil } -func (lbc *loadBalancerController) sync(key string) { +func (lbc *loadBalancerController) sync(key string) error { if !lbc.controllersInSync() { time.Sleep(podStoreSyncedPollPeriod) - lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) - return + return fmt.Errorf("deferring sync till endpoints controller has synced") } var cfg *api.ConfigMap @@ -435,7 +434,7 @@ func (lbc *loadBalancerController) sync(key string) { ings := lbc.ingLister.Store.List() upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings) - lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{ + return lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{ Upstreams: upstreams, Servers: servers, TCPUpstreams: lbc.getTCPServices(), @@ -443,21 +442,20 @@ func (lbc *loadBalancerController) sync(key string) { }) } -func (lbc *loadBalancerController) updateIngressStatus(key string) { +func (lbc *loadBalancerController) updateIngressStatus(key string) error { if !lbc.controllersInSync() { time.Sleep(podStoreSyncedPollPeriod) - lbc.ingQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) - return + return fmt.Errorf("deferring sync till endpoints controller has synced") } obj, ingExists, err := lbc.ingLister.Store.GetByKey(key) if err != nil { - lbc.ingQueue.requeue(key, err) - return + return err } if !ingExists { - return + // TODO: what's the correct behavior here? + return nil } ing := obj.(*extensions.Ingress) @@ -466,8 +464,7 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) { currIng, err := ingClient.Get(ing.Name) if err != nil { - glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err) - return + return fmt.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err) } lbIPs := ing.Status.LoadBalancer.Ingress @@ -478,11 +475,13 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) { }) if _, err := ingClient.UpdateStatus(currIng); err != nil { lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err) - return + return err } lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.podInfo.NodeIP) } + + return nil } func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool { diff --git a/controllers/nginx/nginx/command.go b/controllers/nginx/nginx/command.go index 0e5ac987a..9ea331194 100644 --- a/controllers/nginx/nginx/command.go +++ b/controllers/nginx/nginx/command.go @@ -56,7 +56,7 @@ func (ngx *Manager) Start() { // shut down, stop accepting new connections and continue to service current requests // until all such requests are serviced. After that, the old worker processes exit. // http://nginx.org/en/docs/beginners_guide.html#control -func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) { +func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) error { ngx.reloadRateLimiter.Accept() ngx.reloadLock.Lock() @@ -65,15 +65,18 @@ func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressC newCfg, err := ngx.writeCfg(cfg, ingressCfg) if err != nil { - glog.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err) - return + return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err) } if newCfg { - if err := ngx.shellOut("nginx -s reload"); err == nil { - glog.Info("change in configuration detected. Reloading...") + if err := ngx.shellOut("nginx -s reload"); err != nil { + return fmt.Errorf("error reloading nginx: %v", err) } + + glog.Info("change in configuration detected. Reloading...") } + + return nil } // shellOut executes a command and returns its combined standard output and standard diff --git a/controllers/nginx/utils.go b/controllers/nginx/utils.go index d256d129b..b4c60a819 100644 --- a/controllers/nginx/utils.go +++ b/controllers/nginx/utils.go @@ -51,9 +51,9 @@ type StoreToConfigmapLister struct { // invokes the given sync function for every work item inserted. type taskQueue struct { // queue is the work queue the worker polls - queue *workqueue.Type + queue workqueue.RateLimitingInterface // sync is called for each item in the queue - sync func(string) + sync func(string) error // workerDone is closed when the worker exits workerDone chan struct{} } @@ -72,9 +72,8 @@ func (t *taskQueue) enqueue(obj interface{}) { t.queue.Add(key) } -func (t *taskQueue) requeue(key string, err error) { - glog.V(3).Infof("requeuing %v, err %v", key, err) - t.queue.Add(key) +func (t *taskQueue) requeue(key string) { + t.queue.AddRateLimited(key) } // worker processes work in the queue through sync. @@ -86,7 +85,13 @@ func (t *taskQueue) worker() { return } glog.V(3).Infof("syncing %v", key) - t.sync(key.(string)) + if err := t.sync(key.(string)); err != nil { + glog.V(3).Infof("requeuing %v, err %v", key, err) + t.requeue(key.(string)) + } else { + t.queue.Forget(key) + } + t.queue.Done(key) } } @@ -99,9 +104,9 @@ func (t *taskQueue) shutdown() { // NewTaskQueue creates a new task queue with the given sync function. // The sync function is called for every element inserted into the queue. -func NewTaskQueue(syncFn func(string)) *taskQueue { +func NewTaskQueue(syncFn func(string) error) *taskQueue { return &taskQueue{ - queue: workqueue.New(), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan struct{}), }