From dee4056cf0bf6e4126dc899c0f347e34d04a08e7 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Fri, 17 Nov 2017 21:15:43 -0300 Subject: [PATCH] Refactor lister and controllers --- .travis.yml | 3 + cmd/nginx/flags.go | 8 +- cmd/nginx/main.go | 1 + internal/ingress/controller/controller.go | 141 ++---- internal/ingress/controller/listers.go | 228 ---------- internal/ingress/controller/nginx.go | 75 ++-- internal/ingress/controller/process/nginx.go | 8 +- internal/ingress/controller/tcp.go | 2 + internal/ingress/status/status.go | 12 +- internal/ingress/status/status_test.go | 2 - .../{controller => store}/backend_ssl.go | 120 +++-- .../{controller => store}/backend_ssl_test.go | 26 +- internal/ingress/store/configmap.go | 41 ++ internal/ingress/store/endpoint.go | 40 ++ internal/ingress/store/ingress.go | 41 ++ internal/ingress/store/ingress_annotation.go | 26 ++ internal/ingress/store/local_secrets.go | 30 ++ internal/ingress/store/main.go | 113 ----- internal/ingress/store/secret.go | 41 ++ internal/ingress/store/service.go | 41 ++ internal/ingress/store/store.go | 412 ++++++++++++++++++ internal/ingress/store/store_test.go | 314 +++++++++++++ internal/ingress/types.go | 28 +- 23 files changed, 1182 insertions(+), 571 deletions(-) delete mode 100644 internal/ingress/controller/listers.go rename internal/ingress/{controller => store}/backend_ssl.go (65%) rename internal/ingress/{controller => store}/backend_ssl_test.go (96%) create mode 100644 internal/ingress/store/configmap.go create mode 100644 internal/ingress/store/endpoint.go create mode 100644 internal/ingress/store/ingress.go create mode 100644 internal/ingress/store/ingress_annotation.go create mode 100644 internal/ingress/store/local_secrets.go delete mode 100644 internal/ingress/store/main.go create mode 100644 internal/ingress/store/secret.go create mode 100644 internal/ingress/store/service.go create mode 100644 internal/ingress/store/store.go create mode 100644 internal/ingress/store/store_test.go diff --git a/.travis.yml b/.travis.yml index c40926a84..8bd9033fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -34,6 +34,9 @@ jobs: - go get github.com/golang/lint/golint - make fmt lint vet - stage: Coverage + before_script: + - make e2e-image + - test/e2e/up.sh script: - go get github.com/mattn/goveralls - go get github.com/modocache/gover diff --git a/cmd/nginx/flags.go b/cmd/nginx/flags.go index e58230cc0..5394527f8 100644 --- a/cmd/nginx/flags.go +++ b/cmd/nginx/flags.go @@ -27,15 +27,12 @@ import ( apiv1 "k8s.io/api/core/v1" + "k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress/controller" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" ing_net "k8s.io/ingress-nginx/internal/net" ) -const ( - defIngressClass = "nginx" -) - func parseFlags() (bool, *controller.Configuration, error) { var ( flags = pflag.NewFlagSet("", pflag.ExitOnError) @@ -157,6 +154,8 @@ func parseFlags() (bool, *controller.Configuration, error) { } } + ingress.IngressClass = *ingressClass + // check port collisions if !ing_net.IsPortAvailable(*httpPort) { return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort) @@ -198,7 +197,6 @@ func parseFlags() (bool, *controller.Configuration, error) { EnableSSLChainCompletion: *enableSSLChainCompletion, ResyncPeriod: *resyncPeriod, DefaultService: *defaultSvc, - IngressClass: *ingressClass, Namespace: *watchNamespace, ConfigMapName: *configMap, TCPConfigMapName: *tcpConfigMapName, diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index ee8d88d04..8f24b61f7 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -127,6 +127,7 @@ func main() { ngx := controller.NewNGINXController(conf) if conf.EnableSSLPassthrough { + glog.Info("setting up TLS proxy for SSL passthrough") setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx) } diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 9da69ad59..3fd8fdcb8 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -37,8 +37,6 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck" "k8s.io/ingress-nginx/internal/ingress/annotations/parser" "k8s.io/ingress-nginx/internal/ingress/annotations/proxy" @@ -76,8 +74,8 @@ type Configuration struct { ConfigMapName string DefaultService string - IngressClass string - Namespace string + + Namespace string ForceNamespaceIsolation bool @@ -117,26 +115,6 @@ func (n NGINXController) GetDefaultBackend() defaults.Backend { return n.backendDefaults } -// GetPublishService returns the configured service used to set ingress status -func (n NGINXController) GetPublishService() *apiv1.Service { - s, err := n.listers.Service.GetByName(n.cfg.PublishService) - if err != nil { - return nil - } - - return s -} - -// GetSecret searches for a secret in the local secrets Store -func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) { - return n.listers.Secret.GetByName(name) -} - -// GetService searches for a service in the local secrets Store -func (n NGINXController) GetService(name string) (*apiv1.Service, error) { - return n.listers.Service.GetByName(name) -} - // GetAnnotationWithPrefix returns the prefix of ingress annotations func (n NGINXController) GetAnnotationWithPrefix(suffix string) string { return fmt.Sprintf("%v/%v", n.cfg.AnnotationsPrefix, suffix) @@ -154,32 +132,20 @@ func (n *NGINXController) syncIngress(item interface{}) error { if element, ok := item.(task.Element); ok { if name, ok := element.Key.(string); ok { - if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists { - ing := obj.(*extensions.Ingress) + if ing, err := n.store.GetIngress(name); err == nil { n.readSecrets(ing) } } } // Sort ingress rules using the ResourceVersion field - ings := n.listers.Ingress.List() - sort.SliceStable(ings, func(i, j int) bool { - ir := ings[i].(*extensions.Ingress).ResourceVersion - jr := ings[j].(*extensions.Ingress).ResourceVersion + ingresses := n.store.ListIngresses() + sort.SliceStable(ingresses, func(i, j int) bool { + ir := ingresses[i].ResourceVersion + jr := ingresses[j].ResourceVersion return ir < jr }) - // filter ingress rules - var ingresses []*extensions.Ingress - for _, ingIf := range ings { - ing := ingIf.(*extensions.Ingress) - if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) { - continue - } - - ingresses = append(ingresses, ing) - } - upstreams, servers := n.getBackendServers(ingresses) var passUpstreams []*ingress.SSLPassthroughBackend @@ -248,7 +214,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr return []ingress.L4Service{} } - configmap, err := n.listers.ConfigMap.GetByName(configmapName) + configmap, err := n.store.GetConfigMap(configmapName) if err != nil { glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) return []ingress.L4Service{} @@ -306,19 +272,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr continue } - svcObj, svcExists, err := n.listers.Service.GetByKey(nsName) + svc, err := n.store.GetService(nsName) if err != nil { glog.Warningf("error getting service %v: %v", nsName, err) continue } - if !svcExists { - glog.Warningf("service %v was not found", nsName) - continue - } - - svc := svcObj.(*apiv1.Service) - var endps []ingress.Endpoint targetPort, err := strconv.Atoi(svcPort) if err != nil { @@ -375,20 +334,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { Name: defUpstreamName, } svcKey := n.cfg.DefaultService - svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) + svc, err := n.store.GetService(svcKey) if err != nil { glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err) upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) return upstream } - if !svcExists { - glog.Warningf("service %v does not exist", svcKey) - upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) - return upstream - } - - svc := svcObj.(*apiv1.Service) endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{}) if len(endps) == 0 { glog.Warningf("service %v does not have any active endpoints", svcKey) @@ -408,7 +360,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] servers := n.createServers(ingresses, upstreams, du) for _, ing := range ingresses { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Warningf("%v", err) + } for _, rule := range ing.Spec.Rules { host := rule.Host @@ -622,20 +577,19 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { - if _, exists := n.sslCertTracker.Get(name); !exists { + if _, err := n.store.GetLocalSecret(name); err != nil { n.syncSecret(name) } - _, err := n.listers.Secret.GetByName(name) + _, err := n.store.GetLocalSecret(name) if err != nil { return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) } - bc, exists := n.sslCertTracker.Get(name) - if !exists { - return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name) + cert, err := n.store.GetLocalSecret(name) + if err != nil { + return &resolver.AuthSSLCert{}, err } - cert := bc.(*ingress.SSLCert) return &resolver.AuthSSLCert{ Secret: name, CAFileName: cert.CAFileName, @@ -650,7 +604,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres upstreams[defUpstreamName] = du for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Warningf("%v", err) + } var defBackend string if ing.Spec.Backend != nil { @@ -737,7 +694,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres upstreams[name].Endpoints = endp } - s, err := n.listers.Service.GetByName(svcKey) + s, err := n.store.GetService(svcKey) if err != nil { glog.Warningf("error obtaining service: %v", err) continue @@ -752,13 +709,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres } func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { - svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) - - if !svcExists { - return endpoint, fmt.Errorf("service %v does not exist", svcKey) + svc, err := n.store.GetService(svcKey) + if err != nil { + return endpoint, err } - svc := svcObj.(*apiv1.Service) if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey) } @@ -790,7 +745,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte // to a service. func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, hz *healthcheck.Config) ([]ingress.Endpoint, error) { - svc, err := n.listers.Service.GetByName(svcKey) + svc, err := n.store.GetService(svcKey) var upstreams []ingress.Endpoint if err != nil { @@ -915,7 +870,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // initialize all the servers for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Warningf("%v", err) + } // default upstream server un := du.Name @@ -966,7 +924,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // configure default location, alias, and SSL for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Warningf("%v", err) + } for _, rule := range ing.Spec.Rules { host := rule.Host @@ -1031,13 +992,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, } key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName) - bc, exists := n.sslCertTracker.Get(key) - if !exists { - glog.Warningf("ssl certificate \"%v\" does not exist in local store", key) + cert, err := n.store.GetLocalSecret(key) + if err != nil { + glog.Warningf("%v", err) continue } - cert := bc.(*ingress.SSLCert) err = cert.Certificate.VerifyHostname(host) if err != nil { glog.Warningf("ssl certificate %v does not contain a Common Name or Subject Alternative Name for host %v", key, host) @@ -1107,7 +1067,7 @@ func (n *NGINXController) getEndpoints( } glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) - ep, err := n.listers.Endpoint.GetServiceEndpoints(s) + ep, err := n.store.GetServiceEndpoints(s) if err != nil { glog.Warningf("unexpected error obtaining service endpoints: %v", err) return upsServers @@ -1187,24 +1147,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) { atomic.StoreInt32(&n.forceReload, 0) } } - -func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) { - anns := n.annotations.Extract(ing) - glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name) - n.listers.IngressAnnotation.Update(anns) -} - -// getByIngress returns the parsed annotations from an Ingress -func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress { - key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) - item, exists, err := n.listers.IngressAnnotation.GetByKey(key) - if err != nil { - glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err) - return &annotations.Ingress{} - } - if !exists { - glog.Errorf("ingress annotation %v was not found", key) - return &annotations.Ingress{} - } - return item.(*annotations.Ingress) -} diff --git a/internal/ingress/controller/listers.go b/internal/ingress/controller/listers.go deleted file mode 100644 index adfa71982..000000000 --- a/internal/ingress/controller/listers.go +++ /dev/null @@ -1,228 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "fmt" - "reflect" - - "github.com/golang/glog" - - apiv1 "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - cache_client "k8s.io/client-go/tools/cache" - - "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" - "k8s.io/ingress-nginx/internal/ingress/annotations/parser" -) - -type cacheController struct { - Ingress cache.Controller - Endpoint cache.Controller - Service cache.Controller - Secret cache.Controller - Configmap cache.Controller -} - -func (c *cacheController) Run(stopCh chan struct{}) { - go c.Ingress.Run(stopCh) - go c.Endpoint.Run(stopCh) - go c.Service.Run(stopCh) - go c.Secret.Run(stopCh) - go c.Configmap.Run(stopCh) - - // Wait for all involved caches to be synced, before processing items from the queue is started - if !cache.WaitForCacheSync(stopCh, - c.Ingress.HasSynced, - c.Endpoint.HasSynced, - c.Service.HasSynced, - c.Secret.HasSynced, - c.Configmap.HasSynced, - ) { - runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) - } -} - -func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) { - ingEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - addIng := obj.(*extensions.Ingress) - if !class.IsValid(addIng, n.cfg.IngressClass, defIngressClass) { - a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, n) - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) - return - } - - n.extractAnnotations(addIng) - n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - n.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - delIng, ok := obj.(*extensions.Ingress) - if !ok { - // If we reached here it means the ingress was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - delIng, ok = tombstone.Obj.(*extensions.Ingress) - if !ok { - glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) - return - } - } - if !class.IsValid(delIng, n.cfg.IngressClass, defIngressClass) { - glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) - return - } - n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) - n.listers.IngressAnnotation.Delete(delIng) - n.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oldIng := old.(*extensions.Ingress) - curIng := cur.(*extensions.Ingress) - validOld := class.IsValid(oldIng, n.cfg.IngressClass, defIngressClass) - validCur := class.IsValid(curIng, n.cfg.IngressClass, defIngressClass) - if !validOld && validCur { - glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validOld && !validCur { - glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validCur && !reflect.DeepEqual(old, cur) { - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } - - n.extractAnnotations(curIng) - n.syncQueue.Enqueue(cur) - }, - } - - secrEventHandler := cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - sec := cur.(*apiv1.Secret) - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - _, exists := n.sslCertTracker.Get(key) - if exists { - n.syncSecret(key) - } - } - }, - DeleteFunc: func(obj interface{}) { - sec, ok := obj.(*apiv1.Secret) - if !ok { - // If we reached here it means the secret was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - sec, ok = tombstone.Obj.(*apiv1.Secret) - if !ok { - glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) - return - } - } - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - n.sslCertTracker.Delete(key) - n.syncQueue.Enqueue(key) - }, - } - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - n.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - n.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oep := old.(*apiv1.Endpoints) - ocur := cur.(*apiv1.Endpoints) - if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { - n.syncQueue.Enqueue(cur) - } - }, - } - - mapEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - upCmap := obj.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == n.cfg.ConfigMapName { - glog.V(2).Infof("adding configmap %v to backend", mapKey) - n.SetConfig(upCmap) - n.SetForceReload(true) - } - }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - upCmap := cur.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == n.cfg.ConfigMapName { - glog.V(2).Infof("updating configmap backend (%v)", mapKey) - n.SetConfig(upCmap) - n.SetForceReload(true) - } - // updates to configuration configmaps can trigger an update - if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName { - n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - n.syncQueue.Enqueue(cur) - } - } - }, - } - - watchNs := apiv1.NamespaceAll - if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll { - watchNs = n.cfg.Namespace - } - - lister := &ingress.StoreLister{} - lister.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) - - controller := &cacheController{} - - lister.Ingress.Store, controller.Ingress = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()), - &extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler) - - lister.Endpoint.Store, controller.Endpoint = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()), - &apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler) - - lister.Secret.Store, controller.Secret = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), - &apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler) - - lister.ConfigMap.Store, controller.Configmap = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), - &apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler) - - lister.Service.Store, controller.Service = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()), - &apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) - - return lister, controller -} diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index b74c05cb0..617298ef2 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -69,10 +69,9 @@ const ( ) var ( - tmplPath = "/etc/nginx/template/nginx.tmpl" - cfgPath = "/etc/nginx/nginx.conf" - nginxBinary = "/usr/sbin/nginx" - defIngressClass = "nginx" + tmplPath = "/etc/nginx/template/nginx.tmpl" + cfgPath = "/etc/nginx/nginx.conf" + nginxBinary = "/usr/sbin/nginx" ) // NewNGINXController creates a new NGINX Ingress controller. @@ -103,9 +102,9 @@ func NewNGINXController(config *Configuration) *NGINXController { isIPV6Enabled: ing_net.IsIPv6Enabled(), - resolver: h, - cfg: config, - sslCertTracker: store.NewSSLCertTracker(), + resolver: h, + cfg: config, + syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ @@ -113,19 +112,26 @@ func NewNGINXController(config *Configuration) *NGINXController { }), stopCh: make(chan struct{}), + updateCh: make(chan interface{}), + stopLock: &sync.Mutex{}, fileSystem: filesystem.DefaultFs{}, } - n.listers, n.controllers = n.createListers(n.stopCh) + watchNs := apiv1.NamespaceAll + if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll { + watchNs = n.cfg.Namespace + } + + ae := annotations.NewAnnotationExtractor(n) + + n.store = store.New(watchNs, n.cfg.ResyncPeriod, n.recorder, n.cfg.Client, ae, n.updateCh) n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status) n.syncQueue = task.NewTaskQueue(n.syncIngress) - n.annotations = annotations.NewAnnotationExtractor(n) - if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(status.Config{ Client: config.Client, @@ -174,21 +180,12 @@ Error loading new template : %v type NGINXController struct { cfg *Configuration - listers *ingress.StoreLister - controllers *cacheController - - annotations annotations.Extractor - recorder record.EventRecorder syncQueue *task.Queue syncStatus status.Sync - // local store of SSL certificates - // (only certificates used in ingress) - sslCertTracker *store.SSLCertTracker - syncRateLimiter flowcontrol.RateLimiter // stopLock is used to enforce only a single call to Stop is active. @@ -201,7 +198,7 @@ type NGINXController struct { // ngxErrCh channel used to detect errors with the nginx processes ngxErrCh chan error - // runningConfig contains the running configuration in the Backend + // runningConfig contains the running configuration runningConfig *ingress.Configuration forceReload int32 @@ -210,7 +207,7 @@ type NGINXController struct { configmap *apiv1.ConfigMap - storeLister *ingress.StoreLister + store store.Storer binary string resolver []net.IP @@ -224,8 +221,6 @@ type NGINXController struct { // returns true if proxy protocol es enabled IsProxyProtocolEnabled bool - isSSLPassthroughEnabled bool - isShuttingDown bool Proxy *TCPProxy @@ -239,7 +234,7 @@ type NGINXController struct { func (n *NGINXController) Start() { glog.Infof("starting Ingress controller") - n.controllers.Run(n.stopCh) + n.store.Start() // initial sync of secrets to avoid unnecessary reloads glog.Info("running initial sync of secrets") @@ -265,8 +260,6 @@ func (n *NGINXController) Start() { go n.syncStatus.Run(n.stopCh) } - go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh) - done := make(chan error, 1) cmd := exec.Command(n.binary, "-c", cfgPath) @@ -472,16 +465,40 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { } } + svcIP := svc.Spec.ClusterIP + + // in case of headless services we can use just the first address + isHeadlessService := svc.Spec.ClusterIP == "None" + if isHeadlessService { + ep, err := n.listers.Endpoint.GetServiceEndpoints(svc) + if err != nil { + glog.Warningf("unexpected error obtaining service endpoints: %v", err) + continue + } + + if len(ep.Subsets) == 0 { + glog.Warningf("invalid service headless definition (no subsets)") + continue + } + + if len(ep.Subsets[0].Addresses) == 0 { + glog.Warningf("invalid service headless definition (no addresses)") + continue + } + + svcIP = ep.Subsets[0].Addresses[0].IP + } + //TODO: Allow PassthroughBackends to specify they support proxy-protocol servers = append(servers, &TCPServer{ Hostname: pb.Hostname, - IP: svc.Spec.ClusterIP, + IP: svcIP, Port: port, ProxyProtocol: false, }) } - if n.isSSLPassthroughEnabled { + if n.cfg.EnableSSLPassthrough { n.Proxy.ServerList = servers } @@ -627,7 +644,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { Cfg: cfg, IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6, RedirectServers: redirectServers, - IsSSLPassthroughEnabled: n.isSSLPassthroughEnabled, + IsSSLPassthroughEnabled: n.cfg.EnableSSLPassthrough, ListenPorts: n.cfg.ListenPorts, PublishService: n.GetPublishService(), } diff --git a/internal/ingress/controller/process/nginx.go b/internal/ingress/controller/process/nginx.go index 4a925a3e1..cf19e6b0a 100644 --- a/internal/ingress/controller/process/nginx.go +++ b/internal/ingress/controller/process/nginx.go @@ -29,6 +29,10 @@ import ( "github.com/ncabatoff/process-exporter/proc" ) +const ( + processName = "nginx" +) + // IsRespawnIfRequired checks if error type is exec.ExitError or not func IsRespawnIfRequired(err error) bool { exitError, ok := err.(*exec.ExitError) @@ -68,7 +72,7 @@ func WaitUntilPortIsAvailable(port int) { continue } - if pn == "nginx" { + if pn == processName { osp, err := os.FindProcess(p.PID) if err != nil { glog.Errorf("unexpected error obtaining process information: %v", err) @@ -85,7 +89,7 @@ func WaitUntilPortIsAvailable(port int) { func IsNginxRunning() bool { processes, _ := ps.Processes() for _, p := range processes { - if p.Executable() == "nginx" { + if p.Executable() == processName { return true } } diff --git a/internal/ingress/controller/tcp.go b/internal/ingress/controller/tcp.go index efcc83077..26f3d4979 100644 --- a/internal/ingress/controller/tcp.go +++ b/internal/ingress/controller/tcp.go @@ -40,6 +40,7 @@ type TCPProxy struct { func (p *TCPProxy) Get(host string) *TCPServer { if p.ServerList == nil { + glog.Warning("there is no servers configured with SSL passthrough. Returning default backend") return p.Default } @@ -94,6 +95,7 @@ func (p *TCPProxy) Handle(conn net.Conn) { glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader) _, err = fmt.Fprintf(clientConn, proxyProtocolHeader) } + if err != nil { glog.Errorf("unexpected error writing proxy-protocol header: %s", err) clientConn.Close() diff --git a/internal/ingress/status/status.go b/internal/ingress/status/status.go index effdb9f7f..a0350a67c 100644 --- a/internal/ingress/status/status.go +++ b/internal/ingress/status/status.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" + "k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/store" "k8s.io/ingress-nginx/internal/k8s" @@ -69,9 +70,6 @@ type Config struct { UseNodeInternalIP bool IngressLister store.IngressLister - - DefaultIngressClass string - IngressClass string } // statusSync keeps the status IP in each Ingress rule updated executing a periodic check @@ -180,9 +178,9 @@ func NewStatusSyncer(config Config) Sync { // we need to use the defined ingress class to allow multiple leaders // in order to update information about ingress status - electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass) - if config.IngressClass != "" { - electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass) + electionID := fmt.Sprintf("%v-%v", config.ElectionID, ingress.DefaultIngressClass) + if ingress.IngressClass != "" { + electionID = fmt.Sprintf("%v-%v", config.ElectionID, ingress.IngressClass) } callbacks := leaderelection.LeaderCallbacks{ @@ -314,7 +312,7 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { for _, cur := range ings { ing := cur.(*extensions.Ingress) - if !class.IsValid(ing, s.Config.IngressClass, s.Config.DefaultIngressClass) { + if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) { continue } diff --git a/internal/ingress/status/status_test.go b/internal/ingress/status/status_test.go index 97c0b5033..efa855aa1 100644 --- a/internal/ingress/status/status_test.go +++ b/internal/ingress/status/status_test.go @@ -261,8 +261,6 @@ func TestStatusActions(t *testing.T) { Client: buildSimpleClientSet(), PublishService: "", IngressLister: buildIngressListener(), - DefaultIngressClass: "nginx", - IngressClass: "", UpdateStatusOnShutdown: true, } // create object diff --git a/internal/ingress/controller/backend_ssl.go b/internal/ingress/store/backend_ssl.go similarity index 65% rename from internal/ingress/controller/backend_ssl.go rename to internal/ingress/store/backend_ssl.go index a4f3f138f..10c91cfb8 100644 --- a/internal/ingress/controller/backend_ssl.go +++ b/internal/ingress/store/backend_ssl.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package store import ( "fmt" @@ -25,53 +25,51 @@ import ( "github.com/imdario/mergo" apiv1 "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" - "k8s.io/ingress-nginx/internal/ingress/annotations/parser" "k8s.io/ingress-nginx/internal/net/ssl" ) // syncSecret keeps in sync Secrets used by Ingress rules with the files on // disk to allow copy of the content of the secret to disk to be used // by external processes. -func (ic *NGINXController) syncSecret(key string) { +func (s k8sStore) syncSecret(key string) { glog.V(3).Infof("starting syncing of secret %v", key) - cert, err := ic.getPemCertificate(key) + // cert + _, err := s.getPemCertificate(key) if err != nil { glog.Warningf("error obtaining PEM from secret %v: %v", key, err) return } - - // create certificates and add or update the item in the store - cur, exists := ic.sslCertTracker.Get(key) - if exists { - s := cur.(*ingress.SSLCert) - if s.Equal(cert) { - // no need to update + /* + // create certificates and add or update the item in the store + cur, exists := s.GetSecret(key) + if exists { + s := cur.(*ingress.SSLCert) + if s.Equal(cert) { + // no need to update + return + } + glog.Infof("updating secret %v in the local store", key) + ic.store.UpdateLocalSecret(key, cert) + // this update must trigger an update + // (like an update event from a change in Ingress) + ic.syncQueue.Enqueue(&extensions.Ingress{}) return } - glog.Infof("updating secret %v in the local store", key) - ic.sslCertTracker.Update(key, cert) + + glog.Infof("adding secret %v to the local store", key) + ic.store.AddLocalSecret(key, cert) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) - return - } - - glog.Infof("adding secret %v to the local store", key) - ic.sslCertTracker.Add(key, cert) - // this update must trigger an update - // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + ic.syncQueue.Enqueue(&extensions.Ingress{})*/ } // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. // It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only. -func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { - secret, err := ic.listers.Secret.GetByName(secretName) +func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) { + secret, err := s.listers.Secret.GetByNamespaceName(secretName) if err != nil { return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err) } @@ -83,7 +81,7 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer // namespace/secretName -> namespace-secretName nsSecName := strings.Replace(secretName, "/", "-", -1) - var s *ingress.SSLCert + var sslCert *ingress.SSLCert if okcert && okkey { if cert == nil { return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName) @@ -94,18 +92,18 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer // If 'ca.crt' is also present, it will allow this secret to be used in the // 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation - s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca) + sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca) if err != nil { return nil, fmt.Errorf("unexpected error creating pem file: %v", err) } - - glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN) - if ca != nil { - glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName) - } - + /* + glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN) + if ca != nil { + glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName) + } + */ } else if ca != nil { - s, err = ssl.AddCertAuth(nsSecName, ca) + sslCert, err = ssl.AddCertAuth(nsSecName, ca) if err != nil { return nil, fmt.Errorf("unexpected error creating pem file: %v", err) @@ -119,15 +117,20 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName) } - s.Name = secret.Name - s.Namespace = secret.Namespace - return s, nil + sslCert.Name = secret.Name + sslCert.Namespace = secret.Namespace + + return sslCert, nil } -func (ic *NGINXController) checkSSLChainIssues() { - for _, secretName := range ic.sslCertTracker.ListKeys() { - s, _ := ic.sslCertTracker.Get(secretName) - secret := s.(*ingress.SSLCert) +func (s k8sStore) checkSSLChainIssues() { + for _, item := range s.ListLocalSecrets() { + secretName := fmt.Sprintf("%v/%v", item.Namespace, item.Name) + + secret, err := s.GetLocalSecret(secretName) + if err != nil { + continue + } if secret.FullChainPemFileName != "" { // chain already checked @@ -158,42 +161,37 @@ func (ic *NGINXController) checkSSLChainIssues() { dst.FullChainPemFileName = fullChainPemFileName glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName) - ic.sslCertTracker.Update(secretName, dst) + s.sslStore.Update(secretName, dst) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + //ic.syncQueue.Enqueue(&extensions.Ingress{}) } } // checkMissingSecrets verify if one or more ingress rules contains a reference // to a secret that is not present in the local secret store. // In this case we call syncSecret. -func (ic *NGINXController) checkMissingSecrets() { - for _, obj := range ic.listers.Ingress.List() { - ing := obj.(*extensions.Ingress) - - if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { - continue - } - +func (s k8sStore) checkMissingSecrets() { + for _, ing := range s.ListIngresses() { for _, tls := range ing.Spec.TLS { if tls.SecretName == "" { continue } key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - if _, ok := ic.sslCertTracker.Get(key); !ok { - ic.syncSecret(key) + if _, ok := s.sslStore.Get(key); !ok { + s.syncSecret(key) } } - key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, ic) - if key == "" { - continue - } + /* + key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, resolver) + if key == "" { + continue + } - if _, ok := ic.sslCertTracker.Get(key); !ok { - ic.syncSecret(key) - } + if _, ok := ic.sslCertTracker.Get(key); !ok { + ic.syncSecret(key) + }*/ } } diff --git a/internal/ingress/controller/backend_ssl_test.go b/internal/ingress/store/backend_ssl_test.go similarity index 96% rename from internal/ingress/controller/backend_ssl_test.go rename to internal/ingress/store/backend_ssl_test.go index 16892da62..31304bd56 100644 --- a/internal/ingress/controller/backend_ssl_test.go +++ b/internal/ingress/store/backend_ssl_test.go @@ -14,24 +14,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package store import ( "encoding/base64" "fmt" "io/ioutil" - "testing" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" cache_client "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/flowcontrol" + "k8s.io/kubernetes/pkg/api" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/store" - "k8s.io/ingress-nginx/internal/task" - "k8s.io/kubernetes/pkg/api" ) const ( @@ -66,8 +62,8 @@ func buildSimpleClientSetForBackendSSL() *testclient.Clientset { return testclient.NewSimpleClientset() } -func buildIngListenerForBackendSSL() store.IngressLister { - ingLister := store.IngressLister{} +func buildIngListenerForBackendSSL() IngressLister { + ingLister := IngressLister{} ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) return ingLister } @@ -81,20 +77,21 @@ func buildSecretForBackendSSL() *apiv1.Secret { } } -func buildSecrListerForBackendSSL() store.SecretLister { - secrLister := store.SecretLister{} +func buildSecrListerForBackendSSL() SecretLister { + secrLister := SecretLister{} secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) return secrLister } +/* func buildListers() *ingress.StoreLister { sl := &ingress.StoreLister{} sl.Ingress.Store = buildIngListenerForBackendSSL() sl.Secret.Store = buildSecrListerForBackendSSL() return sl } - +*/ func buildControllerForBackendSSL() cache_client.Controller { cfg := &cache_client.Config{ Queue: &MockQueue{Synced: true}, @@ -103,6 +100,7 @@ func buildControllerForBackendSSL() cache_client.Controller { return cache_client.New(cfg) } +/* func buildGenericControllerForBackendSSL() *NGINXController { gc := &NGINXController{ syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), @@ -110,13 +108,13 @@ func buildGenericControllerForBackendSSL() *NGINXController { Client: buildSimpleClientSetForBackendSSL(), }, listers: buildListers(), - sslCertTracker: store.NewSSLCertTracker(), + sslCertTracker: NewSSLCertTracker(), } gc.syncQueue = task.NewTaskQueue(gc.syncIngress) return gc } - +*/ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { // prepare td, err := ioutil.TempDir("", "ssl") @@ -140,6 +138,7 @@ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { return dCrt, dKey, dCa, nil } +/* func TestSyncSecret(t *testing.T) { // prepare for test dCrt, dKey, dCa, err := buildCrtKeyAndCA() @@ -232,3 +231,4 @@ func TestGetPemCertificate(t *testing.T) { }) } } +*/ diff --git a/internal/ingress/store/configmap.go b/internal/ingress/store/configmap.go new file mode 100644 index 000000000..2b545f720 --- /dev/null +++ b/internal/ingress/store/configmap.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// ConfigMapLister makes a Store that lists Configmaps. +type ConfigMapLister struct { + cache.Store +} + +// GetByNamespaceName searches for a configmap in the local configmaps Store +func (cml *ConfigMapLister) GetByNamespaceName(key string) (*apiv1.ConfigMap, error) { + s, exists, err := cml.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("configmap %v was not found", key) + } + return s.(*apiv1.ConfigMap), nil +} diff --git a/internal/ingress/store/endpoint.go b/internal/ingress/store/endpoint.go new file mode 100644 index 000000000..c464e98b5 --- /dev/null +++ b/internal/ingress/store/endpoint.go @@ -0,0 +1,40 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// EndpointLister makes a Store that lists Endpoints. +type EndpointLister struct { + cache.Store +} + +// GetServiceEndpoints returns the endpoints of a service, matched on service name. +func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { + for _, m := range s.Store.List() { + ep := m.(*apiv1.Endpoints) + if svc.Name == ep.Name && svc.Namespace == ep.Namespace { + return ep, nil + } + } + return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name) +} diff --git a/internal/ingress/store/ingress.go b/internal/ingress/store/ingress.go new file mode 100644 index 000000000..137b0f564 --- /dev/null +++ b/internal/ingress/store/ingress.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/client-go/tools/cache" +) + +// IngressLister makes a Store that lists Ingress. +type IngressLister struct { + cache.Store +} + +// GetByNamespaceName searches for an ingress in the local ingress Store +func (il IngressLister) GetByNamespaceName(key string) (*extensions.Ingress, error) { + i, exists, err := il.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("ingress %v was not found", key) + } + return i.(*extensions.Ingress), nil +} diff --git a/internal/ingress/store/ingress_annotation.go b/internal/ingress/store/ingress_annotation.go new file mode 100644 index 000000000..676875aca --- /dev/null +++ b/internal/ingress/store/ingress_annotation.go @@ -0,0 +1,26 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "k8s.io/client-go/tools/cache" +) + +// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules. +type IngressAnnotationsLister struct { + cache.Store +} diff --git a/internal/ingress/store/local_secrets.go b/internal/ingress/store/local_secrets.go new file mode 100644 index 000000000..566b9d9e8 --- /dev/null +++ b/internal/ingress/store/local_secrets.go @@ -0,0 +1,30 @@ +package store + +import ( + "fmt" + + "k8s.io/client-go/tools/cache" + + "k8s.io/ingress-nginx/internal/ingress" +) + +// SSLCertTracker holds a store of referenced Secrets in Ingress rules +type SSLCertTracker struct { + cache.ThreadSafeStore +} + +// NewSSLCertTracker creates a new SSLCertTracker store +func NewSSLCertTracker() *SSLCertTracker { + return &SSLCertTracker{ + cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), + } +} + +// GetByNamespaceName searches for an ingress in the local ingress Store +func (s SSLCertTracker) GetByNamespaceName(key string) (*ingress.SSLCert, error) { + cert, exists := s.Get(key) + if !exists { + return nil, fmt.Errorf("local SSL certificate %v was not found", key) + } + return cert.(*ingress.SSLCert), nil +} diff --git a/internal/ingress/store/main.go b/internal/ingress/store/main.go deleted file mode 100644 index 299f54c0b..000000000 --- a/internal/ingress/store/main.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package store - -import ( - "fmt" - - apiv1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/cache" -) - -// IngressLister makes a Store that lists Ingress. -type IngressLister struct { - cache.Store -} - -// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules. -type IngressAnnotationsLister struct { - cache.Store -} - -// SecretLister makes a Store that lists Secrets. -type SecretLister struct { - cache.Store -} - -// GetByName searches for a secret in the local secrets Store -func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) { - s, exists, err := sl.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("secret %v was not found", name) - } - return s.(*apiv1.Secret), nil -} - -// ConfigMapLister makes a Store that lists Configmaps. -type ConfigMapLister struct { - cache.Store -} - -// GetByName searches for a configmap in the local configmaps Store -func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) { - s, exists, err := cml.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("configmap %v was not found", name) - } - return s.(*apiv1.ConfigMap), nil -} - -// ServiceLister makes a Store that lists Services. -type ServiceLister struct { - cache.Store -} - -// GetByName searches for a service in the local secrets Store -func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) { - s, exists, err := sl.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("service %v was not found", name) - } - return s.(*apiv1.Service), nil -} - -// EndpointLister makes a Store that lists Endpoints. -type EndpointLister struct { - cache.Store -} - -// GetServiceEndpoints returns the endpoints of a service, matched on service name. -func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { - for _, m := range s.Store.List() { - ep := m.(*apiv1.Endpoints) - if svc.Name == ep.Name && svc.Namespace == ep.Namespace { - return ep, nil - } - } - return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name) -} - -// SSLCertTracker holds a store of referenced Secrets in Ingress rules -type SSLCertTracker struct { - cache.ThreadSafeStore -} - -// NewSSLCertTracker creates a new SSLCertTracker store -func NewSSLCertTracker() *SSLCertTracker { - return &SSLCertTracker{ - cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - } -} diff --git a/internal/ingress/store/secret.go b/internal/ingress/store/secret.go new file mode 100644 index 000000000..c89ac5230 --- /dev/null +++ b/internal/ingress/store/secret.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// SecretLister makes a Store that lists Secrets. +type SecretLister struct { + cache.Store +} + +// GetByNamespaceName searches for a secret in the local secrets Store +func (sl *SecretLister) GetByNamespaceName(key string) (*apiv1.Secret, error) { + s, exists, err := sl.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("secret %v was not found", key) + } + return s.(*apiv1.Secret), nil +} diff --git a/internal/ingress/store/service.go b/internal/ingress/store/service.go new file mode 100644 index 000000000..97afc4902 --- /dev/null +++ b/internal/ingress/store/service.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// ServiceLister makes a Store that lists Services. +type ServiceLister struct { + cache.Store +} + +// GetByNamespaceName searches for a service in the local secrets Store +func (sl *ServiceLister) GetByNamespaceName(key string) (*apiv1.Service, error) { + s, exists, err := sl.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("service %v was not found", key) + } + return s.(*apiv1.Service), nil +} diff --git a/internal/ingress/store/store.go b/internal/ingress/store/store.go new file mode 100644 index 000000000..9555c8264 --- /dev/null +++ b/internal/ingress/store/store.go @@ -0,0 +1,412 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + "reflect" + "time" + + "github.com/golang/glog" + + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + cache_client "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/ingress/annotations" + "k8s.io/ingress-nginx/internal/ingress/annotations/class" + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + "k8s.io/ingress-nginx/internal/ingress/resolver" +) + +// Storer is the interface that wraps the required methods to gather information +// about ingresses, services, secrets and ingress annotations. +type Storer interface { + GetConfigMap(key string) (*apiv1.ConfigMap, error) + + // GetSecret returns a Secret using the namespace and name as key + GetSecret(key string) (*apiv1.Secret, error) + + // GetService returns a Service using the namespace and name as key + GetService(key string) (*apiv1.Service, error) + + GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) + + // GetSecret returns an Ingress using the namespace and name as key + GetIngress(key string) (*extensions.Ingress, error) + + // ListIngresses returns the list of Ingresses + ListIngresses() []*extensions.Ingress + + // GetIngressAnnotations returns the annotations associated to an Ingress + GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) + + // GetLocalSecret returns the local copy of a Secret + GetLocalSecret(name string) (*ingress.SSLCert, error) + + // ListLocalSecrets returns the list of local Secrets + ListLocalSecrets() []*ingress.SSLCert + + // StartSync initiates the synchronization of the controllers + StartSync(stopCh chan struct{}) +} + +// lister returns the stores for ingresses, services, endpoints, secrets and configmaps. +type lister struct { + Ingress IngressLister + Service ServiceLister + Endpoint EndpointLister + Secret SecretLister + ConfigMap ConfigMapLister + IngressAnnotation IngressAnnotationsLister +} + +// controller defines the required controllers that interact agains the api server +type controller struct { + Ingress cache.Controller + Endpoint cache.Controller + Service cache.Controller + Secret cache.Controller + Configmap cache.Controller +} + +// Run initiates the synchronization of the controllers against the api server +func (c *controller) Run(stopCh chan struct{}) { + go c.Ingress.Run(stopCh) + go c.Endpoint.Run(stopCh) + go c.Service.Run(stopCh) + go c.Secret.Run(stopCh) + go c.Configmap.Run(stopCh) + + // wait for all involved caches to be synced, before processing items + // from the queue is started + if !cache.WaitForCacheSync(stopCh, + c.Ingress.HasSynced, + c.Endpoint.HasSynced, + c.Service.HasSynced, + c.Secret.HasSynced, + c.Configmap.HasSynced, + ) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + } +} + +type k8sStore struct { + cache *controller + // listers + listers *lister + + // sslStore local store of SSL certificates (certificates used in ingress) + sslStore *SSLCertTracker + + // annotations parser + annotations annotations.Extractor +} + +// New creates a new object store to be used in the ingress controller +func New(namespace, configmap, tcp, udp string, + resyncPeriod time.Duration, + recorder record.EventRecorder, + client clientset.Interface, + annotations annotations.Extractor, + r resolver.Resolver, + updateCh chan ingress.Event) Storer { + + store := &k8sStore{ + cache: &controller{}, + listers: &lister{}, + sslStore: NewSSLCertTracker(), + annotations: annotations, + } + + ingEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*extensions.Ingress) + if !class.IsValid(addIng, ingress.IngressClass, ingress.DefaultIngressClass) { + a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, r) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) + return + } + + store.extractAnnotations(addIng) + recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) + updateCh <- ingress.Event{ + Type: ingress.CreateEvent, + Obj: obj, + } + }, + DeleteFunc: func(obj interface{}) { + delIng, ok := obj.(*extensions.Ingress) + if !ok { + // If we reached here it means the ingress was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + delIng, ok = tombstone.Obj.(*extensions.Ingress) + if !ok { + glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) + return + } + } + if !class.IsValid(delIng, ingress.IngressClass, ingress.DefaultIngressClass) { + glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) + return + } + recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) + store.listers.IngressAnnotation.Delete(delIng) + updateCh <- ingress.Event{ + Type: ingress.DeleteEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oldIng := old.(*extensions.Ingress) + curIng := cur.(*extensions.Ingress) + validOld := class.IsValid(oldIng, ingress.IngressClass, ingress.DefaultIngressClass) + validCur := class.IsValid(curIng, ingress.IngressClass, ingress.DefaultIngressClass) + if !validOld && validCur { + glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) + recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validOld && !validCur { + glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) + recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validCur && !reflect.DeepEqual(old, cur) { + recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } + + store.extractAnnotations(curIng) + updateCh <- ingress.Event{ + Type: ingress.UpdateEvent, + Obj: cur, + } + }, + } + + secrEventHandler := cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + sec := cur.(*apiv1.Secret) + key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) + _, exists := store.sslStore.Get(key) + if exists { + updateCh <- ingress.Event{ + Type: ingress.UpdateEvent, + Obj: cur, + } + } + } + }, + DeleteFunc: func(obj interface{}) { + sec, ok := obj.(*apiv1.Secret) + if !ok { + // If we reached here it means the secret was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + sec, ok = tombstone.Obj.(*apiv1.Secret) + if !ok { + glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) + return + } + } + key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) + store.sslStore.Delete(key) + updateCh <- ingress.Event{ + Type: ingress.DeleteEvent, + Obj: obj, + } + }, + } + + eventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + updateCh <- ingress.Event{ + Type: ingress.CreateEvent, + Obj: obj, + } + }, + DeleteFunc: func(obj interface{}) { + updateCh <- ingress.Event{ + Type: ingress.DeleteEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oep := old.(*apiv1.Endpoints) + ocur := cur.(*apiv1.Endpoints) + if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { + updateCh <- ingress.Event{ + Type: ingress.UpdateEvent, + Obj: cur, + } + } + }, + } + + mapEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + upCmap := obj.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) + if mapKey == configmap { + glog.V(2).Infof("adding configmap %v to backend", mapKey) + updateCh <- ingress.Event{ + Type: ingress.CreateEvent, + Obj: obj, + } + } + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + upCmap := cur.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) + if mapKey == configmap { + glog.V(2).Infof("updating configmap backend (%v)", mapKey) + updateCh <- ingress.Event{ + Type: ingress.UpdateEvent, + Obj: cur, + } + } + // updates to configuration configmaps can trigger an update + if mapKey == configmap || mapKey == tcp || mapKey == udp { + recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) + updateCh <- ingress.Event{ + Type: ingress.UpdateEvent, + Obj: cur, + } + } + } + }, + } + + store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) + + store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer( + cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()), + &extensions.Ingress{}, resyncPeriod, ingEventHandler) + + store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()), + &apiv1.Endpoints{}, resyncPeriod, eventHandler) + + store.listers.Secret.Store, store.cache.Secret = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()), + &apiv1.Secret{}, resyncPeriod, secrEventHandler) + + store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()), + &apiv1.ConfigMap{}, resyncPeriod, mapEventHandler) + + store.listers.Service.Store, store.cache.Service = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()), + &apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{}) + + return store +} + +func (s k8sStore) extractAnnotations(ing *extensions.Ingress) { + anns := s.annotations.Extract(ing) + glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name) + s.listers.IngressAnnotation.Update(anns) +} + +// GetSecret returns a Secret using the namespace and name as key +func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) { + return s.listers.Secret.GetByNamespaceName(key) +} + +// ListLocalSecrets returns the list of local Secrets +func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert { + var certs []*ingress.SSLCert + for _, item := range s.sslStore.List() { + if s, ok := item.(*ingress.SSLCert); ok { + certs = append(certs, s) + } + } + + return certs +} + +// GetService returns a Service using the namespace and name as key +func (s k8sStore) GetService(key string) (*apiv1.Service, error) { + return s.listers.Service.GetByNamespaceName(key) +} + +// GetSecret returns an Ingress using the namespace and name as key +func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) { + return s.listers.Ingress.GetByNamespaceName(key) +} + +// ListIngresses returns the list of Ingresses +func (s k8sStore) ListIngresses() []*extensions.Ingress { + // filter ingress rules + var ingresses []*extensions.Ingress + for _, item := range s.listers.Ingress.List() { + ing := item.(*extensions.Ingress) + if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) { + continue + } + + ingresses = append(ingresses, ing) + } + + return ingresses +} + +// GetIngressAnnotations returns the annotations associated to an Ingress +func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) { + key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) + item, exists, err := s.listers.IngressAnnotation.GetByKey(key) + if err != nil { + return nil, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err) + } + if !exists { + return nil, fmt.Errorf("ingress annotation %v was not found", key) + } + return item.(*annotations.Ingress), nil +} + +// GetLocalSecret returns the local copy of a Secret +func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) { + return s.sslStore.GetByNamespaceName(key) +} + +func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) { + return s.listers.ConfigMap.GetByNamespaceName(key) +} + +func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { + return s.listers.Endpoint.GetServiceEndpoints(svc) +} + +// StartSync initiates the synchronization of the controllers +func (s k8sStore) StartSync(stopCh chan struct{}) { + // start controllers + s.cache.Run(stopCh) + // start goroutine to check for missing local secrets + go wait.Until(s.checkMissingSecrets, 30*time.Second, stopCh) +} diff --git a/internal/ingress/store/store_test.go b/internal/ingress/store/store_test.go new file mode 100644 index 000000000..af10b4d09 --- /dev/null +++ b/internal/ingress/store/store_test.go @@ -0,0 +1,314 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + "os" + "sync/atomic" + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/api/extensions/v1beta1" + extensions "k8s.io/api/extensions/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/ingress/annotations" + "k8s.io/ingress-nginx/internal/ingress/resolver" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +func TestStore(t *testing.T) { + // TODO: find a way to avoid the need to use a real api server + home := os.Getenv("HOME") + kubeConfigFile := fmt.Sprintf("%v/.kube/config", home) + kubeContext := "" + + kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext) + if err != nil { + t.Errorf("unexpected error loading kubeconfig file: %v", err) + } + + clientSet, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + + t.Run("should return an error searching for non existing objects", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + defer close(stopCh) + + updateCh := make(chan ingress.Event) + defer close(updateCh) + + go func(ch chan ingress.Event) { + for { + <-ch + } + }(updateCh) + + storer := New(ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + &record.FakeRecorder{}, + clientSet, + annotations.Extractor{}, + resolver.Mock{}, + updateCh) + + storer.StartSync(stopCh) + + key := fmt.Sprintf("%v/anything", ns.Name) + ing, err := storer.GetIngress(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if ing != nil { + t.Errorf("expected an Ingres but none returned") + } + + ls, err := storer.GetLocalSecret(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if ls != nil { + t.Errorf("expected an Ingres but none returned") + } + + s, err := storer.GetSecret(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if s != nil { + t.Errorf("expected an Ingres but none returned") + } + + svc, err := storer.GetService(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if svc != nil { + t.Errorf("expected an Ingres but none returned") + } + }) + + t.Run("should return ingress one event for add, update and delete", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + defer close(stopCh) + + updateCh := make(chan ingress.Event) + defer close(updateCh) + + var add uint64 + var upd uint64 + var del uint64 + + go func(ch chan ingress.Event) { + for { + e := <-ch + if e.Obj == nil { + continue + } + if _, ok := e.Obj.(*extensions.Ingress); !ok { + t.Errorf("expected an Ingress type but %T returned", e.Obj) + } + switch e.Type { + case ingress.CreateEvent: + atomic.AddUint64(&add, 1) + break + case ingress.UpdateEvent: + atomic.AddUint64(&upd, 1) + break + case ingress.DeleteEvent: + atomic.AddUint64(&del, 1) + break + } + } + }(updateCh) + + storer := New(ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + &record.FakeRecorder{}, + clientSet, + annotations.Extractor{}, + resolver.Mock{}, + updateCh) + + storer.StartSync(stopCh) + + ing, err := ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummy", + Namespace: ns.Name, + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "dummy", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/", + Backend: v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + // create an invalid ingress (different class) + _, err = ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "custom-class", + Namespace: ns.Name, + Annotations: map[string]string{ + "kubernetes.io/ingress.class": "something", + }, + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "dummy", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/", + Backend: v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + ni := ing.DeepCopy() + ni.Spec.Rules[0].Host = "update-dummy" + _, err = ensureIngress(ni, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + err = clientSet.ExtensionsV1beta1(). + Ingresses(ni.Namespace). + Delete(ni.Name, &metav1.DeleteOptions{}) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + waitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name) + + if atomic.LoadUint64(&add) != 1 { + t.Errorf("expected 1 event of type Create but %v ocurred", add) + } + if atomic.LoadUint64(&upd) != 1 { + t.Errorf("expected 1 event of type Update but %v ocurred", upd) + } + if atomic.LoadUint64(&del) != 1 { + t.Errorf("expected 1 event of type Delete but %v ocurred", del) + } + }) +} + +func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) *apiv1.Namespace { + t.Log("creating temporal namespace") + ns, err := framework.CreateKubeNamespace("store-test", clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + t.Logf("temporal namespace %v created", ns.Name) + + return ns +} + +func deleteNamespace(ns *apiv1.Namespace, clientSet *kubernetes.Clientset, t *testing.T) { + t.Logf("deleting temporal namespace %v created", ns.Name) + err := framework.DeleteKubeNamespace(clientSet, ns.Name) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + t.Logf("temporal namespace %v deleted", ns.Name) +} + +func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) { + s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) + if err != nil { + if k8sErrors.IsNotFound(err) { + return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress) + } + return nil, err + } + return s, nil +} + +func waitForNoIngressInNamespace(c kubernetes.Interface, namespace, name string) error { + return wait.PollImmediate(1*time.Second, time.Minute*2, noIngressInNamespace(c, namespace, name)) +} + +func noIngressInNamespace(c kubernetes.Interface, namespace, name string) wait.ConditionFunc { + return func() (bool, error) { + ing, err := c.ExtensionsV1beta1().Ingresses(namespace).Get(name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return true, nil + } + if err != nil { + return false, err + } + + if ing == nil { + return true, nil + } + return false, nil + } +} diff --git a/internal/ingress/types.go b/internal/ingress/types.go index d9da68b2f..5cc210466 100644 --- a/internal/ingress/types.go +++ b/internal/ingress/types.go @@ -33,7 +33,6 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/redirect" "k8s.io/ingress-nginx/internal/ingress/annotations/rewrite" "k8s.io/ingress-nginx/internal/ingress/resolver" - "k8s.io/ingress-nginx/internal/ingress/store" ) var ( @@ -42,17 +41,26 @@ var ( // The name of each file is -.pem. The content is the concatenated // certificate and key. DefaultSSLDirectory = "/ingress-controller/ssl" + + // DefaultIngressClass defines the default ingress class of the ingress controller + DefaultIngressClass = "nginx" + + // IngressClass contains the configured ingress class. + // By default is empty + IngressClass = "" ) -// StoreLister returns the configured stores for ingresses, services, -// endpoints, secrets and configmaps. -type StoreLister struct { - Ingress store.IngressLister - Service store.ServiceLister - Endpoint store.EndpointLister - Secret store.SecretLister - ConfigMap store.ConfigMapLister - IngressAnnotation store.IngressAnnotationsLister +type EventType string + +const ( + CreateEvent EventType = "CREATE" + UpdateEvent EventType = "UPDATE" + DeleteEvent EventType = "DELETE" +) + +type Event struct { + Type EventType + Obj interface{} } // Configuration holds the definition of all the parts required to describe all