diff --git a/core/pkg/ingress/controller/backend_ssl.go b/core/pkg/ingress/controller/backend_ssl.go index 419b05744..4f87f475e 100644 --- a/core/pkg/ingress/controller/backend_ssl.go +++ b/core/pkg/ingress/controller/backend_ssl.go @@ -25,64 +25,52 @@ import ( "github.com/golang/glog" api "k8s.io/client-go/pkg/api/v1" - extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/ingress/core/pkg/ingress" - "k8s.io/ingress/core/pkg/ingress/annotations/parser" ssl "k8s.io/ingress/core/pkg/net/ssl" ) // syncSecret keeps in sync Secrets used by Ingress rules with the files on -// disk to allow being used in controllers. -func (ic *GenericController) syncSecret(k interface{}) error { - if ic.secretQueue.IsShuttingDown() { - return nil - } +// disk to allow copy of the content of the secret to disk to be used +// by external processes. +func (ic *GenericController) syncSecret() { + glog.V(3).Infof("starting syncing of secrets") + if !ic.controllersInSync() { time.Sleep(podStoreSyncedPollPeriod) - return fmt.Errorf("deferring sync till endpoints controller has synced") + glog.Warningf("deferring sync till endpoints controller has synced") + return } - var key string var cert *ingress.SSLCert var err error - key = k.(string) - - secObj, exists, err := ic.secrLister.Store.GetByKey(key) - if err != nil { - return fmt.Errorf("error getting secret %v: %v", key, err) - } - if !exists { - return fmt.Errorf("secret %v was not found", key) - } - sec := secObj.(*api.Secret) - if !ic.secrReferenced(sec.Name, sec.Namespace) { - glog.V(3).Infof("secret %v/%v is not used in Ingress rules. skipping ", sec.Namespace, sec.Name) - return nil - } - - cert, err = ic.getPemCertificate(key) - if err != nil { - return err - } - - // create certificates and add or update the item in the store - cur, exists := ic.sslCertTracker.Get(key) - if exists { - s := cur.(*ingress.SSLCert) - if reflect.DeepEqual(s, cert) { - // no need to update - return nil + keys := ic.secretTracker.List() + for _, k := range keys { + key := k.(string) + cert, err = ic.getPemCertificate(key) + if err != nil { + glog.Warningf("error obtaining PEM from secret %v: %v", key, err) + continue } - glog.Infof("updating secret %v/%v in the local store", sec.Namespace, sec.Name) - ic.sslCertTracker.Update(key, cert) - return nil + + // create certificates and add or update the item in the store + cur, exists := ic.sslCertTracker.Get(key) + if exists { + s := cur.(*ingress.SSLCert) + if reflect.DeepEqual(s, cert) { + // no need to update + continue + } + glog.Infof("updating secret %v in the local store", key) + ic.sslCertTracker.Update(key, cert) + continue + } + + glog.Infof("adding secret %v to the local store", key) + ic.sslCertTracker.Add(key, cert) } - glog.Infof("adding secret %v/%v to the local store", sec.Namespace, sec.Name) - ic.sslCertTracker.Add(key, cert) - return nil } // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. @@ -106,10 +94,10 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC var s *ingress.SSLCert if okcert && okkey { - glog.Infof("found certificate and private key, configuring %v as a TLS Secret", secretName) + glog.V(3).Infof("found certificate and private key, configuring %v as a TLS Secret", secretName) s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca) } else if ca != nil { - glog.Infof("found only ca.crt, configuring %v as an Certificate Authentication secret", secretName) + glog.V(3).Infof("found only ca.crt, configuring %v as an Certificate Authentication secret", secretName) s, err = ssl.AddCertAuth(nsSecName, ca) } else { return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName) @@ -124,30 +112,6 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC return s, nil } -// secrReferenced checks if a secret is referenced or not by one or more Ingress rules -func (ic *GenericController) secrReferenced(name, namespace string) bool { - for _, ingIf := range ic.ingLister.Store.List() { - ing := ingIf.(*extensions.Ingress) - - if ic.annotations.ContainsCertificateAuth(ing) { - str, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing) - if str == fmt.Sprintf("%v/%v", namespace, name) { - return true - } - } - - if ing.Namespace != namespace { - continue - } - for _, tls := range ing.Spec.TLS { - if tls.SecretName == name { - return true - } - } - } - return false -} - // sslCertTracker holds a store of referenced Secrets in Ingress rules type sslCertTracker struct { cache.ThreadSafeStore @@ -158,3 +122,14 @@ func newSSLCertTracker() *sslCertTracker { cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), } } + +// secretTracker holds a store of Secrets +type secretTracker struct { + cache.ThreadSafeStore +} + +func newSecretTracker() *secretTracker { + return &secretTracker{ + cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), + } +} diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index 9f803625d..7bb21a57a 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -27,10 +27,10 @@ import ( "time" "github.com/golang/glog" - "github.com/kylelemons/godebug/pretty" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" def_api "k8s.io/client-go/pkg/api" @@ -43,6 +43,7 @@ import ( "k8s.io/ingress/core/pkg/ingress" "k8s.io/ingress/core/pkg/ingress/annotations/class" "k8s.io/ingress/core/pkg/ingress/annotations/healthcheck" + "k8s.io/ingress/core/pkg/ingress/annotations/parser" "k8s.io/ingress/core/pkg/ingress/annotations/proxy" "k8s.io/ingress/core/pkg/ingress/annotations/service" "k8s.io/ingress/core/pkg/ingress/defaults" @@ -96,9 +97,8 @@ type GenericController struct { // local store of SSL certificates // (only certificates used in ingress) sslCertTracker *sslCertTracker - // TaskQueue in charge of keep the secrets referenced from Ingress - // in sync with the files on disk - secretQueue *task.Queue + // store of secret names referenced from Ingress + secretTracker *secretTracker syncRateLimiter flowcontrol.RateLimiter @@ -154,10 +154,10 @@ func newIngressController(config *Configuration) *GenericController { Component: "ingress-controller", }), sslCertTracker: newSSLCertTracker(), + secretTracker: newSecretTracker(), } - ic.syncQueue = task.NewTaskQueue(ic.sync) - ic.secretQueue = task.NewTaskQueue(ic.syncSecret) + ic.syncQueue = task.NewTaskQueue(ic.syncIngress) // from here to the end of the method all the code is just boilerplate // required to watch Ingress, Secrets, ConfigMaps and Endoints. @@ -171,12 +171,7 @@ func newIngressController(config *Configuration) *GenericController { } ic.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) ic.syncQueue.Enqueue(obj) - if ic.annotations.ContainsCertificateAuth(addIng) { - s, err := ic.annotations.CertificateAuthSecret(addIng) - if err == nil { - ic.secretQueue.Enqueue(s) - } - } + ic.extractSecretNames(addIng) }, DeleteFunc: func(obj interface{}) { delIng := obj.(*extensions.Ingress) @@ -198,57 +193,17 @@ func newIngressController(config *Configuration) *GenericController { if !reflect.DeepEqual(old, cur) { upIng := cur.(*extensions.Ingress) ic.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", upIng.Namespace, upIng.Name)) - // the referenced secret is different? - if diff := pretty.Compare(curIng.Spec.TLS, oldIng.Spec.TLS); diff != "" { - for _, secretName := range curIng.Spec.TLS { - secKey := "" - if secretName.SecretName != "" { - secKey = fmt.Sprintf("%v/%v", curIng.Namespace, secretName.SecretName) - } - glog.Infof("TLS section in ingress %v/%v changed (secret is now \"%v\")", upIng.Namespace, upIng.Name, secKey) - // default cert is already queued - if secKey != "" { - go func() { - // we need to wait until the ingress store is updated - time.Sleep(10 * time.Second) - key, err := ic.GetSecret(secKey) - if err != nil { - glog.Errorf("unexpected error: %v", err) - } - if key != nil { - ic.secretQueue.Enqueue(key) - } - }() - } - } - } - if ic.annotations.ContainsCertificateAuth(upIng) { - s, err := ic.annotations.CertificateAuthSecret(upIng) - if err == nil { - ic.secretQueue.Enqueue(s) - } - } - ic.syncQueue.Enqueue(cur) + ic.extractSecretNames(upIng) } }, } secrEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - sec := obj.(*api.Secret) - ic.secretQueue.Enqueue(sec) - }, DeleteFunc: func(obj interface{}) { sec := obj.(*api.Secret) ic.sslCertTracker.Delete(fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)) }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - sec := cur.(*api.Secret) - ic.secretQueue.Enqueue(sec) - } - }, } eventHandler := cache.ResourceEventHandlerFuncs{ @@ -391,7 +346,7 @@ func (ic *GenericController) getConfigMap(ns, name string) (*api.ConfigMap, erro // sync collects all the pieces required to assemble the configuration file and // then sends the content to the backend (OnUpdate) receiving the populated // template as response reloading the backend if is required. -func (ic *GenericController) sync(key interface{}) error { +func (ic *GenericController) syncIngress(key interface{}) error { ic.syncRateLimiter.Accept() if ic.syncQueue.IsShuttingDown() { @@ -735,13 +690,10 @@ func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress // GetAuthCertificate ... func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.AuthSSLCert, error) { - key, err := ic.GetSecret(secretName) + _, err := ic.GetSecret(secretName) if err != nil { return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) } - if key != nil { - ic.secretQueue.Enqueue(key) - } bc, exists := ic.sslCertTracker.Get(secretName) if !exists { @@ -1121,6 +1073,27 @@ func (ic *GenericController) getEndpoints( return upsServers } +// extractSecretNames extracts information about secrets inside the Ingress rule +func (ic GenericController) extractSecretNames(ing *extensions.Ingress) { + if ic.annotations.ContainsCertificateAuth(ing) { + key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing) + if key != "" { + _, exists := ic.secretTracker.Get(key) + if !exists { + ic.secretTracker.Add(key, key) + } + } + } + + for _, tls := range ing.Spec.TLS { + key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) + _, exists := ic.secretTracker.Get(key) + if !exists { + ic.secretTracker.Add(key, key) + } + } +} + // Stop stops the loadbalancer controller. func (ic GenericController) Stop() error { ic.stopLock.Lock() @@ -1131,7 +1104,6 @@ func (ic GenericController) Stop() error { glog.Infof("shutting down controller queues") close(ic.stopCh) go ic.syncQueue.Shutdown() - go ic.secretQueue.Shutdown() if ic.syncStatus != nil { ic.syncStatus.Shutdown() } @@ -1152,9 +1124,10 @@ func (ic GenericController) Start() { go ic.secrController.Run(ic.stopCh) go ic.mapController.Run(ic.stopCh) - go ic.secretQueue.Run(5*time.Second, ic.stopCh) go ic.syncQueue.Run(5*time.Second, ic.stopCh) + go wait.Forever(ic.syncSecret, 10*time.Second) + if ic.syncStatus != nil { go ic.syncStatus.Run(ic.stopCh) } diff --git a/core/pkg/task/queue.go b/core/pkg/task/queue.go index a6786a622..9c5d64719 100644 --- a/core/pkg/task/queue.go +++ b/core/pkg/task/queue.go @@ -82,15 +82,17 @@ func (t *Queue) worker() { close(t.workerDone) return } + defer t.queue.Done(key) + glog.V(3).Infof("syncing %v", key) - if err := t.sync(key); err != nil { - glog.Warningf("requeuing %v, err %v", key, err) - t.queue.AddRateLimited(key) - } else { + err := t.sync(key) + if err == nil { t.queue.Forget(key) + return } - t.queue.Done(key) + glog.Warningf("requeuing %v, err %v", key, err) + t.queue.AddRateLimited(key) } } diff --git a/core/pkg/task/queue_test.go b/core/pkg/task/queue_test.go index 5d2cf2f41..e9b1fab86 100644 --- a/core/pkg/task/queue_test.go +++ b/core/pkg/task/queue_test.go @@ -65,7 +65,7 @@ func TestEnqueueSuccess(t *testing.T) { q := NewCustomTaskQueue(mockSynFn, mockKeyFn) stopCh := make(chan struct{}) // run queue - go q.Run(10*time.Second, stopCh) + go q.Run(5*time.Second, stopCh) // mock object whichi will be enqueue mo := mockEnqueueObj{ k: "testKey", @@ -88,7 +88,7 @@ func TestEnqueueFailed(t *testing.T) { q := NewCustomTaskQueue(mockSynFn, mockKeyFn) stopCh := make(chan struct{}) // run queue - go q.Run(10*time.Second, stopCh) + go q.Run(5*time.Second, stopCh) // mock object whichi will be enqueue mo := mockEnqueueObj{ k: "testKey", @@ -114,7 +114,7 @@ func TestEnqueueKeyError(t *testing.T) { q := NewCustomTaskQueue(mockSynFn, mockErrorKeyFn) stopCh := make(chan struct{}) // run queue - go q.Run(10*time.Second, stopCh) + go q.Run(5*time.Second, stopCh) // mock object whichi will be enqueue mo := mockEnqueueObj{ k: "testKey",