Rate limit requeues on error

Prashanth Balasubramanian 2016-06-27 20:06:35 -07:00
parent acf87ef9d5
commit 5db8389fb3
2 changed files with 24 additions and 36 deletions
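
As background for the diff below: each controller sync function now returns an error instead of requeuing its own key, and the shared taskQueue retries a key with backoff only when that error is non-nil. Here is a minimal, self-contained sketch of that rate-limited requeue pattern using the Kubernetes workqueue package (imported here from k8s.io/client-go; the controller vendored the equivalent package at the time). The processItem function and the example key are illustrative placeholders, not code from this repository.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processItem stands in for the controller's sync function (hypothetical name).
// A non-nil return value means "retry this key later".
func processItem(key string) error {
	fmt.Println("syncing", key)
	return nil
}

func main() {
	// Rate-limited queue: per-item exponential backoff plus an overall limiter.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/my-ingress")

	// One iteration of the worker loop introduced by this commit.
	key, quit := queue.Get()
	if quit {
		return
	}
	if err := processItem(key.(string)); err != nil {
		queue.AddRateLimited(key) // failed: requeue with backoff
	} else {
		queue.Forget(key) // succeeded: reset this key's backoff history
	}
	queue.Done(key)
	queue.ShutDown()
}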

View file

@@ -260,31 +260,27 @@ func (lbc *LoadBalancerController) storesSynced() bool {
 }
 
 // sync manages Ingress create/updates/deletes.
-func (lbc *LoadBalancerController) sync(key string) {
+func (lbc *LoadBalancerController) sync(key string) (err error) {
 	if !lbc.hasSynced() {
 		time.Sleep(storeSyncPollPeriod)
-		lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync"))
-		return
+		return fmt.Errorf("Waiting for stores to sync")
 	}
 	glog.V(3).Infof("Syncing %v", key)
 
 	paths, err := lbc.ingLister.List()
 	if err != nil {
-		lbc.ingQueue.requeue(key, err)
-		return
+		return err
 	}
 	nodePorts := lbc.tr.toNodePorts(&paths)
 	lbNames := lbc.ingLister.Store.ListKeys()
 	lbs, _ := lbc.ListRuntimeInfo()
 	nodeNames, err := lbc.getReadyNodeNames()
 	if err != nil {
-		lbc.ingQueue.requeue(key, err)
-		return
+		return err
 	}
 	obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
 	if err != nil {
-		lbc.ingQueue.requeue(key, err)
-		return
+		return err
 	}
 
 	// This performs a 2 phase checkpoint with the cloud:
@@ -299,8 +295,8 @@ func (lbc *LoadBalancerController) sync(key string) {
 
 	// don't have an associated Kubernetes Ingress/Service/Endpoint.
 	defer func() {
-		if err := lbc.CloudClusterManager.GC(lbNames, nodePorts); err != nil {
-			lbc.ingQueue.requeue(key, err)
+		if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil {
+			err = fmt.Errorf("Error during sync %v, error during GC %v", err, deferErr)
 		}
 		glog.V(3).Infof("Finished syncing %v", key)
 	}()
@@ -323,16 +319,12 @@ func (lbc *LoadBalancerController) sync(key string) {
 	}
 
 	if !ingExists {
-		if syncError != nil {
-			lbc.ingQueue.requeue(key, err)
-		}
-		return
+		return syncError
 	}
 	// Update the UrlMap of the single loadbalancer that came through the watch.
 	l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
 	if err != nil {
-		lbc.ingQueue.requeue(key, fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err))
-		return
+		return fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
 	}
 
 	ing := *obj.(*extensions.Ingress)
@@ -345,10 +337,7 @@ func (lbc *LoadBalancerController) sync(key string) {
 		lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
 		syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
 	}
-	if syncError != nil {
-		lbc.ingQueue.requeue(key, syncError)
-	}
-	return
+	return syncError
 }
 
 // updateIngressStatus updates the IP and annotations of a loadbalancer.
@@ -421,16 +410,15 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run
 
 // syncNodes manages the syncing of kubernetes nodes to gce instance groups.
 // The instancegroups are referenced by loadbalancer backends.
-func (lbc *LoadBalancerController) syncNodes(key string) {
+func (lbc *LoadBalancerController) syncNodes(key string) error {
 	nodeNames, err := lbc.getReadyNodeNames()
 	if err != nil {
-		lbc.nodeQueue.requeue(key, err)
-		return
+		return err
 	}
 	if err := lbc.CloudClusterManager.instancePool.Sync(nodeNames); err != nil {
-		lbc.nodeQueue.requeue(key, err)
+		return err
 	}
-	return
+	return nil
 }
 
 func nodeReady(node api.Node) bool {
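
The GC hunk above also changes how cleanup failures are reported: because sync now has a named result (err error), the deferred garbage-collection call can fold its own failure into whatever the function was about to return. A small sketch of that Go pattern, with hypothetical doSync and collectGarbage helpers standing in for the controller's methods:

package main

import "fmt"

// collectGarbage stands in for CloudClusterManager.GC (hypothetical helper).
func collectGarbage() error { return fmt.Errorf("gc failed") }

// doSync mirrors the shape of the controller's sync: the named return value
// lets the deferred cleanup overwrite the error the caller will see.
func doSync() (err error) {
	defer func() {
		if deferErr := collectGarbage(); deferErr != nil {
			err = fmt.Errorf("error during sync %v, error during GC %v", err, deferErr)
		}
	}()
	return fmt.Errorf("sync failed")
}

func main() {
	// Prints: error during sync sync failed, error during GC gc failed
	fmt.Println(doSync())
}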

View file

@@ -85,9 +85,9 @@ func (e errorNodePortNotFound) Error() string {
 // invokes the given sync function for every work item inserted.
 type taskQueue struct {
 	// queue is the work queue the worker polls
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 	// sync is called for each item in the queue
-	sync func(string)
+	sync func(string) error
 	// workerDone is closed when the worker exits
 	workerDone chan struct{}
 }
@@ -106,11 +106,6 @@ func (t *taskQueue) enqueue(obj interface{}) {
 	t.queue.Add(key)
 }
 
-func (t *taskQueue) requeue(key string, err error) {
-	glog.Errorf("Requeuing %v, err %v", key, err)
-	t.queue.Add(key)
-}
-
 // worker processes work in the queue through sync.
 func (t *taskQueue) worker() {
 	for {
@@ -120,7 +115,12 @@ func (t *taskQueue) worker() {
 			return
 		}
 		glog.V(3).Infof("Syncing %v", key)
-		t.sync(key.(string))
+		if err := t.sync(key.(string)); err != nil {
+			glog.Errorf("Requeuing %v, err %v", key, err)
+			t.queue.AddRateLimited(key)
+		} else {
+			t.queue.Forget(key)
+		}
 		t.queue.Done(key)
 	}
 }
@@ -133,9 +133,9 @@ func (t *taskQueue) shutdown() {
 
 // NewTaskQueue creates a new task queue with the given sync function.
 // The sync function is called for every element inserted into the queue.
-func NewTaskQueue(syncFn func(string)) *taskQueue {
+func NewTaskQueue(syncFn func(string) error) *taskQueue {
 	return &taskQueue{
-		queue:      workqueue.New(),
+		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		sync:       syncFn,
 		workerDone: make(chan struct{}),
 	}
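
A closing note on the limiter choice: in the workqueue versions I am familiar with (treat the exact values as an assumption), DefaultControllerRateLimiter combines a per-item exponential backoff, starting at a few milliseconds and capped at roughly 1000 seconds, with an overall token-bucket limit of about ten requeues per second. Forget clears the per-item failure history once a key finally syncs cleanly, which is why the worker above calls it on success; without it, a previously flapping Ingress would keep paying the maximum backoff on every later requeue.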