// Source: ingress-nginx-helm/internal/task/queue.go

/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package task
import (
	"fmt"
	"time"

	"k8s.io/klog/v2"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)
var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
2017-09-25 21:53:03 +00:00
// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {
// queue is the work queue the worker polls
queue workqueue.TypedRateLimitingInterface[any]
// sync is called for each item in the queue
sync func(interface{}) error
// workerDone is closed when the worker exits
workerDone chan bool
// fn makes a key for an API object
fn func(obj interface{}) (interface{}, error)
// lastSync is the Unix epoch time of the last execution of 'sync'
2017-09-25 21:53:03 +00:00
lastSync int64
}
2017-09-27 23:53:31 +00:00
// Element represents one item of the queue.
type Element struct {
	// Key identifies the object to sync. enqueue fills it with the value
	// returned by the queue's key function.
	Key interface{}
	// Timestamp is expressed in Unix nanoseconds. The worker skips an element
	// whose non-zero Timestamp is older than the queue's lastSync; elements
	// requeued after a failed sync carry 0 and are therefore never skipped.
	Timestamp int64
	// IsSkippable is neither written nor read anywhere in this file.
	// NOTE(review): confirm whether code outside this file relies on it.
	IsSkippable bool
}
// Run starts processing elements in the queue.
//
// It hands the worker loop, period and stopCh to wait.Until and returns when
// wait.Until returns. Per the wait package documentation, wait.Until keeps
// re-invoking the function every period until stopCh is closed, so worker may
// be entered more than once over the lifetime of the queue.
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {
	wait.Until(t.worker, period, stopCh)
}
// EnqueueTask adds the key of the given API object to the task queue as a
// task that must not be skipped.
func (t *Queue) EnqueueTask(obj interface{}) {
	const skippable = false
	t.enqueue(obj, skippable)
}
// EnqueueSkippableTask adds the key of the given API object to the task queue
// as a task that is allowed to be skipped.
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
	const skippable = true
	t.enqueue(obj, skippable)
}
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
2017-01-04 12:14:08 +00:00
if t.IsShuttingDown() {
2020-09-27 20:32:40 +00:00
klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
2017-01-04 12:14:08 +00:00
return
}
2017-09-25 21:53:03 +00:00
ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
2020-09-27 20:32:40 +00:00
klog.V(3).InfoS("queuing", "item", obj)
key, err := t.fn(obj)
if err != nil {
2020-09-27 20:32:40 +00:00
klog.ErrorS(err, "creating object key", "item", obj)
return
}
2017-09-27 23:53:31 +00:00
t.queue.Add(Element{
2017-09-25 21:53:03 +00:00
Key: key,
Timestamp: ts,
})
}
// defaultKeyFunc builds the queue key for obj with the package-level keyFunc.
// NewCustomTaskQueue installs it when the caller supplies no key function.
//
// On failure it returns an empty string key together with an error that wraps
// the one reported by keyFunc.
func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) {
	key, err := keyFunc(obj)
	if err != nil {
		// %w keeps the underlying error reachable through errors.Is/errors.As.
		return "", fmt.Errorf("could not get key for object %+v: %w", obj, err)
	}

	return key, nil
}
// worker processes work in the queue through sync.
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
if quit {
if !isClosed(t.workerDone) {
close(t.workerDone)
}
return
}
2017-09-25 21:53:03 +00:00
ts := time.Now().UnixNano()
item, ok := key.(Element)
if !ok {
klog.ErrorS(nil, "invalid item type", "key", key)
}
if item.Timestamp != 0 && t.lastSync > item.Timestamp {
2020-09-27 20:32:40 +00:00
klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
2017-09-25 21:53:03 +00:00
t.queue.Forget(key)
t.queue.Done(key)
continue
}
2017-04-09 16:52:10 +00:00
2020-09-27 20:32:40 +00:00
klog.V(3).InfoS("syncing", "key", item.Key)
2017-04-27 01:52:04 +00:00
if err := t.sync(key); err != nil {
2020-09-27 20:32:40 +00:00
klog.ErrorS(err, "requeuing", "key", item.Key)
2017-09-27 23:53:31 +00:00
t.queue.AddRateLimited(Element{
2017-09-25 21:53:03 +00:00
Key: item.Key,
Timestamp: 0,
2017-09-25 21:53:03 +00:00
})
2017-04-27 01:52:04 +00:00
} else {
t.queue.Forget(key)
2017-09-25 21:53:03 +00:00
t.lastSync = ts
}
2017-04-27 01:52:04 +00:00
t.queue.Done(key)
}
}
// isClosed reports, without blocking, whether a receive from ch can complete
// right now. That is the case for a closed channel, and also for an open
// channel with a value ready, in which case that value is consumed.
func isClosed(ch <-chan bool) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}
// Shutdown shuts down the work queue and waits for the worker to ACK.
//
// It blocks until workerDone is closed. In this file that only happens inside
// worker, once the queue reports that it has been shut down, so the receive
// never completes if no worker is running.
func (t *Queue) Shutdown() {
	// Order matters: the queue must be shut down first so that the worker's
	// Get returns with quit set and the worker closes workerDone.
	t.queue.ShutDown()
	<-t.workerDone
}
// IsShuttingDown reports whether the underlying work queue is shutting down,
// i.e. it returns the result of the queue's ShuttingDown method.
func (t *Queue) IsShuttingDown() bool {
	return t.queue.ShuttingDown()
}
// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
	// A nil key function makes NewCustomTaskQueue fall back to its default.
	var noKeyFn func(interface{}) (interface{}, error)
	return NewCustomTaskQueue(syncFn, noKeyFn)
}
// NewCustomTaskQueue creates a new custom task queue with the given sync
// function. fn is the function used to compute the key of every enqueued
// object; when fn is nil the queue's defaultKeyFunc is used instead.
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
	limiter := workqueue.DefaultTypedControllerRateLimiter[any]()

	q := new(Queue)
	q.queue = workqueue.NewTypedRateLimitingQueue(limiter)
	q.sync = syncFn
	q.workerDone = make(chan bool)

	if fn != nil {
		q.fn = fn
	} else {
		q.fn = q.defaultKeyFunc
	}

	return q
}
// GetDummyObject returns a valid object that can be used in the Queue: an
// ObjectMeta whose only populated field is Name.
func GetDummyObject(name string) *metav1.ObjectMeta {
	meta := new(metav1.ObjectMeta)
	meta.Name = name
	return meta
}