Use a ring channel to avoid blocking write of events

This commit is contained in:
Manuel de Brito Fontes 2018-02-13 14:54:06 -03:00
parent 33475b7184
commit 0313630862
3 changed files with 30 additions and 24 deletions

View file

@ -33,6 +33,7 @@ import (
"github.com/golang/glog"
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes/scheme"
@ -106,7 +107,7 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
}),
stopCh: make(chan struct{}),
updateCh: make(chan store.Event, 1024),
updateCh: channels.NewRingChannel(4096),
stopLock: &sync.Mutex{},
@ -209,7 +210,7 @@ type NGINXController struct {
stopLock *sync.Mutex
stopCh chan struct{}
updateCh chan store.Event
updateCh *channels.RingChannel
// ngxErrCh channel used to detect errors with the nginx processes
ngxErrCh chan error
@ -290,16 +291,20 @@ func (n *NGINXController) Start() {
// start a new nginx master process if the controller is not being stopped
n.start(cmd)
}
case evt := <-n.updateCh:
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
}
if evt, ok := event.(store.Event); ok {
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
}
n.syncQueue.Enqueue(evt.Obj)
n.syncQueue.Enqueue(evt.Obj)
} else {
glog.Warningf("unexpected event type received %T", event)
}
case <-n.stopCh:
break
}

View file

@ -226,7 +226,7 @@ func (s k8sStore) ReadSecrets(ing *extensions.Ingress) {
// sendDummyEvent sends a dummy event to trigger an update
// This is used in when a secret change
func (s *k8sStore) sendDummyEvent() {
s.updateCh <- Event{
s.updateCh.In() <- Event{
Type: UpdateEvent,
Obj: &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{

View file

@ -24,6 +24,7 @@ import (
"sync"
"time"
"github.com/eapache/channels"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
@ -194,7 +195,7 @@ type k8sStore struct {
filesystem file.Filesystem
// updateCh
updateCh chan Event
updateCh *channels.RingChannel
// mu mutex used to avoid simultaneous incovations to syncSecret
mu *sync.Mutex
@ -208,7 +209,7 @@ func New(checkOCSP bool,
resyncPeriod time.Duration,
client clientset.Interface,
fs file.Filesystem,
updateCh chan Event) Storer {
updateCh *channels.RingChannel) Storer {
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
@ -246,7 +247,7 @@ func New(checkOCSP bool,
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
@ -272,7 +273,7 @@ func New(checkOCSP bool,
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -293,7 +294,7 @@ func New(checkOCSP bool,
}
store.extractAnnotations(curIng)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -312,7 +313,7 @@ func New(checkOCSP bool,
_, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec))
if err == nil {
store.syncSecret(key)
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -323,7 +324,7 @@ func New(checkOCSP bool,
store.extractAnnotations(ing)
}
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
@ -346,7 +347,7 @@ func New(checkOCSP bool,
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -362,7 +363,7 @@ func New(checkOCSP bool,
}
}
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: sec,
}
@ -372,13 +373,13 @@ func New(checkOCSP bool,
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh <- Event{
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
@ -387,7 +388,7 @@ func New(checkOCSP bool,
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
updateCh <- Event{
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
@ -402,7 +403,7 @@ func New(checkOCSP bool,
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
}
@ -415,7 +416,7 @@ func New(checkOCSP bool,
if mapKey == configmap {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
store.setConfig(m)
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
@ -423,7 +424,7 @@ func New(checkOCSP bool,
// updates to configuration configmaps can trigger an update
if mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
updateCh <- Event{
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}