diff --git a/controllers/nginx/pkg/template/template_test.go b/controllers/nginx/pkg/template/template_test.go index 9bed3d3ac..55dde5627 100644 --- a/controllers/nginx/pkg/template/template_test.go +++ b/controllers/nginx/pkg/template/template_test.go @@ -322,7 +322,7 @@ func TestBuildNextUpstream(t *testing.T) { } func TestBuildRateLimit(t *testing.T) { - loc := ingress.Location{} + loc := &ingress.Location{} loc.RateLimit.Connections.Name = "con" loc.RateLimit.Connections.Limit = 1 diff --git a/core/pkg/ingress/controller/backend_ssl.go b/core/pkg/ingress/controller/backend_ssl.go index b07647f13..6fbaf303b 100644 --- a/core/pkg/ingress/controller/backend_ssl.go +++ b/core/pkg/ingress/controller/backend_ssl.go @@ -55,19 +55,17 @@ func (ic *GenericController) syncSecret(key string) { } glog.Infof("updating secret %v in the local store", key) ic.sslCertTracker.Update(key, cert) - // we need to force the sync of the secret to disk - ic.syncSecret(key) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncIngress("update-secret") + ic.syncQueue.Enqueue(&extensions.Ingress{}) return } glog.Infof("adding secret %v to the local store", key) ic.sslCertTracker.Add(key, cert) - // this new secret must trigger an update + // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncIngress("add-secret") + ic.syncQueue.Enqueue(&extensions.Ingress{}) } // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. @@ -130,34 +128,32 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC // to a secret that is not present in the local secret store. // In this case we call syncSecret. func (ic *GenericController) checkMissingSecrets() { - for _, key := range ic.listers.Ingress.ListKeys() { - if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists { - ing := obj.(*extensions.Ingress) + for _, obj := range ic.listers.Ingress.List() { + ing := obj.(*extensions.Ingress) - if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { - continue - } - - for _, tls := range ing.Spec.TLS { - if tls.SecretName == "" { - continue - } - - key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - if _, ok := ic.sslCertTracker.Get(key); !ok { - ic.syncSecret(key) - } - } - - key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing) - if key == "" { + if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + continue + } + + for _, tls := range ing.Spec.TLS { + if tls.SecretName == "" { continue } + key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) if _, ok := ic.sslCertTracker.Get(key); !ok { ic.syncSecret(key) } } + + key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing) + if key == "" { + continue + } + + if _, ok := ic.sslCertTracker.Get(key); !ok { + ic.syncSecret(key) + } } } diff --git a/core/pkg/ingress/controller/backend_ssl_test.go b/core/pkg/ingress/controller/backend_ssl_test.go index e2386b1a5..a6416b61b 100644 --- a/core/pkg/ingress/controller/backend_ssl_test.go +++ b/core/pkg/ingress/controller/backend_ssl_test.go @@ -26,9 +26,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" cache_client "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/flowcontrol" "k8s.io/ingress/core/pkg/ingress" "k8s.io/ingress/core/pkg/ingress/store" + "k8s.io/ingress/core/pkg/task" "k8s.io/kubernetes/pkg/api" ) @@ -102,21 +104,17 @@ func buildControllerForBackendSSL() cache_client.Controller { } func buildGenericControllerForBackendSSL() *GenericController { - return &GenericController{ + gc := &GenericController{ + syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), cfg: &Configuration{ Client: buildSimpleClientSetForBackendSSL(), }, - listers: buildListers(), - - ingController: buildControllerForBackendSSL(), - endpController: buildControllerForBackendSSL(), - svcController: buildControllerForBackendSSL(), - nodeController: buildControllerForBackendSSL(), - secrController: buildControllerForBackendSSL(), - mapController: buildControllerForBackendSSL(), - + listers: buildListers(), sslCertTracker: newSSLCertTracker(), } + + gc.syncQueue = task.NewTaskQueue(gc.syncIngress) + return gc } func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index e098d3566..0ec872e3f 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -34,13 +34,11 @@ import ( extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" @@ -80,14 +78,8 @@ var ( type GenericController struct { cfg *Configuration - ingController cache.Controller - endpController cache.Controller - svcController cache.Controller - nodeController cache.Controller - secrController cache.Controller - mapController cache.Controller - - listers *ingress.StoreLister + listers *ingress.StoreLister + cacheController *cacheController annotations annotationExtractor @@ -166,12 +158,11 @@ func newIngressController(config *Configuration) *GenericController { Component: "ingress-controller", }), sslCertTracker: newSSLCertTracker(), - listers: &ingress.StoreLister{}, } ic.syncQueue = task.NewTaskQueue(ic.syncIngress) - ic.createListers(config.DisableNodeList) + ic.listers, ic.cacheController = ic.createListers(config.DisableNodeList) if config.UpdateStatus { ic.syncStatus = status.NewStatusSyncer(status.Config{ @@ -684,23 +675,23 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) } // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret -func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.AuthSSLCert, error) { - if _, exists := ic.sslCertTracker.Get(secretName); !exists { - ic.syncSecret(secretName) +func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { + if _, exists := ic.sslCertTracker.Get(name); !exists { + ic.syncSecret(name) } - _, err := ic.listers.Secret.GetByName(secretName) + _, err := ic.listers.Secret.GetByName(name) if err != nil { return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) } - bc, exists := ic.sslCertTracker.Get(secretName) + bc, exists := ic.sslCertTracker.Get(name) if !exists { - return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", secretName) + return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name) } cert := bc.(*ingress.SSLCert) return &resolver.AuthSSLCert{ - Secret: secretName, + Secret: name, CAFileName: cert.CAFileName, PemSHA: cert.PemSHA, }, nil @@ -1213,39 +1204,33 @@ func (ic GenericController) Stop() error { func (ic *GenericController) Start() { glog.Infof("starting Ingress controller") - go ic.ingController.Run(ic.stopCh) - go ic.endpController.Run(ic.stopCh) - go ic.svcController.Run(ic.stopCh) - go ic.nodeController.Run(ic.stopCh) - go ic.secrController.Run(ic.stopCh) - go ic.mapController.Run(ic.stopCh) - - go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh) - - // Wait for all involved caches to be synced, before processing items from the queue is started - if !cache.WaitForCacheSync(ic.stopCh, - ic.ingController.HasSynced, - ic.svcController.HasSynced, - ic.endpController.HasSynced, - ic.secrController.HasSynced, - ic.mapController.HasSynced, - ic.nodeController.HasSynced, - ) { - runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) - } - - // initial sync of secrets to avoid unnecessary reloads - ic.checkMissingSecrets() + ic.cacheController.Run(ic.stopCh) createDefaultSSLCertificate() + time.Sleep(5 * time.Second) + // initial sync of secrets to avoid unnecessary reloads + glog.Info("running initial sync of secret") + for _, obj := range ic.listers.Ingress.List() { + ing := obj.(*extensions.Ingress) + + if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + a, _ := parser.GetStringAnnotation(class.IngressKey, ing) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) + continue + } + + ic.readSecrets(ing) + } + go ic.syncQueue.Run(time.Second, ic.stopCh) if ic.syncStatus != nil { go ic.syncStatus.Run(ic.stopCh) } - time.Sleep(5 * time.Second) + go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh) + // force initial sync ic.syncQueue.Enqueue(&extensions.Ingress{}) diff --git a/core/pkg/ingress/controller/listers.go b/core/pkg/ingress/controller/listers.go index d3429060b..6765f3478 100644 --- a/core/pkg/ingress/controller/listers.go +++ b/core/pkg/ingress/controller/listers.go @@ -25,14 +25,46 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/ingress/core/pkg/ingress" "k8s.io/ingress/core/pkg/ingress/annotations/class" "k8s.io/ingress/core/pkg/ingress/annotations/parser" ) -func (ic *GenericController) createListers(disableNodeLister bool) { +type cacheController struct { + Ingress cache.Controller + Endpoint cache.Controller + Service cache.Controller + Node cache.Controller + Secret cache.Controller + Configmap cache.Controller +} + +func (c *cacheController) Run(stopCh chan struct{}) { + go c.Ingress.Run(stopCh) + go c.Endpoint.Run(stopCh) + go c.Service.Run(stopCh) + go c.Node.Run(stopCh) + go c.Secret.Run(stopCh) + go c.Configmap.Run(stopCh) + + // Wait for all involved caches to be synced, before processing items from the queue is started + if !cache.WaitForCacheSync(stopCh, + c.Ingress.HasSynced, + c.Endpoint.HasSynced, + c.Service.HasSynced, + c.Node.HasSynced, + c.Secret.HasSynced, + c.Configmap.HasSynced, + ) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + } +} + +func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.StoreLister, *cacheController) { // from here to the end of the method all the code is just boilerplate // required to watch Ingress, Secrets, ConfigMaps and Endoints. // This is used to detect new content, updates or removals and act accordingly @@ -166,23 +198,27 @@ func (ic *GenericController) createListers(disableNodeLister bool) { watchNs = ic.cfg.Namespace } - ic.listers.Ingress.Store, ic.ingController = cache.NewInformer( + lister := &ingress.StoreLister{} + + controller := &cacheController{} + + lister.Ingress.Store, controller.Ingress = cache.NewInformer( cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()), &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler) - ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer( + lister.Endpoint.Store, controller.Endpoint = cache.NewInformer( cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()), &apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler) - ic.listers.Secret.Store, ic.secrController = cache.NewInformer( + lister.Secret.Store, controller.Secret = cache.NewInformer( cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), &apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler) - ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer( + lister.ConfigMap.Store, controller.Configmap = cache.NewInformer( cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), &apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler) - ic.listers.Service.Store, ic.svcController = cache.NewInformer( + lister.Service.Store, controller.Service = cache.NewInformer( cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()), &apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) @@ -192,7 +228,9 @@ func (ic *GenericController) createListers(disableNodeLister bool) { } else { nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) } - ic.listers.Node.Store, ic.nodeController = cache.NewInformer( + lister.Node.Store, controller.Node = cache.NewInformer( nodeListerWatcher, &apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + + return lister, controller } diff --git a/examples/custom-controller/server.go b/examples/custom-controller/server.go index 6d54e4b7a..4a2c0ab40 100644 --- a/examples/custom-controller/server.go +++ b/examples/custom-controller/server.go @@ -99,7 +99,7 @@ func (n DummyController) ConfigureFlags(*pflag.FlagSet) { func (n DummyController) OverrideFlags(*pflag.FlagSet) { } -func (n DummyController) SetListers(lister ingress.StoreLister) { +func (n DummyController) SetListers(lister *ingress.StoreLister) { }