Use delayed queue

Manuel de Brito Fontes 2016-06-22 17:48:13 -04:00
parent 72fe8dc293
commit a153187ce7
3 changed files with 33 additions and 26 deletions

View file

@@ -414,11 +414,10 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[stri
     return namedPorts, nil
 }
 
-func (lbc *loadBalancerController) sync(key string) {
+func (lbc *loadBalancerController) sync(key string) error {
     if !lbc.controllersInSync() {
         time.Sleep(podStoreSyncedPollPeriod)
-        lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
-        return
+        return fmt.Errorf("deferring sync till endpoints controller has synced")
     }
 
     var cfg *api.ConfigMap
@@ -435,7 +434,7 @@ func (lbc *loadBalancerController) sync(key string) {
     ings := lbc.ingLister.Store.List()
     upstreams, servers := lbc.getUpstreamServers(ngxConfig, ings)
 
-    lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
+    return lbc.nginx.CheckAndReload(ngxConfig, nginx.IngressConfig{
         Upstreams:    upstreams,
         Servers:      servers,
         TCPUpstreams: lbc.getTCPServices(),
@@ -443,21 +442,20 @@ func (lbc *loadBalancerController) sync(key string) {
     })
 }
 
-func (lbc *loadBalancerController) updateIngressStatus(key string) {
+func (lbc *loadBalancerController) updateIngressStatus(key string) error {
     if !lbc.controllersInSync() {
         time.Sleep(podStoreSyncedPollPeriod)
-        lbc.ingQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
-        return
+        return fmt.Errorf("deferring sync till endpoints controller has synced")
     }
 
     obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
     if err != nil {
-        lbc.ingQueue.requeue(key, err)
-        return
+        return err
     }
 
     if !ingExists {
-        return
+        // TODO: what's the correct behavior here?
+        return nil
     }
 
     ing := obj.(*extensions.Ingress)
@@ -466,8 +464,7 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {
     currIng, err := ingClient.Get(ing.Name)
     if err != nil {
-        glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
-        return
+        return fmt.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
     }
 
     lbIPs := ing.Status.LoadBalancer.Ingress
@@ -478,11 +475,13 @@ func (lbc *loadBalancerController) updateIngressStatus(key string) {
         })
 
         if _, err := ingClient.UpdateStatus(currIng); err != nil {
             lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err)
-            return
+            return err
         }
 
         lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.podInfo.NodeIP)
     }
+
+    return nil
 }
 
 func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool {
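
With this change, sync and updateIngressStatus report failures to their caller instead of requeuing themselves; the queue worker that invoked them owns the retry decision. A minimal sketch of that shape, using hypothetical names rather than the controller's actual code:

package main

import (
	"errors"
	"fmt"
)

// syncFn mirrors the new handler signature: do the work, return an error
// on failure, and leave the retry decision to the caller.
type syncFn func(key string) error

// processOnce stands in for the queue worker: it runs the handler once and
// reports whether the key should be retried.
func processOnce(key string, sync syncFn) (retry bool) {
	if err := sync(key); err != nil {
		fmt.Printf("sync of %q failed, scheduling retry: %v\n", key, err)
		return true
	}
	fmt.Printf("sync of %q succeeded\n", key)
	return false
}

func main() {
	notReady := func(key string) error {
		return errors.New("deferring sync till endpoints controller has synced")
	}
	processOnce("default/example", notReady)
}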

View file

@@ -56,7 +56,7 @@ func (ngx *Manager) Start() {
 // shut down, stop accepting new connections and continue to service current requests
 // until all such requests are serviced. After that, the old worker processes exit.
 // http://nginx.org/en/docs/beginners_guide.html#control
-func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) {
+func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressConfig) error {
     ngx.reloadRateLimiter.Accept()
 
     ngx.reloadLock.Lock()
@@ -65,15 +65,18 @@ func (ngx *Manager) CheckAndReload(cfg config.Configuration, ingressCfg IngressC
     newCfg, err := ngx.writeCfg(cfg, ingressCfg)
 
     if err != nil {
-        glog.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
-        return
+        return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
     }
 
     if newCfg {
-        if err := ngx.shellOut("nginx -s reload"); err == nil {
-            glog.Info("change in configuration detected. Reloading...")
+        if err := ngx.shellOut("nginx -s reload"); err != nil {
+            return fmt.Errorf("error reloading nginx: %v", err)
         }
+
+        glog.Info("change in configuration detected. Reloading...")
     }
+
+    return nil
 }
 
 // shellOut executes a command and returns its combined standard output and standard
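
CheckAndReload follows the same convention: a failed config write or a failed nginx -s reload now surfaces as an error to sync, which hands the key back to the queue. A rough sketch of that control flow; writeCfg and reload here are hypothetical stand-ins for the Manager's helpers, not its real API:

package main

import "fmt"

// checkAndReload sketches the new flow: bail out with an error if the
// configuration cannot be written, reload only when it actually changed,
// and propagate any reload failure instead of merely logging it.
func checkAndReload(writeCfg func() (changed bool, err error), reload func() error) error {
	changed, err := writeCfg()
	if err != nil {
		return fmt.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
	}
	if !changed {
		// Nothing changed; skip the reload entirely.
		return nil
	}
	if err := reload(); err != nil {
		return fmt.Errorf("error reloading nginx: %v", err)
	}
	fmt.Println("change in configuration detected. Reloading...")
	return nil
}

func main() {
	err := checkAndReload(
		func() (bool, error) { return true, nil },            // pretend the config changed
		func() error { return fmt.Errorf("exit status 1") },  // pretend the reload failed
	)
	fmt.Println("CheckAndReload returned:", err)
}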

View file

@@ -51,9 +51,9 @@ type StoreToConfigmapLister struct {
 // invokes the given sync function for every work item inserted.
 type taskQueue struct {
     // queue is the work queue the worker polls
-    queue *workqueue.Type
+    queue workqueue.RateLimitingInterface
     // sync is called for each item in the queue
-    sync func(string)
+    sync func(string) error
     // workerDone is closed when the worker exits
     workerDone chan struct{}
 }
@@ -72,9 +72,8 @@ func (t *taskQueue) enqueue(obj interface{}) {
     t.queue.Add(key)
 }
 
-func (t *taskQueue) requeue(key string, err error) {
-    glog.V(3).Infof("requeuing %v, err %v", key, err)
-    t.queue.Add(key)
+func (t *taskQueue) requeue(key string) {
+    t.queue.AddRateLimited(key)
 }
 
 // worker processes work in the queue through sync.
@@ -86,7 +85,13 @@ func (t *taskQueue) worker() {
             return
         }
         glog.V(3).Infof("syncing %v", key)
-        t.sync(key.(string))
+        if err := t.sync(key.(string)); err != nil {
+            glog.V(3).Infof("requeuing %v, err %v", key, err)
+            t.requeue(key.(string))
+        } else {
+            t.queue.Forget(key)
+        }
         t.queue.Done(key)
     }
 }
@@ -99,9 +104,9 @@ func (t *taskQueue) shutdown() {
 // NewTaskQueue creates a new task queue with the given sync function.
 // The sync function is called for every element inserted into the queue.
-func NewTaskQueue(syncFn func(string)) *taskQueue {
+func NewTaskQueue(syncFn func(string) error) *taskQueue {
     return &taskQueue{
-        queue:      workqueue.New(),
+        queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
         sync:       syncFn,
         workerDone: make(chan struct{}),
     }
 }
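
The queue itself is now a rate-limiting workqueue: a key whose sync keeps failing is retried with a per-key, increasing delay (AddRateLimited), and its backoff history is cleared on success (Forget). A minimal, self-contained worker loop in the same spirit; the import path below assumes a current client-go, whereas this commit vendors the equivalent workqueue package from the Kubernetes repo:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/example-ingress")

	sync := func(key string) error {
		fmt.Println("syncing", key)
		return nil // return an error here to exercise the backoff path
	}

	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		if err := sync(key.(string)); err != nil {
			// Failed keys come back after an increasing, per-key delay.
			queue.AddRateLimited(key)
		} else {
			// Success resets the key's rate-limit history.
			queue.Forget(key)
		}
		queue.Done(key)

		// Single-shot for this sketch; a real controller keeps looping
		// until its shutdown method is called.
		queue.ShutDown()
	}
}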