Merge pull request #906 from aledbf/fix-race-condition

Fix race condition with closed channels
This commit is contained in:
Manuel Alejandro de Brito Fontes 2017-06-28 14:47:57 -04:00 committed by GitHub
commit 9af4fb573e

View file

@ -39,7 +39,7 @@ type Queue struct {
// sync is called for each item in the queue // sync is called for each item in the queue
sync func(interface{}) error sync func(interface{}) error
// workerDone is closed when the worker exits // workerDone is closed when the worker exits
workerDone chan struct{} workerDone chan bool
fn func(obj interface{}) (interface{}, error) fn func(obj interface{}) (interface{}, error)
} }
@ -79,7 +79,9 @@ func (t *Queue) worker() {
for { for {
key, quit := t.queue.Get() key, quit := t.queue.Get()
if quit { if quit {
close(t.workerDone) if !isClosed(t.workerDone) {
close(t.workerDone)
}
return return
} }
@ -95,6 +97,16 @@ func (t *Queue) worker() {
} }
} }
func isClosed(ch <-chan bool) bool {
select {
case <-ch:
return true
default:
}
return false
}
// Shutdown shuts down the work queue and waits for the worker to ACK // Shutdown shuts down the work queue and waits for the worker to ACK
func (t *Queue) Shutdown() { func (t *Queue) Shutdown() {
t.queue.ShutDown() t.queue.ShutDown()
@ -117,7 +129,7 @@ func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (in
q := &Queue{ q := &Queue{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
sync: syncFn, sync: syncFn,
workerDone: make(chan struct{}), workerDone: make(chan bool),
fn: fn, fn: fn,
} }