diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index d7edf8118..a3a09e37a 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "math/rand" - "net" "net/http" "net/http/pprof" "os" @@ -29,7 +28,6 @@ import ( "syscall" "time" - proxyproto "github.com/armon/go-proxyproto" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -121,7 +119,7 @@ func main() { // create the default SSL certificate (dummy) defCert, defKey := ssl.GetFakeSSLCert() - c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) + c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}, fs) if err != nil { glog.Fatalf("Error generating self signed certificate: %v", err) } @@ -133,10 +131,6 @@ func main() { ngx := controller.NewNGINXController(conf, fs) - if conf.EnableSSLPassthrough { - setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx) - } - go handleSigterm(ngx, func(code int) { os.Exit(code) }) @@ -168,49 +162,6 @@ func handleSigterm(ngx *controller.NGINXController, exit exiter) { exit(exitCode) } -func setupSSLProxy(sslPort, proxyPort int, n *controller.NGINXController) { - glog.Info("starting TLS proxy for SSL passthrough") - n.Proxy = &controller.TCPProxy{ - Default: &controller.TCPServer{ - Hostname: "localhost", - IP: "127.0.0.1", - Port: proxyPort, - ProxyProtocol: true, - }, - } - - listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort)) - if err != nil { - glog.Fatalf("%v", err) - } - - proxyList := &proxyproto.Listener{Listener: listener} - - // start goroutine that accepts tcp connections in port 443 - go func() { - for { - var conn net.Conn - var err error - - if n.IsProxyProtocolEnabled { - // we need to wrap the listener in order to decode - // proxy protocol before handling the connection - conn, err = proxyList.Accept() - } else { - conn, err = listener.Accept() - } - - if err != nil { - glog.Warningf("unexpected error accepting tcp connection: %v", err) - continue - } - - glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr()) - go n.Proxy.Handle(conn) - } - }() -} - // createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty // the function assumes that it is running inside a Kubernetes cluster and attempts to // discover the Apiserver. Otherwise, it connects to the Apiserver specified. @@ -325,10 +276,12 @@ func registerHandlers(enableProfiling bool, port int, ic *controller.NGINXContro } server := &http.Server{ - Addr: fmt.Sprintf(":%v", port), - Handler: mux, - ReadTimeout: 10 * time.Second, - WriteTimeout: 30 * time.Second, + Addr: fmt.Sprintf(":%v", port), + Handler: mux, + ReadTimeout: 10 * time.Second, + ReadHeaderTimeout: 10 * time.Second, + WriteTimeout: 300 * time.Second, + IdleTimeout: 120 * time.Second, } glog.Fatal(server.ListenAndServe()) } diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 306c96bc1..5d5f15d02 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -36,14 +36,9 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck" - "k8s.io/ingress-nginx/internal/ingress/annotations/parser" "k8s.io/ingress-nginx/internal/ingress/annotations/proxy" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" - "k8s.io/ingress-nginx/internal/ingress/defaults" - "k8s.io/ingress-nginx/internal/ingress/resolver" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/task" ) @@ -101,14 +96,9 @@ type Configuration struct { SyncRateLimit float32 } -// GetDefaultBackend returns the default backend -func (n NGINXController) GetDefaultBackend() defaults.Backend { - return n.backendDefaults -} - // GetPublishService returns the configured service used to set ingress status func (n NGINXController) GetPublishService() *apiv1.Service { - s, err := n.listers.Service.GetByName(n.cfg.PublishService) + s, err := n.store.GetService(n.cfg.PublishService) if err != nil { return nil } @@ -116,16 +106,6 @@ func (n NGINXController) GetPublishService() *apiv1.Service { return s } -// GetSecret searches for a secret in the local secrets Store -func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) { - return n.listers.Secret.GetByName(name) -} - -// GetService searches for a service in the local secrets Store -func (n NGINXController) GetService(name string) (*apiv1.Service, error) { - return n.listers.Service.GetByName(name) -} - // sync collects all the pieces required to assemble the configuration file and // then sends the content to the backend (OnUpdate) receiving the populated // template as response reloading the backend if is required. @@ -138,33 +118,21 @@ func (n *NGINXController) syncIngress(item interface{}) error { if element, ok := item.(task.Element); ok { if name, ok := element.Key.(string); ok { - if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists { - ing := obj.(*extensions.Ingress) - n.readSecrets(ing) + if ing, err := n.store.GetIngress(name); err == nil { + n.store.ReadSecrets(ing) } } } // Sort ingress rules using the ResourceVersion field - ings := n.listers.Ingress.List() + ings := n.store.ListIngresses() sort.SliceStable(ings, func(i, j int) bool { - ir := ings[i].(*extensions.Ingress).ResourceVersion - jr := ings[j].(*extensions.Ingress).ResourceVersion + ir := ings[i].ResourceVersion + jr := ings[j].ResourceVersion return ir < jr }) - // filter ingress rules - var ingresses []*extensions.Ingress - for _, ingIf := range ings { - ing := ingIf.(*extensions.Ingress) - if !class.IsValid(ing) { - continue - } - - ingresses = append(ingresses, ing) - } - - upstreams, servers := n.getBackendServers(ingresses) + upstreams, servers := n.getBackendServers(ings) var passUpstreams []*ingress.SSLPassthroughBackend for _, server := range servers { @@ -232,7 +200,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr return []ingress.L4Service{} } - configmap, err := n.listers.ConfigMap.GetByName(configmapName) + configmap, err := n.store.GetConfigMap(configmapName) if err != nil { glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) return []ingress.L4Service{} @@ -242,6 +210,17 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr var svcProxyProtocol ingress.ProxyProtocol // k -> port to expose // v -> /: + + rp := []int{ + n.cfg.ListenPorts.HTTP, + n.cfg.ListenPorts.HTTPS, + n.cfg.ListenPorts.SSLProxy, + n.cfg.ListenPorts.Status, + n.cfg.ListenPorts.Health, + n.cfg.ListenPorts.Default, + } + reserverdPorts := sets.NewInt(rp...) + for k, v := range configmap.Data { externalPort, err := strconv.Atoi(k) if err != nil { @@ -249,16 +228,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr continue } - rp := []int{ - n.cfg.ListenPorts.HTTP, - n.cfg.ListenPorts.HTTPS, - n.cfg.ListenPorts.SSLProxy, - n.cfg.ListenPorts.Status, - n.cfg.ListenPorts.Health, - n.cfg.ListenPorts.Default, - } - - if intInSlice(externalPort, rp) { + if reserverdPorts.Has(externalPort) { glog.Warningf("port %v cannot be used for TCP or UDP services. It is reserved for the Ingress controller", k) continue } @@ -290,19 +260,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr continue } - svcObj, svcExists, err := n.listers.Service.GetByKey(nsName) + svc, err := n.store.GetService(nsName) if err != nil { glog.Warningf("error getting service %v: %v", nsName, err) continue } - if !svcExists { - glog.Warningf("service %v was not found", nsName) - continue - } - - svc := svcObj.(*apiv1.Service) - var endps []ingress.Endpoint targetPort, err := strconv.Atoi(svcPort) if err != nil { @@ -359,20 +322,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { Name: defUpstreamName, } svcKey := n.cfg.DefaultService - svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) + svc, err := n.store.GetService(svcKey) if err != nil { glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err) upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) return upstream } - if !svcExists { - glog.Warningf("service %v does not exist", svcKey) - upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) - return upstream - } - - svc := svcObj.(*apiv1.Service) endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{}) if len(endps) == 0 { glog.Warningf("service %v does not have any active endpoints", svcKey) @@ -392,7 +348,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] servers := n.createServers(ingresses, upstreams, du) for _, ing := range ingresses { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Errorf("unexpected error reading ingress annotations: %v", err) + } for _, rule := range ing.Spec.Rules { host := rule.Host @@ -603,29 +562,6 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] return aUpstreams, aServers } -// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret -func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { - if _, exists := n.sslCertTracker.Get(name); !exists { - n.syncSecret(name) - } - - _, err := n.listers.Secret.GetByName(name) - if err != nil { - return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) - } - - bc, exists := n.sslCertTracker.Get(name) - if !exists { - return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name) - } - cert := bc.(*ingress.SSLCert) - return &resolver.AuthSSLCert{ - Secret: name, - CAFileName: cert.CAFileName, - PemSHA: cert.PemSHA, - }, nil -} - // createUpstreams creates the NGINX upstreams for each service referenced in // Ingress rules. The servers inside the upstream are endpoints. func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend { @@ -633,7 +569,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres upstreams[defUpstreamName] = du for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Errorf("unexpected error reading ingress annotations: %v", err) + } var defBackend string if ing.Spec.Backend != nil { @@ -730,7 +669,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres upstreams[name].Endpoints = endp } - s, err := n.listers.Service.GetByName(svcKey) + s, err := n.store.GetService(svcKey) if err != nil { glog.Warningf("error obtaining service: %v", err) continue @@ -745,13 +684,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres } func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { - svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) - - if !svcExists { + svc, err := n.store.GetService(svcKey) + if err != nil { return endpoint, fmt.Errorf("service %v does not exist", svcKey) } - svc := svcObj.(*apiv1.Service) if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey) } @@ -783,7 +720,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte // to a service. func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, hz *healthcheck.Config) ([]ingress.Endpoint, error) { - svc, err := n.listers.Service.GetByName(svcKey) + svc, err := n.store.GetService(svcKey) var upstreams []ingress.Endpoint if err != nil { @@ -865,7 +802,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // remove the alias to avoid conflicts. aliases := make(map[string]string, len(data)) - bdef := n.GetDefaultBackend() + bdef := n.store.GetDefaultBackend() ngxProxy := proxy.Config{ BodySize: bdef.ProxyBodySize, ConnectTimeout: bdef.ProxyConnectTimeout, @@ -885,7 +822,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // Tries to fetch the default Certificate from nginx configuration. // If it does not exists, use the ones generated on Start() - defaultCertificate, err := n.getPemCertificate(n.cfg.DefaultSSLCertificate) + defaultCertificate, err := n.store.GetLocalSecret(n.cfg.DefaultSSLCertificate) if err == nil { defaultPemFileName = defaultCertificate.PemFileName defaultPemSHA = defaultCertificate.PemSHA @@ -908,7 +845,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // initialize all the servers for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Errorf("unexpected error reading ingress annotations: %v", err) + } // default upstream server un := du.Name @@ -976,7 +916,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // configure default location, alias, and SSL for _, ing := range data { - anns := n.getIngressAnnotations(ing) + anns, err := n.store.GetIngressAnnotations(ing) + if err != nil { + glog.Errorf("unexpected error reading ingress annotations: %v", err) + } for _, rule := range ing.Spec.Rules { host := rule.Host @@ -1041,13 +984,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, } key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName) - bc, exists := n.sslCertTracker.Get(key) - if !exists { + cert, err := n.store.GetLocalSecret(key) + if err != nil { glog.Warningf("ssl certificate \"%v\" does not exist in local store", key) continue } - cert := bc.(*ingress.SSLCert) err = cert.Certificate.VerifyHostname(host) if err != nil { glog.Warningf("unexpected error validating SSL certificate %v for host %v. Reason: %v", key, host, err) @@ -1124,7 +1066,7 @@ func (n *NGINXController) getEndpoints( } glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) - ep, err := n.listers.Endpoint.GetServiceEndpoints(s) + ep, err := n.store.GetServiceEndpoints(s) if err != nil { glog.Warningf("unexpected error obtaining service endpoints: %v", err) return upsServers @@ -1173,24 +1115,6 @@ func (n *NGINXController) getEndpoints( return upsServers } -// readSecrets extracts information about secrets from an Ingress rule -func (n *NGINXController) readSecrets(ing *extensions.Ingress) { - for _, tls := range ing.Spec.TLS { - if tls.SecretName == "" { - continue - } - - key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - n.syncSecret(key) - } - - key, _ := parser.GetStringAnnotation("auth-tls-secret", ing) - if key == "" { - return - } - n.syncSecret(key) -} - func (n *NGINXController) isForceReload() bool { return atomic.LoadInt32(&n.forceReload) != 0 } @@ -1204,27 +1128,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) { atomic.StoreInt32(&n.forceReload, 0) } } - -func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) { - glog.V(3).Infof("updating annotations information for ingress %v/%v", ing.Namespace, ing.Name) - anns := n.annotations.Extract(ing) - err := n.listers.IngressAnnotation.Update(anns) - if err != nil { - glog.Errorf("unexpected error updating annotations information for ingress %v/%v: %v", anns.Namespace, anns.Name, err) - } -} - -// getByIngress returns the parsed annotations from an Ingress -func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress { - key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) - item, exists, err := n.listers.IngressAnnotation.GetByKey(key) - if err != nil { - glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err) - return &annotations.Ingress{} - } - if !exists { - glog.Errorf("ingress annotation %v was not found", key) - return &annotations.Ingress{} - } - return item.(*annotations.Ingress) -} diff --git a/internal/ingress/controller/listers.go b/internal/ingress/controller/listers.go deleted file mode 100644 index 205f5acba..000000000 --- a/internal/ingress/controller/listers.go +++ /dev/null @@ -1,244 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "fmt" - "reflect" - "time" - - "github.com/golang/glog" - - apiv1 "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - cache_client "k8s.io/client-go/tools/cache" - - "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" -) - -type cacheController struct { - Ingress cache.Controller - Endpoint cache.Controller - Service cache.Controller - Secret cache.Controller - Configmap cache.Controller -} - -func (c *cacheController) Run(stopCh chan struct{}) { - go c.Endpoint.Run(stopCh) - go c.Service.Run(stopCh) - go c.Secret.Run(stopCh) - go c.Configmap.Run(stopCh) - - time.Sleep(1 * time.Second) - go c.Ingress.Run(stopCh) - - // Wait for all involved caches to be synced, before processing items from the queue is started - if !cache.WaitForCacheSync(stopCh, - c.Endpoint.HasSynced, - c.Service.HasSynced, - c.Secret.HasSynced, - c.Configmap.HasSynced, - ) { - runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) - } - - // We need to wait before start syncing the ingress rules - // because the rules requires content from other listers - time.Sleep(1 * time.Second) - go c.Ingress.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, - c.Ingress.HasSynced, - ) { - runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) - } -} - -func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) { - ingEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - addIng := obj.(*extensions.Ingress) - if !class.IsValid(addIng) { - a := addIng.GetAnnotations()[class.IngressKey] - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) - return - } - - n.extractAnnotations(addIng) - n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - n.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - delIng, ok := obj.(*extensions.Ingress) - if !ok { - // If we reached here it means the ingress was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - delIng, ok = tombstone.Obj.(*extensions.Ingress) - if !ok { - glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) - return - } - } - if !class.IsValid(delIng) { - glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) - return - } - n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) - n.listers.IngressAnnotation.Delete(delIng) - n.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oldIng := old.(*extensions.Ingress) - curIng := cur.(*extensions.Ingress) - validOld := class.IsValid(oldIng) - validCur := class.IsValid(curIng) - - c := curIng.GetAnnotations()[class.IngressKey] - if !validOld && validCur { - glog.Infof("creating ingress %v/%v based on annotation %v with value '%v'", curIng.Namespace, curIng.Name, class.IngressKey, c) - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validOld && !validCur { - glog.Infof("removing ingress %v/%v based on annotation %v with value '%v'", curIng.Namespace, curIng.Name, class.IngressKey, c) - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validCur && !reflect.DeepEqual(old, cur) { - n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } - - n.extractAnnotations(curIng) - n.syncQueue.Enqueue(cur) - }, - } - - secrEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - n.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - sec := cur.(*apiv1.Secret) - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - _, exists := n.sslCertTracker.Get(key) - if exists { - n.syncSecret(key) - } - } - }, - DeleteFunc: func(obj interface{}) { - sec, ok := obj.(*apiv1.Secret) - if !ok { - // If we reached here it means the secret was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - sec, ok = tombstone.Obj.(*apiv1.Secret) - if !ok { - glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) - return - } - } - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - n.sslCertTracker.Delete(key) - n.syncQueue.Enqueue(obj) - }, - } - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - n.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - n.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oep := old.(*apiv1.Endpoints) - ocur := cur.(*apiv1.Endpoints) - if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { - n.syncQueue.Enqueue(cur) - } - }, - } - - mapEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - upCmap := obj.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == n.cfg.ConfigMapName { - glog.V(2).Infof("adding configmap %v to backend", mapKey) - n.SetConfig(upCmap) - n.SetForceReload(true) - } - }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - upCmap := cur.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == n.cfg.ConfigMapName { - glog.V(2).Infof("updating configmap backend (%v)", mapKey) - n.SetConfig(upCmap) - n.SetForceReload(true) - } - // updates to configuration configmaps can trigger an update - if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName { - n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - n.syncQueue.Enqueue(cur) - } - } - }, - } - - watchNs := apiv1.NamespaceAll - if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll { - watchNs = n.cfg.Namespace - } - - lister := &ingress.StoreLister{} - lister.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) - - controller := &cacheController{} - - lister.Ingress.Store, controller.Ingress = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()), - &extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler) - - lister.Endpoint.Store, controller.Endpoint = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()), - &apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler) - - lister.Secret.Store, controller.Secret = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), - &apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler) - - lister.ConfigMap.Store, controller.Configmap = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), - &apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler) - - lister.Service.Store, controller.Service = cache.NewInformer( - cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()), - &apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) - - return lister, controller -} diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index a21052039..76519b492 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -18,14 +18,12 @@ package controller import ( "bytes" - "encoding/base64" "errors" "fmt" "io/ioutil" "net" "os" "os/exec" - "runtime" "strconv" "strings" "sync" @@ -34,9 +32,9 @@ import ( "github.com/golang/glog" + proxyproto "github.com/armon/go-proxyproto" apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -49,10 +47,9 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/class" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/process" + "k8s.io/ingress-nginx/internal/ingress/controller/store" ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template" - "k8s.io/ingress-nginx/internal/ingress/defaults" "k8s.io/ingress-nginx/internal/ingress/status" - "k8s.io/ingress-nginx/internal/ingress/store" ing_net "k8s.io/ingress-nginx/internal/net" "k8s.io/ingress-nginx/internal/net/dns" "k8s.io/ingress-nginx/internal/net/ssl" @@ -96,16 +93,12 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl } n := &NGINXController{ - backendDefaults: ngx_config.NewDefault().Backend, - binary: ngx, - - configmap: &apiv1.ConfigMap{}, + binary: ngx, isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, cfg: config, - sslCertTracker: store.NewSSLCertTracker(), syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ @@ -113,6 +106,8 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl }), stopCh: make(chan struct{}), + updateCh: make(chan store.Event, 1024), + stopLock: &sync.Mutex{}, fileSystem: fs, @@ -121,19 +116,27 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl runningConfig: &ingress.Configuration{}, } - n.listers, n.controllers = n.createListers(n.stopCh) + n.store = store.New(true, + config.Namespace, + config.ConfigMapName, + config.TCPConfigMapName, + config.UDPConfigMapName, + config.ResyncPeriod, + config.Client, + fs, + n.updateCh) n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status) n.syncQueue = task.NewTaskQueue(n.syncIngress) - n.annotations = annotations.NewAnnotationExtractor(n) + n.annotations = annotations.NewAnnotationExtractor(n.store) if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(status.Config{ Client: config.Client, PublishService: config.PublishService, - IngressLister: n.listers.Ingress, + IngressLister: n.store, ElectionID: config.ElectionID, IngressClass: class.IngressClass, DefaultIngressClass: class.DefaultClass, @@ -186,9 +189,6 @@ Error loading new template : %v type NGINXController struct { cfg *Configuration - listers *ingress.StoreLister - controllers *cacheController - annotations annotations.Extractor recorder record.EventRecorder @@ -197,10 +197,6 @@ type NGINXController struct { syncStatus status.Sync - // local store of SSL certificates - // (only certificates used in ingress) - sslCertTracker *store.SSLCertTracker - syncRateLimiter flowcontrol.RateLimiter // stopLock is used to enforce only a single call to Stop is active. @@ -208,7 +204,8 @@ type NGINXController struct { // allowing concurrent stoppers leads to stack traces. stopLock *sync.Mutex - stopCh chan struct{} + stopCh chan struct{} + updateCh chan store.Event // ngxErrCh channel used to detect errors with the nginx processes ngxErrCh chan error @@ -220,8 +217,6 @@ type NGINXController struct { t *ngx_template.Template - configmap *apiv1.ConfigMap - binary string resolver []net.IP @@ -231,14 +226,11 @@ type NGINXController struct { // returns true if IPV6 is enabled in the pod isIPV6Enabled bool - // returns true if proxy protocol es enabled - IsProxyProtocolEnabled bool - isShuttingDown bool Proxy *TCPProxy - backendDefaults defaults.Backend + store store.Storer fileSystem filesystem.Filesystem } @@ -247,32 +239,12 @@ type NGINXController struct { func (n *NGINXController) Start() { glog.Infof("starting Ingress controller") - n.controllers.Run(n.stopCh) - - // initial sync of secrets to avoid unnecessary reloads - glog.Info("running initial sync of secrets") - for _, obj := range n.listers.Ingress.List() { - ing := obj.(*extensions.Ingress) - - if !class.IsValid(ing) { - a := ing.GetAnnotations()[class.IngressKey] - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) - continue - } - - n.readSecrets(ing) - } - - if n.cfg.EnableSSLChainCompletion { - go wait.Until(n.checkSSLChainIssues, 60*time.Second, n.stopCh) - } + n.store.Run(n.stopCh) if n.syncStatus != nil { go n.syncStatus.Run() } - go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh) - done := make(chan error, 1) cmd := exec.Command(n.binary, "-c", cfgPath) @@ -283,6 +255,10 @@ func (n *NGINXController) Start() { Pgid: 0, } + if n.cfg.EnableSSLPassthrough { + n.setupSSLProxy() + } + glog.Info("starting NGINX process...") n.start(cmd) @@ -310,6 +286,16 @@ func (n *NGINXController) Start() { // start a new nginx master process if the controller is not being stopped n.start(cmd) } + case evt := <-n.updateCh: + if n.isShuttingDown { + break + } + glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj) + if evt.Type == store.ConfigurationEvent { + n.SetForceReload(true) + } + + n.syncQueue.Enqueue(evt.Obj) case <-n.stopCh: break } @@ -412,37 +398,6 @@ Error: %v return nil } -// SetConfig sets the configured configmap -func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { - n.configmap = cmap - n.IsProxyProtocolEnabled = false - - m := map[string]string{} - if cmap != nil { - m = cmap.Data - } - - val, ok := m["use-proxy-protocol"] - if ok { - b, err := strconv.ParseBool(val) - if err == nil { - n.IsProxyProtocolEnabled = b - } - } - - c := ngx_template.ReadConfig(m) - if c.SSLSessionTicketKey != "" { - d, err := base64.StdEncoding.DecodeString(c.SSLSessionTicketKey) - if err != nil { - glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err) - c.SSLSessionTicketKey = "" - } - ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644) - } - - n.backendDefaults = c.Backend -} - // OnUpdate is called periodically by syncQueue to keep the configuration in sync. // // 1. converts configmap configuration to custom configuration object @@ -452,7 +407,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { // returning nill implies the backend will be reloaded. // if an error is returned means requeue the update func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { - cfg := ngx_template.ReadConfig(n.configmap.Data) + cfg := n.store.GetBackendConfiguration() cfg.Resolver = n.resolver servers := []*TCPServer{} @@ -488,10 +443,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { }) } - if n.cfg.EnableSSLPassthrough { - n.Proxy.ServerList = servers - } - // we need to check if the status module configuration changed if cfg.EnableVtsStatus { n.setupMonitor(vtsStatusModule) @@ -562,63 +513,48 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { setHeaders := map[string]string{} if cfg.ProxySetHeaders != "" { - cmap, exists, err := n.listers.ConfigMap.GetByKey(cfg.ProxySetHeaders) + cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders) if err != nil { glog.Warningf("unexpected error reading configmap %v: %v", cfg.ProxySetHeaders, err) } - if exists { - setHeaders = cmap.(*apiv1.ConfigMap).Data - } + setHeaders = cmap.Data } addHeaders := map[string]string{} if cfg.AddHeaders != "" { - cmap, exists, err := n.listers.ConfigMap.GetByKey(cfg.AddHeaders) + cmap, err := n.store.GetConfigMap(cfg.AddHeaders) if err != nil { glog.Warningf("unexpected error reading configmap %v: %v", cfg.AddHeaders, err) } - if exists { - addHeaders = cmap.(*apiv1.ConfigMap).Data - } + addHeaders = cmap.Data } sslDHParam := "" if cfg.SSLDHParam != "" { secretName := cfg.SSLDHParam - s, exists, err := n.listers.Secret.GetByKey(secretName) + + secret, err := n.store.GetSecret(secretName) if err != nil { glog.Warningf("unexpected error reading secret %v: %v", secretName, err) } - if exists { - secret := s.(*apiv1.Secret) - nsSecName := strings.Replace(secretName, "/", "-", -1) + nsSecName := strings.Replace(secretName, "/", "-", -1) - dh, ok := secret.Data["dhparam.pem"] - if ok { - pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh) - if err != nil { - glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err) - } else { - sslDHParam = pemFileName - } + dh, ok := secret.Data["dhparam.pem"] + if ok { + pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh, n.fileSystem) + if err != nil { + glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err) + } else { + sslDHParam = pemFileName } } } cfg.SSLDHParam = sslDHParam - // disable features are not available in some platforms - switch runtime.GOARCH { - case "arm", "arm64", "ppc64le": - cfg.EnableModsecurity = false - case "s390x": - cfg.EnableModsecurity = false - cfg.EnableBrotli = false - } - tc := ngx_config.TemplateConfig{ ProxySetHeaders: setHeaders, AddHeaders: addHeaders, @@ -713,3 +649,49 @@ func nextPowerOf2(v int) int { return v } + +func (n *NGINXController) setupSSLProxy() { + sslPort := n.cfg.ListenPorts.HTTPS + proxyPort := n.cfg.ListenPorts.SSLProxy + + glog.Info("starting TLS proxy for SSL passthrough") + n.Proxy = &TCPProxy{ + Default: &TCPServer{ + Hostname: "localhost", + IP: "127.0.0.1", + Port: proxyPort, + ProxyProtocol: true, + }, + } + + listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort)) + if err != nil { + glog.Fatalf("%v", err) + } + + proxyList := &proxyproto.Listener{Listener: listener} + + // start goroutine that accepts tcp connections in port 443 + go func() { + for { + var conn net.Conn + var err error + + if n.store.GetBackendConfiguration().UseProxyProtocol { + // we need to wrap the listener in order to decode + // proxy protocol before handling the connection + conn, err = proxyList.Accept() + } else { + conn, err = listener.Accept() + } + + if err != nil { + glog.Warningf("unexpected error accepting tcp connection: %v", err) + continue + } + + glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr()) + go n.Proxy.Handle(conn) + } + }() +} diff --git a/internal/ingress/controller/backend_ssl.go b/internal/ingress/controller/store/backend_ssl.go similarity index 62% rename from internal/ingress/controller/backend_ssl.go rename to internal/ingress/controller/store/backend_ssl.go index c48a34d54..6d58cb5ce 100644 --- a/internal/ingress/controller/backend_ssl.go +++ b/internal/ingress/controller/store/backend_ssl.go @@ -14,11 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package store import ( "fmt" - "io/ioutil" "strings" "github.com/golang/glog" @@ -26,52 +25,57 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/net/ssl" ) // syncSecret keeps in sync Secrets used by Ingress rules with the files on // disk to allow copy of the content of the secret to disk to be used // by external processes. -func (ic *NGINXController) syncSecret(key string) { +func (s k8sStore) syncSecret(key string) { + s.mu.Lock() + defer s.mu.Unlock() + glog.V(3).Infof("starting syncing of secret %v", key) - cert, err := ic.getPemCertificate(key) + // TODO: getPemCertificate should not write to disk to avoid unnecessary overhead + cert, err := s.getPemCertificate(key) if err != nil { glog.Warningf("error obtaining PEM from secret %v: %v", key, err) return } // create certificates and add or update the item in the store - cur, exists := ic.sslCertTracker.Get(key) - if exists { - s := cur.(*ingress.SSLCert) - if s.Equal(cert) { + cur, err := s.GetLocalSecret(key) + if err == nil { + if cur.Equal(cert) { // no need to update return } glog.Infof("updating secret %v in the local store", key) - ic.sslCertTracker.Update(key, cert) + s.sslStore.Update(key, cert) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + s.sendDummyEvent() return } glog.Infof("adding secret %v to the local store", key) - ic.sslCertTracker.Add(key, cert) + s.sslStore.Add(key, cert) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + s.sendDummyEvent() } // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. // It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only. -func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { - secret, err := ic.listers.Secret.GetByName(secretName) +func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) { + secret, err := s.listers.Secret.ByKey(secretName) if err != nil { return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err) } @@ -83,7 +87,7 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer // namespace/secretName -> namespace-secretName nsSecName := strings.Replace(secretName, "/", "-", -1) - var s *ingress.SSLCert + var sslCert *ingress.SSLCert if okcert && okkey { if cert == nil { return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName) @@ -94,18 +98,17 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer // If 'ca.crt' is also present, it will allow this secret to be used in the // 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation - s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca) + sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca, s.filesystem) if err != nil { return nil, fmt.Errorf("unexpected error creating pem file: %v", err) } - glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN) + glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, sslCert.CN) if ca != nil { glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName) } - } else if ca != nil { - s, err = ssl.AddCertAuth(nsSecName, ca) + sslCert, err = ssl.AddCertAuth(nsSecName, ca, s.filesystem) if err != nil { return nil, fmt.Errorf("unexpected error creating pem file: %v", err) @@ -119,29 +122,40 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName) } - s.Name = secret.Name - s.Namespace = secret.Namespace - return s, nil + sslCert.Name = secret.Name + sslCert.Namespace = secret.Namespace + + return sslCert, nil } -func (ic *NGINXController) checkSSLChainIssues() { - for _, secretName := range ic.sslCertTracker.ListKeys() { - s, _ := ic.sslCertTracker.Get(secretName) - secret := s.(*ingress.SSLCert) +func (s k8sStore) checkSSLChainIssues() { + for _, item := range s.ListLocalSecrets() { + secretName := k8s.MetaNamespaceKey(item) + secret, err := s.GetLocalSecret(secretName) + if err != nil { + continue + } if secret.FullChainPemFileName != "" { // chain already checked continue } - data, err := ssl.FullChainCert(secret.PemFileName) + data, err := ssl.FullChainCert(secret.PemFileName, s.filesystem) if err != nil { glog.Errorf("unexpected error generating SSL certificate with full intermediate chain CA certs: %v", err) continue } - fullChainPemFileName := fmt.Sprintf("%v/%v-%v-full-chain.pem", ingress.DefaultSSLDirectory, secret.Namespace, secret.Name) - err = ioutil.WriteFile(fullChainPemFileName, data, 0655) + fullChainPemFileName := fmt.Sprintf("%v/%v-%v-full-chain.pem", file.DefaultSSLDirectory, secret.Namespace, secret.Name) + + file, err := s.filesystem.Create(fullChainPemFileName) + if err != nil { + glog.Errorf("unexpected error creating SSL certificate file %v: %v", fullChainPemFileName, err) + continue + } + + _, err = file.Write(data) if err != nil { glog.Errorf("unexpected error creating SSL certificate: %v", err) continue @@ -158,42 +172,67 @@ func (ic *NGINXController) checkSSLChainIssues() { dst.FullChainPemFileName = fullChainPemFileName glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName) - ic.sslCertTracker.Update(secretName, dst) + s.sslStore.Update(secretName, dst) // this update must trigger an update // (like an update event from a change in Ingress) - ic.syncQueue.Enqueue(&extensions.Ingress{}) + s.sendDummyEvent() } } -// checkMissingSecrets verify if one or more ingress rules contains a reference -// to a secret that is not present in the local secret store. -// In this case we call syncSecret. -func (ic *NGINXController) checkMissingSecrets() { - for _, obj := range ic.listers.Ingress.List() { - ing := obj.(*extensions.Ingress) - - if !class.IsValid(ing) { - continue - } - +// checkMissingSecrets verifies if one or more ingress rules contains +// a reference to a secret that is not present in the local secret store. +func (s k8sStore) checkMissingSecrets() { + for _, ing := range s.ListIngresses() { for _, tls := range ing.Spec.TLS { if tls.SecretName == "" { continue } key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - if _, ok := ic.sslCertTracker.Get(key); !ok { - ic.syncSecret(key) + if _, ok := s.sslStore.Get(key); !ok { + s.syncSecret(key) } } key, _ := parser.GetStringAnnotation("auth-tls-secret", ing) if key == "" { - continue + return } - if _, ok := ic.sslCertTracker.Get(key); !ok { - ic.syncSecret(key) + if _, ok := s.sslStore.Get(key); !ok { + s.syncSecret(key) } } } + +// ReadSecrets extracts information about secrets from an Ingress rule +func (s k8sStore) ReadSecrets(ing *extensions.Ingress) { + for _, tls := range ing.Spec.TLS { + if tls.SecretName == "" { + continue + } + + key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) + s.syncSecret(key) + } + + key, _ := parser.GetStringAnnotation("auth-tls-secret", ing) + if key == "" { + return + } + s.syncSecret(key) +} + +// sendDummyEvent sends a dummy event to trigger an update +// This is used in when a secret change +func (s *k8sStore) sendDummyEvent() { + s.updateCh <- Event{ + Type: UpdateEvent, + Obj: &extensions.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummy", + Namespace: "dummy", + }, + }, + } +} diff --git a/internal/ingress/controller/backend_ssl_test.go b/internal/ingress/controller/store/backend_ssl_test.go similarity index 87% rename from internal/ingress/controller/backend_ssl_test.go rename to internal/ingress/controller/store/backend_ssl_test.go index 7c0de477d..6477f51c6 100644 --- a/internal/ingress/controller/backend_ssl_test.go +++ b/internal/ingress/controller/store/backend_ssl_test.go @@ -14,23 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package store import ( "encoding/base64" - "fmt" - "io/ioutil" - "testing" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" cache_client "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/flowcontrol" - - "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/store" - "k8s.io/ingress-nginx/internal/task" ) const ( @@ -65,8 +57,8 @@ func buildSimpleClientSetForBackendSSL() *testclient.Clientset { return testclient.NewSimpleClientset() } -func buildIngListenerForBackendSSL() store.IngressLister { - ingLister := store.IngressLister{} +func buildIngListenerForBackendSSL() IngressLister { + ingLister := IngressLister{} ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) return ingLister } @@ -80,20 +72,21 @@ func buildSecretForBackendSSL() *apiv1.Secret { } } -func buildSecrListerForBackendSSL() store.SecretLister { - secrLister := store.SecretLister{} +func buildSecrListerForBackendSSL() SecretLister { + secrLister := SecretLister{} secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) return secrLister } +/* func buildListers() *ingress.StoreLister { sl := &ingress.StoreLister{} sl.Ingress.Store = buildIngListenerForBackendSSL() sl.Secret.Store = buildSecrListerForBackendSSL() return sl } - +*/ func buildControllerForBackendSSL() cache_client.Controller { cfg := &cache_client.Config{ Queue: &MockQueue{Synced: true}, @@ -102,6 +95,7 @@ func buildControllerForBackendSSL() cache_client.Controller { return cache_client.New(cfg) } +/* func buildGenericControllerForBackendSSL() *NGINXController { gc := &NGINXController{ syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), @@ -109,21 +103,15 @@ func buildGenericControllerForBackendSSL() *NGINXController { Client: buildSimpleClientSetForBackendSSL(), }, listers: buildListers(), - sslCertTracker: store.NewSSLCertTracker(), + sslCertTracker: NewSSLCertTracker(), } gc.syncQueue = task.NewTaskQueue(gc.syncIngress) return gc } +*/ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { - // prepare - td, err := ioutil.TempDir("", "ssl") - if err != nil { - return nil, nil, nil, fmt.Errorf("error occurs while creating temp directory: %v", err) - } - ingress.DefaultSSLDirectory = td - dCrt, err := base64.StdEncoding.DecodeString(tlsCrt) if err != nil { return nil, nil, nil, err @@ -139,6 +127,7 @@ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { return dCrt, dKey, dCa, nil } +/* func TestSyncSecret(t *testing.T) { // prepare for test dCrt, dKey, dCa, err := buildCrtKeyAndCA() @@ -152,8 +141,8 @@ func TestSyncSecret(t *testing.T) { Data map[string][]byte expectSuccess bool }{ - {"getPemCertificate_error", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey}, false}, - {"normal_test", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, true}, + {"getPemCertificate_error", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey}, false}, + {"normal_test", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, true}, } for _, foo := range foos { @@ -199,12 +188,12 @@ func TestGetPemCertificate(t *testing.T) { }{ {"sceret_not_exist", "default/foo_secret_not_exist", nil, true}, {"data_not_complete_all_not_exist", "default/foo_secret", map[string][]byte{}, true}, - {"data_not_complete_TLSCertKey_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false}, - {"data_not_complete_TLSCertKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey}, true}, - {"data_not_complete_TLSPrivateKeyKey_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, tlscaName: dCa}, false}, - {"data_not_complete_TLSPrivateKeyKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt}, true}, - {"data_not_complete_CA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey}, false}, - {"normal_test", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false}, + {"data_not_complete_TLSCertKey_not_exist", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false}, + {"data_not_complete_TLSCertKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey}, true}, + {"data_not_complete_TLSPrivateKeyKey_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, tlscaName: dCa}, false}, + {"data_not_complete_TLSPrivateKeyKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt}, true}, + {"data_not_complete_CA_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey}, false}, + {"normal_test", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false}, } for _, foo := range foos { @@ -231,3 +220,4 @@ func TestGetPemCertificate(t *testing.T) { }) } } +*/ diff --git a/internal/ingress/controller/store/configmap.go b/internal/ingress/controller/store/configmap.go new file mode 100644 index 000000000..d679a86c4 --- /dev/null +++ b/internal/ingress/controller/store/configmap.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// ConfigMapLister makes a Store that lists Configmaps. +type ConfigMapLister struct { + cache.Store +} + +// ByKey searches for a configmap in the local configmaps Store +func (cml *ConfigMapLister) ByKey(key string) (*apiv1.ConfigMap, error) { + s, exists, err := cml.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("configmap %v was not found", key) + } + return s.(*apiv1.ConfigMap), nil +} diff --git a/internal/ingress/controller/store/endpoint.go b/internal/ingress/controller/store/endpoint.go new file mode 100644 index 000000000..126b616b6 --- /dev/null +++ b/internal/ingress/controller/store/endpoint.go @@ -0,0 +1,42 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// EndpointLister makes a Store that lists Endpoints. +type EndpointLister struct { + cache.Store +} + +// GetServiceEndpoints returns the endpoints of a service, matched on service name. +func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { + key := fmt.Sprintf("%v/%v", svc.Namespace, svc.Name) + eps, exists, err := s.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("could not find endpoints for service %v", key) + } + return eps.(*apiv1.Endpoints), nil +} diff --git a/internal/ingress/controller/store/ingress.go b/internal/ingress/controller/store/ingress.go new file mode 100644 index 000000000..67a7b6c53 --- /dev/null +++ b/internal/ingress/controller/store/ingress.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/client-go/tools/cache" +) + +// IngressLister makes a Store that lists Ingress. +type IngressLister struct { + cache.Store +} + +// ByKey searches for an ingress in the local ingress Store +func (il IngressLister) ByKey(key string) (*extensions.Ingress, error) { + i, exists, err := il.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("ingress %v was not found", key) + } + return i.(*extensions.Ingress), nil +} diff --git a/internal/ingress/controller/store/ingress_annotation.go b/internal/ingress/controller/store/ingress_annotation.go new file mode 100644 index 000000000..676875aca --- /dev/null +++ b/internal/ingress/controller/store/ingress_annotation.go @@ -0,0 +1,26 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "k8s.io/client-go/tools/cache" +) + +// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules. +type IngressAnnotationsLister struct { + cache.Store +} diff --git a/internal/ingress/controller/store/local_secret.go b/internal/ingress/controller/store/local_secret.go new file mode 100644 index 000000000..ee3ced398 --- /dev/null +++ b/internal/ingress/controller/store/local_secret.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + "k8s.io/client-go/tools/cache" + + "k8s.io/ingress-nginx/internal/ingress" +) + +// SSLCertTracker holds a store of referenced Secrets in Ingress rules +type SSLCertTracker struct { + cache.ThreadSafeStore +} + +// NewSSLCertTracker creates a new SSLCertTracker store +func NewSSLCertTracker() *SSLCertTracker { + return &SSLCertTracker{ + cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), + } +} + +// ByKey searches for an ingress in the local ingress Store +func (s SSLCertTracker) ByKey(key string) (*ingress.SSLCert, error) { + cert, exists := s.Get(key) + if !exists { + return nil, fmt.Errorf("local SSL certificate %v was not found", key) + } + return cert.(*ingress.SSLCert), nil +} diff --git a/internal/ingress/controller/store/local_secret_test.go b/internal/ingress/controller/store/local_secret_test.go new file mode 100644 index 000000000..0b532c41d --- /dev/null +++ b/internal/ingress/controller/store/local_secret_test.go @@ -0,0 +1,39 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import "testing" + +func TestSSLCertTracker(t *testing.T) { + tracker := NewSSLCertTracker() + + items := len(tracker.List()) + if items != 0 { + t.Errorf("expected 0 items in the store but %v returned", items) + } + + tracker.Add("key", "value") + items = len(tracker.List()) + if items != 1 { + t.Errorf("expected 1 item in the store but %v returned", items) + } + + item, exists := tracker.Get("key") + if !exists || item == nil { + t.Errorf("expected an item from the store but none returned") + } +} diff --git a/internal/ingress/controller/store/secret.go b/internal/ingress/controller/store/secret.go new file mode 100644 index 000000000..54774e8e9 --- /dev/null +++ b/internal/ingress/controller/store/secret.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// SecretLister makes a Store that lists Secrets. +type SecretLister struct { + cache.Store +} + +// ByKey searches for a secret in the local secrets Store +func (sl *SecretLister) ByKey(key string) (*apiv1.Secret, error) { + s, exists, err := sl.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("secret %v was not found", key) + } + return s.(*apiv1.Secret), nil +} diff --git a/internal/ingress/controller/store/service.go b/internal/ingress/controller/store/service.go new file mode 100644 index 000000000..44d235558 --- /dev/null +++ b/internal/ingress/controller/store/service.go @@ -0,0 +1,41 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// ServiceLister makes a Store that lists Services. +type ServiceLister struct { + cache.Store +} + +// ByKey searches for a service in the local secrets Store +func (sl *ServiceLister) ByKey(key string) (*apiv1.Service, error) { + s, exists, err := sl.GetByKey(key) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("service %v was not found", key) + } + return s.(*apiv1.Service), nil +} diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go new file mode 100644 index 000000000..4895f3a47 --- /dev/null +++ b/internal/ingress/controller/store/store.go @@ -0,0 +1,554 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "encoding/base64" + "fmt" + "io/ioutil" + "reflect" + "sync" + "time" + + "github.com/golang/glog" + + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + cache_client "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-nginx/internal/file" + "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/ingress/annotations" + "k8s.io/ingress-nginx/internal/ingress/annotations/class" + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" + ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template" + "k8s.io/ingress-nginx/internal/ingress/defaults" + "k8s.io/ingress-nginx/internal/ingress/resolver" + "k8s.io/ingress-nginx/internal/k8s" +) + +// Storer is the interface that wraps the required methods to gather information +// about ingresses, services, secrets and ingress annotations. +type Storer interface { + // GetBackendConfiguration returns the nginx configuration stored in a configmap + GetBackendConfiguration() ngx_config.Configuration + + // GetConfigMap returns a ConfigmMap using the namespace and name as key + GetConfigMap(key string) (*apiv1.ConfigMap, error) + + // GetSecret returns a Secret using the namespace and name as key + GetSecret(key string) (*apiv1.Secret, error) + + // GetService returns a Service using the namespace and name as key + GetService(key string) (*apiv1.Service, error) + + GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) + + // GetSecret returns an Ingress using the namespace and name as key + GetIngress(key string) (*extensions.Ingress, error) + + // ListIngresses returns the list of Ingresses + ListIngresses() []*extensions.Ingress + + // GetIngressAnnotations returns the annotations associated to an Ingress + GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) + + // GetLocalSecret returns the local copy of a Secret + GetLocalSecret(name string) (*ingress.SSLCert, error) + + // ListLocalSecrets returns the list of local Secrets + ListLocalSecrets() []*ingress.SSLCert + + // GetAuthCertificate resolves a given secret name into an SSL certificate. + // The secret must contain 3 keys named: + // ca.crt: contains the certificate chain used for authentication + GetAuthCertificate(string) (*resolver.AuthSSLCert, error) + + // GetDefaultBackend returns the default backend configuration + GetDefaultBackend() defaults.Backend + + // Run initiates the synchronization of the controllers + Run(stopCh chan struct{}) + + // ReadSecrets extracts information about secrets from an Ingress rule + ReadSecrets(*extensions.Ingress) +} + +// EventType type of event associated with an informer +type EventType string + +const ( + // CreateEvent event associated with new objects in an informer + CreateEvent EventType = "CREATE" + // UpdateEvent event associated with an object update in an informer + UpdateEvent EventType = "UPDATE" + // DeleteEvent event associated when an object is removed from an informer + DeleteEvent EventType = "DELETE" + // ConfigurationEvent event associated when a configuration object is created or updated + ConfigurationEvent EventType = "CONFIGURATION" +) + +// Event holds the context of an event +type Event struct { + Type EventType + Obj interface{} +} + +// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps. +type Lister struct { + Ingress IngressLister + Service ServiceLister + Endpoint EndpointLister + Secret SecretLister + ConfigMap ConfigMapLister + IngressAnnotation IngressAnnotationsLister +} + +// Controller defines the required controllers that interact agains the api server +type Controller struct { + Ingress cache.Controller + Endpoint cache.Controller + Service cache.Controller + Secret cache.Controller + Configmap cache.Controller +} + +// Run initiates the synchronization of the controllers against the api server +func (c *Controller) Run(stopCh chan struct{}) { + go c.Endpoint.Run(stopCh) + go c.Service.Run(stopCh) + go c.Secret.Run(stopCh) + go c.Configmap.Run(stopCh) + + // Wait for all involved caches to be synced, before processing items from the queue is started + if !cache.WaitForCacheSync(stopCh, + c.Endpoint.HasSynced, + c.Service.HasSynced, + c.Secret.HasSynced, + c.Configmap.HasSynced, + ) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + } + + // We need to wait before start syncing the ingress rules + // because the rules requires content from other listers + time.Sleep(1 * time.Second) + go c.Ingress.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, + c.Ingress.HasSynced, + ) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + } +} + +// k8sStore internal Storer implementation using informers and thread safe stores +type k8sStore struct { + isOCSPCheckEnabled bool + + // backendConfig contains the running configuration from the configmap + // this is required because this rarely changes but is a very expensive + // operation to execute in each OnUpdate invocation + backendConfig ngx_config.Configuration + + // cache contains the cache Controllers + cache *Controller + + // listers contains the cache.Store used in the ingress controller + listers *Lister + + // sslStore local store of SSL certificates (certificates used in ingress) + // this is required because the certificates must be present in the + // container filesystem + sslStore *SSLCertTracker + + annotations annotations.Extractor + + filesystem file.Filesystem + + // updateCh + updateCh chan Event + + // mu mutex used to avoid simultaneous incovations to syncSecret + mu *sync.Mutex +} + +// New creates a new object store to be used in the ingress controller +func New(checkOCSP bool, + namespace, configmap, tcp, udp string, + resyncPeriod time.Duration, + client clientset.Interface, + fs file.Filesystem, + updateCh chan Event) Storer { + + store := &k8sStore{ + isOCSPCheckEnabled: checkOCSP, + cache: &Controller{}, + listers: &Lister{}, + sslStore: NewSSLCertTracker(), + filesystem: fs, + updateCh: updateCh, + backendConfig: ngx_config.NewDefault(), + mu: &sync.Mutex{}, + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ + Interface: client.CoreV1().Events(namespace), + }) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ + Component: "nginx-ingress-controller", + }) + + // k8sStore fulfils resolver.Resolver interface + store.annotations = annotations.NewAnnotationExtractor(store) + + ingEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*extensions.Ingress) + if !class.IsValid(addIng) { + a, _ := parser.GetStringAnnotation(class.IngressKey, addIng) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) + return + } + + store.extractAnnotations(addIng) + recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) + updateCh <- Event{ + Type: CreateEvent, + Obj: obj, + } + }, + DeleteFunc: func(obj interface{}) { + delIng, ok := obj.(*extensions.Ingress) + if !ok { + // If we reached here it means the ingress was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + delIng, ok = tombstone.Obj.(*extensions.Ingress) + if !ok { + glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) + return + } + } + if !class.IsValid(delIng) { + glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) + return + } + recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) + store.listers.IngressAnnotation.Delete(delIng) + updateCh <- Event{ + Type: DeleteEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oldIng := old.(*extensions.Ingress) + curIng := cur.(*extensions.Ingress) + validOld := class.IsValid(oldIng) + validCur := class.IsValid(curIng) + if !validOld && validCur { + glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) + recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validOld && !validCur { + glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) + recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validCur && !reflect.DeepEqual(old, cur) { + recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } + + store.extractAnnotations(curIng) + updateCh <- Event{ + Type: UpdateEvent, + Obj: cur, + } + }, + } + + secrEventHandler := cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + sec := cur.(*apiv1.Secret) + _, exists := store.sslStore.Get(k8s.MetaNamespaceKey(sec)) + if exists { + updateCh <- Event{ + Type: UpdateEvent, + Obj: cur, + } + } + } + }, + DeleteFunc: func(obj interface{}) { + sec, ok := obj.(*apiv1.Secret) + if !ok { + // If we reached here it means the secret was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + sec, ok = tombstone.Obj.(*apiv1.Secret) + if !ok { + glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) + return + } + } + store.sslStore.Delete(k8s.MetaNamespaceKey(sec)) + updateCh <- Event{ + Type: DeleteEvent, + Obj: obj, + } + }, + } + + eventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + updateCh <- Event{ + Type: CreateEvent, + Obj: obj, + } + }, + DeleteFunc: func(obj interface{}) { + updateCh <- Event{ + Type: DeleteEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oep := old.(*apiv1.Endpoints) + ocur := cur.(*apiv1.Endpoints) + if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { + updateCh <- Event{ + Type: UpdateEvent, + Obj: cur, + } + } + }, + } + + mapEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m := obj.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name) + if mapKey == configmap { + glog.V(2).Infof("adding configmap %v to backend", mapKey) + store.setConfig(m) + updateCh <- Event{ + Type: CreateEvent, + Obj: obj, + } + } + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + m := cur.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name) + if mapKey == configmap { + glog.V(2).Infof("updating configmap backend (%v)", mapKey) + store.setConfig(m) + updateCh <- Event{ + Type: ConfigurationEvent, + Obj: cur, + } + } + // updates to configuration configmaps can trigger an update + if mapKey == configmap || mapKey == tcp || mapKey == udp { + recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) + updateCh <- Event{ + Type: ConfigurationEvent, + Obj: cur, + } + } + } + }, + } + + store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) + + store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer( + cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()), + &extensions.Ingress{}, resyncPeriod, ingEventHandler) + + store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()), + &apiv1.Endpoints{}, resyncPeriod, eventHandler) + + store.listers.Secret.Store, store.cache.Secret = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()), + &apiv1.Secret{}, resyncPeriod, secrEventHandler) + + store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()), + &apiv1.ConfigMap{}, resyncPeriod, mapEventHandler) + + store.listers.Service.Store, store.cache.Service = cache.NewInformer( + cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()), + &apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{}) + + return store +} + +func (s k8sStore) extractAnnotations(ing *extensions.Ingress) { + anns := s.annotations.Extract(ing) + glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name) + err := s.listers.IngressAnnotation.Update(anns) + if err != nil { + glog.Error(err) + } +} + +// GetSecret returns a Secret using the namespace and name as key +func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) { + return s.listers.Secret.ByKey(key) +} + +// ListLocalSecrets returns the list of local Secrets +func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert { + var certs []*ingress.SSLCert + for _, item := range s.sslStore.List() { + if s, ok := item.(*ingress.SSLCert); ok { + certs = append(certs, s) + } + } + + return certs +} + +// GetService returns a Service using the namespace and name as key +func (s k8sStore) GetService(key string) (*apiv1.Service, error) { + return s.listers.Service.ByKey(key) +} + +// GetSecret returns an Ingress using the namespace and name as key +func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) { + return s.listers.Ingress.ByKey(key) +} + +// ListIngresses returns the list of Ingresses +func (s k8sStore) ListIngresses() []*extensions.Ingress { + // filter ingress rules + var ingresses []*extensions.Ingress + for _, item := range s.listers.Ingress.List() { + ing := item.(*extensions.Ingress) + if !class.IsValid(ing) { + continue + } + + ingresses = append(ingresses, ing) + } + + return ingresses +} + +// GetIngressAnnotations returns the annotations associated to an Ingress +func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) { + key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) + item, exists, err := s.listers.IngressAnnotation.GetByKey(key) + if err != nil { + return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err) + } + if !exists { + return &annotations.Ingress{}, fmt.Errorf("ingress annotations %v was not found", key) + } + return item.(*annotations.Ingress), nil +} + +// GetLocalSecret returns the local copy of a Secret +func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) { + return s.sslStore.ByKey(key) +} + +func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) { + return s.listers.ConfigMap.ByKey(key) +} + +func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { + return s.listers.Endpoint.GetServiceEndpoints(svc) +} + +// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret +func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { + if _, err := s.GetLocalSecret(name); err != nil { + s.syncSecret(name) + } + + cert, err := s.GetLocalSecret(name) + if err != nil { + return nil, err + } + + return &resolver.AuthSSLCert{ + Secret: name, + CAFileName: cert.CAFileName, + PemSHA: cert.PemSHA, + }, nil +} + +// GetDefaultBackend returns the default backend +func (s k8sStore) GetDefaultBackend() defaults.Backend { + return s.backendConfig.Backend +} + +func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration { + return s.backendConfig +} + +func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) { + s.backendConfig = ngx_template.ReadConfig(cmap.Data) + + // TODO: this should not be done here + if s.backendConfig.SSLSessionTicketKey != "" { + d, err := base64.StdEncoding.DecodeString(s.backendConfig.SSLSessionTicketKey) + if err != nil { + glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err) + s.backendConfig.SSLSessionTicketKey = "" + } + ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644) + } +} + +// Run initiates the synchronization of the controllers +// and the initial synchronization of the secrets. +func (s k8sStore) Run(stopCh chan struct{}) { + // start controllers + s.cache.Run(stopCh) + + // initial sync of secrets to avoid unnecessary reloads + glog.Info("running initial sync of secrets") + for _, ing := range s.ListIngresses() { + s.ReadSecrets(ing) + } + + // start goroutine to check for missing local secrets + go wait.Until(s.checkMissingSecrets, 10*time.Second, stopCh) + + if s.isOCSPCheckEnabled { + go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh) + } +} diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go new file mode 100644 index 000000000..99ab1f4f6 --- /dev/null +++ b/internal/ingress/controller/store/store_test.go @@ -0,0 +1,526 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "fmt" + "os" + "sync/atomic" + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/api/extensions/v1beta1" + extensions "k8s.io/api/extensions/v1beta1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + + "k8s.io/ingress-nginx/internal/file" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +func TestStore(t *testing.T) { + // TODO: find a way to avoid the need to use a real api server + home := os.Getenv("HOME") + kubeConfigFile := fmt.Sprintf("%v/.kube/config", home) + kubeContext := "" + + kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext) + if err != nil { + t.Errorf("unexpected error loading kubeconfig file: %v", err) + } + + clientSet, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + + t.Run("should return an error searching for non existing objects", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + updateCh := make(chan Event, 1024) + + go func(ch chan Event) { + for { + <-ch + } + }(updateCh) + + fs := newFS(t) + storer := New(true, + ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + clientSet, + fs, + updateCh) + + storer.Run(stopCh) + + key := fmt.Sprintf("%v/anything", ns.Name) + ing, err := storer.GetIngress(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if ing != nil { + t.Errorf("expected an Ingres but none returned") + } + + ls, err := storer.GetLocalSecret(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if ls != nil { + t.Errorf("expected an Ingres but none returned") + } + + s, err := storer.GetSecret(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if s != nil { + t.Errorf("expected an Ingres but none returned") + } + + svc, err := storer.GetService(key) + if err == nil { + t.Errorf("expected an error but none returned") + } + if svc != nil { + t.Errorf("expected an Ingres but none returned") + } + + close(updateCh) + close(stopCh) + }) + + t.Run("should return ingress one event for add, update and delete", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + updateCh := make(chan Event, 1024) + + var add uint64 + var upd uint64 + var del uint64 + + go func(ch chan Event) { + for { + e, ok := <-ch + if !ok { + return + } + + if e.Obj == nil { + continue + } + if _, ok := e.Obj.(*extensions.Ingress); !ok { + t.Errorf("expected an Ingress type but %T returned", e.Obj) + } + switch e.Type { + case CreateEvent: + atomic.AddUint64(&add, 1) + case UpdateEvent: + atomic.AddUint64(&upd, 1) + case DeleteEvent: + atomic.AddUint64(&del, 1) + } + } + }(updateCh) + + fs := newFS(t) + storer := New(true, + ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + clientSet, + fs, + updateCh) + + storer.Run(stopCh) + + ing, err := ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummy", + Namespace: ns.Name, + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "dummy", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/", + Backend: v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + // create an invalid ingress (different class) + _, err = ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "custom-class", + Namespace: ns.Name, + Annotations: map[string]string{ + "kubernetes.io/ingress.class": "something", + }, + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "dummy", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/", + Backend: v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + ni := ing.DeepCopy() + ni.Spec.Rules[0].Host = "update-dummy" + _, err = ensureIngress(ni, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + err = clientSet.ExtensionsV1beta1(). + Ingresses(ni.Namespace). + Delete(ni.Name, &metav1.DeleteOptions{}) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + framework.WaitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name) + + if atomic.LoadUint64(&add) != 1 { + t.Errorf("expected 1 event of type Create but %v ocurred", add) + } + if atomic.LoadUint64(&upd) != 1 { + t.Errorf("expected 1 event of type Update but %v ocurred", upd) + } + if atomic.LoadUint64(&del) != 1 { + t.Errorf("expected 1 event of type Delete but %v ocurred", del) + } + + close(updateCh) + close(stopCh) + }) + + t.Run("should not receive events from new secret no referenced from ingress", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + updateCh := make(chan Event, 1024) + + var add uint64 + var upd uint64 + var del uint64 + + go func(ch chan Event) { + for { + e, ok := <-ch + if !ok { + return + } + + if e.Obj == nil { + continue + } + switch e.Type { + case CreateEvent: + atomic.AddUint64(&add, 1) + case UpdateEvent: + atomic.AddUint64(&upd, 1) + case DeleteEvent: + atomic.AddUint64(&del, 1) + } + } + }(updateCh) + + fs := newFS(t) + storer := New(true, + ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + clientSet, + fs, + updateCh) + + storer.Run(stopCh) + + secretName := "no-referenced" + _, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name) + if err != nil { + t.Errorf("unexpected error creating secret: %v", err) + } + + time.Sleep(1 * time.Second) + + if atomic.LoadUint64(&add) != 0 { + t.Errorf("expected 0 events of type Create but %v ocurred", add) + } + if atomic.LoadUint64(&upd) != 0 { + t.Errorf("expected 0 events of type Update but %v ocurred", upd) + } + if atomic.LoadUint64(&del) != 0 { + t.Errorf("expected 0 events of type Delete but %v ocurred", del) + } + + err = clientSet.CoreV1().Secrets(ns.Name).Delete(secretName, &metav1.DeleteOptions{}) + if err != nil { + t.Errorf("unexpected error deleting secret: %v", err) + } + + time.Sleep(1 * time.Second) + + if atomic.LoadUint64(&add) != 0 { + t.Errorf("expected 0 events of type Create but %v ocurred", add) + } + if atomic.LoadUint64(&upd) != 0 { + t.Errorf("expected 0 events of type Update but %v ocurred", upd) + } + if atomic.LoadUint64(&del) != 1 { + t.Errorf("expected 1 events of type Delete but %v ocurred", del) + } + + close(updateCh) + close(stopCh) + }) + + t.Run("should create an ingress with a secret it doesn't exists", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + updateCh := make(chan Event, 1024) + + var add uint64 + var upd uint64 + var del uint64 + + go func(ch <-chan Event) { + for { + e, ok := <-ch + if !ok { + return + } + + if e.Obj == nil { + continue + } + switch e.Type { + case CreateEvent: + atomic.AddUint64(&add, 1) + case UpdateEvent: + atomic.AddUint64(&upd, 1) + case DeleteEvent: + atomic.AddUint64(&del, 1) + } + } + }(updateCh) + + fs := newFS(t) + storer := New(true, + ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + 10*time.Minute, + clientSet, + fs, + updateCh) + + storer.Run(stopCh) + + name := "ingress-with-secret" + secretHosts := []string{name} + + _, err := ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns.Name, + }, + Spec: v1beta1.IngressSpec{ + TLS: []v1beta1.IngressTLS{ + { + Hosts: secretHosts, + SecretName: name, + }, + }, + Rules: []v1beta1.IngressRule{ + { + Host: name, + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/", + Backend: v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + err = framework.WaitForIngressInNamespace(clientSet, ns.Name, name) + if err != nil { + t.Errorf("unexpected error waiting for secret: %v", err) + } + + if atomic.LoadUint64(&add) != 1 { + t.Errorf("expected 1 events of type Create but %v ocurred", add) + } + if atomic.LoadUint64(&upd) != 0 { + t.Errorf("expected 0 events of type Update but %v ocurred", upd) + } + if atomic.LoadUint64(&del) != 0 { + t.Errorf("expected 0 events of type Delete but %v ocurred", del) + } + + _, _, _, err = framework.CreateIngressTLSSecret(clientSet, secretHosts, name, ns.Name) + if err != nil { + t.Errorf("unexpected error creating secret: %v", err) + } + + t.Run("should exists a secret in the local store and filesystem", func(t *testing.T) { + err := framework.WaitForSecretInNamespace(clientSet, ns.Name, name) + if err != nil { + t.Errorf("unexpected error waiting for secret: %v", err) + } + + time.Sleep(30 * time.Second) + + pemFile := fmt.Sprintf("%v/%v-%v.pem", file.DefaultSSLDirectory, ns.Name, name) + err = framework.WaitForFileInFS(pemFile, fs) + if err != nil { + t.Errorf("unexpected error waiting for file to exists in the filesystem: %v", err) + } + + secretName := fmt.Sprintf("%v/%v", ns.Name, name) + sslCert, err := storer.GetLocalSecret(secretName) + if err != nil { + t.Errorf("unexpected error reading local secret %v: %v", secretName, err) + } + + if sslCert == nil { + t.Errorf("expected a secret but none returned") + } + + pemSHA := file.SHA1(pemFile) + if sslCert.PemSHA != pemSHA { + t.Errorf("SHA of secret on disk differs from local secret store (%v != %v)", pemSHA, sslCert.PemSHA) + } + }) + + close(updateCh) + close(stopCh) + }) + + // test add ingress with secret it doesn't exists and then add secret + // check secret is generated on fs + // check ocsp + // check invalid secret (missing crt) + // check invalid secret (missing key) + // check invalid secret (missing ca) +} + +func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) *apiv1.Namespace { + t.Log("creating temporal namespace") + ns, err := framework.CreateKubeNamespace("store-test", clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + t.Logf("temporal namespace %v created", ns.Name) + + return ns +} + +func deleteNamespace(ns *apiv1.Namespace, clientSet *kubernetes.Clientset, t *testing.T) { + t.Logf("deleting temporal namespace %v created", ns.Name) + err := framework.DeleteKubeNamespace(clientSet, ns.Name) + if err != nil { + t.Errorf("unexpected error creating ingress client: %v", err) + } + t.Logf("temporal namespace %v deleted", ns.Name) +} + +func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) { + s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) + if err != nil { + if k8sErrors.IsNotFound(err) { + return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress) + } + return nil, err + } + return s, nil +} + +func newFS(t *testing.T) file.Filesystem { + fs, err := file.NewFakeFS() + if err != nil { + t.Fatalf("unexpected error creating filesystem: %v", err) + } + return fs +} diff --git a/internal/ingress/controller/template/configmap.go b/internal/ingress/controller/template/configmap.go index 68a88a3b1..b9d7d0249 100644 --- a/internal/ingress/controller/template/configmap.go +++ b/internal/ingress/controller/template/configmap.go @@ -26,6 +26,7 @@ import ( "github.com/mitchellh/mapstructure" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-nginx/internal/ingress/controller/config" ing_net "k8s.io/ingress-nginx/internal/net" ) @@ -42,7 +43,7 @@ const ( ) var ( - validRedirectCodes = []int{301, 302, 307, 308} + validRedirectCodes = sets.NewInt([]int{301, 302, 307, 308}...) ) // ReadConfig obtains the configuration defined by the user merged with the defaults. @@ -114,7 +115,7 @@ func ReadConfig(src map[string]string) config.Configuration { if err != nil { glog.Warningf("%v is not a valid HTTP code: %v", val, err) } else { - if intInSlice(j, validRedirectCodes) { + if validRedirectCodes.Has(j) { redirectCode = j } else { glog.Warningf("The code %v is not a valid as HTTP redirect code. Using the default.", val) @@ -175,12 +176,3 @@ func filterErrors(codes []int) []int { return fa } - -func intInSlice(i int, list []int) bool { - for _, v := range list { - if v == i { - return true - } - } - return false -} diff --git a/internal/ingress/controller/util.go b/internal/ingress/controller/util.go index a71bb7400..2c13764af 100644 --- a/internal/ingress/controller/util.go +++ b/internal/ingress/controller/util.go @@ -66,12 +66,3 @@ func sysctlFSFileMax() int { } return int(rLimit.Max) } - -func intInSlice(i int, list []int) bool { - for _, v := range list { - if v == i { - return true - } - } - return false -} diff --git a/internal/ingress/controller/util_test.go b/internal/ingress/controller/util_test.go index dc02bf0dc..b0691bab6 100644 --- a/internal/ingress/controller/util_test.go +++ b/internal/ingress/controller/util_test.go @@ -26,26 +26,6 @@ func (fe *fakeError) Error() string { return "fakeError" } -func TestIntInSlice(t *testing.T) { - fooTests := []struct { - i int - list []int - er bool - }{ - {1, []int{1, 2}, true}, - {3, []int{1, 2}, false}, - {1, nil, false}, - {0, nil, false}, - } - - for _, fooTest := range fooTests { - r := intInSlice(fooTest.i, fooTest.list) - if r != fooTest.er { - t.Errorf("returned %t but expected %t for s=%v & list=%v", r, fooTest.er, fooTest.i, fooTest.list) - } - } -} - func TestSysctlFSFileMax(t *testing.T) { i := sysctlFSFileMax() if i < 1 { diff --git a/internal/ingress/status/status.go b/internal/ingress/status/status.go index db7f34ea7..4fd4d9fe0 100644 --- a/internal/ingress/status/status.go +++ b/internal/ingress/status/status.go @@ -40,8 +40,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" - "k8s.io/ingress-nginx/internal/ingress/store" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/task" ) @@ -56,6 +54,11 @@ type Sync interface { Shutdown() } +type ingressLister interface { + // ListIngresses returns the list of Ingresses + ListIngresses() []*extensions.Ingress +} + // Config ... type Config struct { Client clientset.Interface @@ -68,7 +71,7 @@ type Config struct { UseNodeInternalIP bool - IngressLister store.IngressLister + IngressLister ingressLister DefaultIngressClass string IngressClass string @@ -297,20 +300,14 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress { // updateStatus changes the status information of Ingress rules func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { - ings := s.IngressLister.List() + ings := s.IngressLister.ListIngresses() p := pool.NewLimited(10) defer p.Close() batch := p.Batch() - for _, cur := range ings { - ing := cur.(*extensions.Ingress) - - if !class.IsValid(ing) { - continue - } - + for _, ing := range ings { batch.Queue(runUpdate(ing, newIngressPoint, s.Client)) } diff --git a/internal/ingress/status/status_test.go b/internal/ingress/status/status_test.go index b03fe9bbb..925edf685 100644 --- a/internal/ingress/status/status_test.go +++ b/internal/ingress/status/status_test.go @@ -25,10 +25,8 @@ import ( extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/cache" "k8s.io/ingress-nginx/internal/ingress/annotations/class" - "k8s.io/ingress-nginx/internal/ingress/store" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/task" ) @@ -212,14 +210,18 @@ func buildExtensionsIngresses() []extensions.Ingress { } } -func buildIngressListener() store.IngressLister { - s := cache.NewStore(cache.MetaNamespaceKeyFunc) - s.Add(&extensions.Ingress{ +type testIngressLister struct { +} + +func (til *testIngressLister) ListIngresses() []*extensions.Ingress { + var ingresses []*extensions.Ingress + ingresses = append(ingresses, &extensions.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "foo_ingress_non_01", Namespace: apiv1.NamespaceDefault, }}) - s.Add(&extensions.Ingress{ + + ingresses = append(ingresses, &extensions.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "foo_ingress_1", Namespace: apiv1.NamespaceDefault, @@ -231,7 +233,11 @@ func buildIngressListener() store.IngressLister { }, }) - return store.IngressLister{Store: s} + return ingresses +} + +func buildIngressLister() ingressLister { + return &testIngressLister{} } func buildStatusSync() statusSync { @@ -247,7 +253,7 @@ func buildStatusSync() statusSync { Config: Config{ Client: buildSimpleClientSet(), PublishService: apiv1.NamespaceDefault + "/" + "foo", - IngressLister: buildIngressListener(), + IngressLister: buildIngressLister(), }, } } @@ -259,7 +265,7 @@ func TestStatusActions(t *testing.T) { c := Config{ Client: buildSimpleClientSet(), PublishService: "", - IngressLister: buildIngressListener(), + IngressLister: buildIngressLister(), DefaultIngressClass: "nginx", IngressClass: "", UpdateStatusOnShutdown: true, diff --git a/internal/ingress/store/main.go b/internal/ingress/store/main.go deleted file mode 100644 index 299f54c0b..000000000 --- a/internal/ingress/store/main.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package store - -import ( - "fmt" - - apiv1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/cache" -) - -// IngressLister makes a Store that lists Ingress. -type IngressLister struct { - cache.Store -} - -// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules. -type IngressAnnotationsLister struct { - cache.Store -} - -// SecretLister makes a Store that lists Secrets. -type SecretLister struct { - cache.Store -} - -// GetByName searches for a secret in the local secrets Store -func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) { - s, exists, err := sl.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("secret %v was not found", name) - } - return s.(*apiv1.Secret), nil -} - -// ConfigMapLister makes a Store that lists Configmaps. -type ConfigMapLister struct { - cache.Store -} - -// GetByName searches for a configmap in the local configmaps Store -func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) { - s, exists, err := cml.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("configmap %v was not found", name) - } - return s.(*apiv1.ConfigMap), nil -} - -// ServiceLister makes a Store that lists Services. -type ServiceLister struct { - cache.Store -} - -// GetByName searches for a service in the local secrets Store -func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) { - s, exists, err := sl.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("service %v was not found", name) - } - return s.(*apiv1.Service), nil -} - -// EndpointLister makes a Store that lists Endpoints. -type EndpointLister struct { - cache.Store -} - -// GetServiceEndpoints returns the endpoints of a service, matched on service name. -func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { - for _, m := range s.Store.List() { - ep := m.(*apiv1.Endpoints) - if svc.Name == ep.Name && svc.Namespace == ep.Namespace { - return ep, nil - } - } - return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name) -} - -// SSLCertTracker holds a store of referenced Secrets in Ingress rules -type SSLCertTracker struct { - cache.ThreadSafeStore -} - -// NewSSLCertTracker creates a new SSLCertTracker store -func NewSSLCertTracker() *SSLCertTracker { - return &SSLCertTracker{ - cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - } -} diff --git a/internal/ingress/types.go b/internal/ingress/types.go index be4d03a54..b58856d05 100644 --- a/internal/ingress/types.go +++ b/internal/ingress/types.go @@ -33,7 +33,6 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/redirect" "k8s.io/ingress-nginx/internal/ingress/annotations/rewrite" "k8s.io/ingress-nginx/internal/ingress/resolver" - "k8s.io/ingress-nginx/internal/ingress/store" ) var ( @@ -44,17 +43,6 @@ var ( DefaultSSLDirectory = "/ingress-controller/ssl" ) -// StoreLister returns the configured stores for ingresses, services, -// endpoints, secrets and configmaps. -type StoreLister struct { - Ingress store.IngressLister - Service store.ServiceLister - Endpoint store.EndpointLister - Secret store.SecretLister - ConfigMap store.ConfigMapLister - IngressAnnotation store.IngressAnnotationsLister -} - // Configuration holds the definition of all the parts required to describe all // ingresses reachable by the ingress controller (using a filter by namespace) type Configuration struct { diff --git a/internal/k8s/main.go b/internal/k8s/main.go index b391cad69..dcb887c0a 100644 --- a/internal/k8s/main.go +++ b/internal/k8s/main.go @@ -21,9 +21,11 @@ import ( "os" "strings" + "github.com/golang/glog" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) // ParseNameNS parses a string searching a namespace and name @@ -96,3 +98,13 @@ func GetPodDetails(kubeClient clientset.Interface) (*PodInfo, error) { Labels: pod.GetLabels(), }, nil } + +// MetaNamespaceKey knows how to make keys for API objects which implement meta.Interface. +func MetaNamespaceKey(obj interface{}) string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + glog.Warning(err) + } + + return key +} diff --git a/internal/net/ssl/ssl.go b/internal/net/ssl/ssl.go index 5efc07503..ca684b74f 100644 --- a/internal/net/ssl/ssl.go +++ b/internal/net/ssl/ssl.go @@ -26,10 +26,8 @@ import ( "encoding/pem" "errors" "fmt" - "io/ioutil" "math/big" "net" - "os" "strconv" "time" @@ -47,10 +45,12 @@ var ( ) // AddOrUpdateCertAndKey creates a .pem file wth the cert and the key with the specified name -func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert, error) { +func AddOrUpdateCertAndKey(name string, cert, key, ca []byte, + fs file.Filesystem) (*ingress.SSLCert, error) { + pemName := fmt.Sprintf("%v.pem", name) - pemFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, pemName) - tempPemFile, err := ioutil.TempFile(ingress.DefaultSSLDirectory, pemName) + pemFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, pemName) + tempPemFile, err := fs.TempFile(file.DefaultSSLDirectory, pemName) if err != nil { return nil, fmt.Errorf("could not create temp pem file %v: %v", pemFileName, err) @@ -74,34 +74,30 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert, if err != nil { return nil, fmt.Errorf("could not close temp pem file %v: %v", tempPemFile.Name(), err) } + defer fs.RemoveAll(tempPemFile.Name()) - pemCerts, err := ioutil.ReadFile(tempPemFile.Name()) + pemCerts, err := fs.ReadFile(tempPemFile.Name()) if err != nil { - _ = os.Remove(tempPemFile.Name()) return nil, err } pemBlock, _ := pem.Decode(pemCerts) if pemBlock == nil { - _ = os.Remove(tempPemFile.Name()) return nil, fmt.Errorf("no valid PEM formatted block found") } // If the file does not start with 'BEGIN CERTIFICATE' it's invalid and must not be used. if pemBlock.Type != "CERTIFICATE" { - _ = os.Remove(tempPemFile.Name()) return nil, fmt.Errorf("certificate %v contains invalid data, and must be created with 'kubectl create secret tls'", name) } pemCert, err := x509.ParseCertificate(pemBlock.Bytes) if err != nil { - _ = os.Remove(tempPemFile.Name()) return nil, err } //Ensure that certificate and private key have a matching public key if _, err := tls.X509KeyPair(cert, key); err != nil { - _ = os.Remove(tempPemFile.Name()) return nil, err } @@ -129,7 +125,7 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert, } } - err = os.Rename(tempPemFile.Name(), pemFileName) + err = fs.Rename(tempPemFile.Name(), pemFileName) if err != nil { return nil, fmt.Errorf("could not move temp pem file %v to destination %v: %v", tempPemFile.Name(), pemFileName, err) } @@ -147,18 +143,24 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert, return nil, errors.New(oe) } - caFile, err := os.OpenFile(pemFileName, os.O_RDWR|os.O_APPEND, 0600) + caData, err := fs.ReadFile(pemFileName) if err != nil { return nil, fmt.Errorf("could not open file %v for writing additional CA chains: %v", pemFileName, err) } - defer caFile.Close() + caFile, err := fs.Create(pemFileName) + _, err = caFile.Write(caData) + if err != nil { + return nil, fmt.Errorf("could not append CA to cert file %v: %v", pemFileName, err) + } + _, err = caFile.Write([]byte("\n")) if err != nil { return nil, fmt.Errorf("could not append CA to cert file %v: %v", pemFileName, err) } caFile.Write(ca) caFile.Write([]byte("\n")) + defer caFile.Close() return &ingress.SSLCert{ Certificate: pemCert, @@ -249,10 +251,10 @@ func parseSANExtension(value []byte) (dnsNames, emailAddresses []string, ipAddre // AddCertAuth creates a .pem file with the specified CAs to be used in Cert Authentication // If it's already exists, it's clobbered. -func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) { +func AddCertAuth(name string, ca []byte, fs file.Filesystem) (*ingress.SSLCert, error) { caName := fmt.Sprintf("ca-%v.pem", name) - caFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, caName) + caFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, caName) pemCABlock, _ := pem.Decode(ca) if pemCABlock == nil { @@ -268,7 +270,13 @@ func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) { return nil, err } - err = ioutil.WriteFile(caFileName, ca, 0644) + caFile, err := fs.Create(caFileName) + if err != nil { + return nil, fmt.Errorf("could not write CA file %v: %v", caFileName, err) + } + defer caFile.Close() + + _, err = caFile.Write(ca) if err != nil { return nil, fmt.Errorf("could not write CA file %v: %v", caFileName, err) } @@ -282,11 +290,11 @@ func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) { } // AddOrUpdateDHParam creates a dh parameters file with the specified name -func AddOrUpdateDHParam(name string, dh []byte) (string, error) { +func AddOrUpdateDHParam(name string, dh []byte, fs file.Filesystem) (string, error) { pemName := fmt.Sprintf("%v.pem", name) - pemFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, pemName) + pemFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, pemName) - tempPemFile, err := ioutil.TempFile(ingress.DefaultSSLDirectory, pemName) + tempPemFile, err := fs.TempFile(file.DefaultSSLDirectory, pemName) glog.V(3).Infof("Creating temp file %v for DH param: %v", tempPemFile.Name(), pemName) if err != nil { @@ -303,25 +311,24 @@ func AddOrUpdateDHParam(name string, dh []byte) (string, error) { return "", fmt.Errorf("could not close temp pem file %v: %v", tempPemFile.Name(), err) } - pemCerts, err := ioutil.ReadFile(tempPemFile.Name()) + defer fs.RemoveAll(tempPemFile.Name()) + + pemCerts, err := fs.ReadFile(tempPemFile.Name()) if err != nil { - _ = os.Remove(tempPemFile.Name()) return "", err } pemBlock, _ := pem.Decode(pemCerts) if pemBlock == nil { - _ = os.Remove(tempPemFile.Name()) return "", fmt.Errorf("no valid PEM formatted block found") } // If the file does not start with 'BEGIN DH PARAMETERS' it's invalid and must not be used. if pemBlock.Type != "DH PARAMETERS" { - _ = os.Remove(tempPemFile.Name()) return "", fmt.Errorf("certificate %v contains invalid data", name) } - err = os.Rename(tempPemFile.Name(), pemFileName) + err = fs.Rename(tempPemFile.Name(), pemFileName) if err != nil { return "", fmt.Errorf("could not move temp pem file %v to destination %v: %v", tempPemFile.Name(), pemFileName, err) } @@ -382,13 +389,8 @@ func GetFakeSSLCert() ([]byte, []byte) { // FullChainCert checks if a certificate file contains issues in the intermediate CA chain // Returns a new certificate with the intermediate certificates. // If the certificate does not contains issues with the chain it return an empty byte array -func FullChainCert(in string) ([]byte, error) { - inputFile, err := os.Open(in) - if err != nil { - return nil, err - } - - data, err := ioutil.ReadAll(inputFile) +func FullChainCert(in string, fs file.Filesystem) ([]byte, error) { + data, err := fs.ReadFile(in) if err != nil { return nil, err } diff --git a/internal/net/ssl/ssl_test.go b/internal/net/ssl/ssl_test.go index 95767eeca..d6456050b 100644 --- a/internal/net/ssl/ssl_test.go +++ b/internal/net/ssl/ssl_test.go @@ -19,14 +19,13 @@ package ssl import ( "crypto/x509" "fmt" - "io/ioutil" "testing" "time" certutil "k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert/triple" - "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/file" ) // generateRSACerts generates a self signed certificate using a self generated ca @@ -57,11 +56,7 @@ func generateRSACerts(host string) (*triple.KeyPair, *triple.KeyPair, error) { } func TestAddOrUpdateCertAndKey(t *testing.T) { - td, err := ioutil.TempDir("", "ssl") - if err != nil { - t.Fatalf("Unexpected error creating temporal directory: %v", err) - } - ingress.DefaultSSLDirectory = td + fs := newFS(t) cert, _, err := generateRSACerts("echoheaders") if err != nil { @@ -73,7 +68,7 @@ func TestAddOrUpdateCertAndKey(t *testing.T) { c := certutil.EncodeCertPEM(cert.Cert) k := certutil.EncodePrivateKeyPEM(cert.Key) - ngxCert, err := AddOrUpdateCertAndKey(name, c, k, []byte{}) + ngxCert, err := AddOrUpdateCertAndKey(name, c, k, []byte{}, fs) if err != nil { t.Fatalf("unexpected error checking SSL certificate: %v", err) } @@ -92,11 +87,7 @@ func TestAddOrUpdateCertAndKey(t *testing.T) { } func TestCACert(t *testing.T) { - td, err := ioutil.TempDir("", "ssl") - if err != nil { - t.Fatalf("Unexpected error creating temporal directory: %v", err) - } - ingress.DefaultSSLDirectory = td + fs := newFS(t) cert, CA, err := generateRSACerts("echoheaders") if err != nil { @@ -109,7 +100,7 @@ func TestCACert(t *testing.T) { k := certutil.EncodePrivateKeyPEM(cert.Key) ca := certutil.EncodeCertPEM(CA.Cert) - ngxCert, err := AddOrUpdateCertAndKey(name, c, k, ca) + ngxCert, err := AddOrUpdateCertAndKey(name, c, k, ca, fs) if err != nil { t.Fatalf("unexpected error checking SSL certificate: %v", err) } @@ -129,11 +120,10 @@ func TestGetFakeSSLCert(t *testing.T) { } func TestAddCertAuth(t *testing.T) { - td, err := ioutil.TempDir("", "ssl") + fs, err := file.NewFakeFS() if err != nil { - t.Fatalf("Unexpected error creating temporal directory: %v", err) + t.Fatalf("unexpected error creating filesystem: %v", err) } - ingress.DefaultSSLDirectory = td cn := "demo-ca" _, ca, err := generateRSACerts(cn) @@ -141,7 +131,7 @@ func TestAddCertAuth(t *testing.T) { t.Fatalf("unexpected error creating SSL certificate: %v", err) } c := certutil.EncodeCertPEM(ca.Cert) - ic, err := AddCertAuth(cn, c) + ic, err := AddCertAuth(cn, c, fs) if err != nil { t.Fatalf("unexpected error creating SSL certificate: %v", err) } @@ -149,3 +139,11 @@ func TestAddCertAuth(t *testing.T) { t.Fatalf("expected a valid CA file name") } } + +func newFS(t *testing.T) file.Filesystem { + fs, err := file.NewFakeFS() + if err != nil { + t.Fatalf("unexpected error creating filesystem: %v", err) + } + return fs +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index cbd7f5038..586f99968 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/ingress-nginx/internal/file" ) const ( @@ -97,9 +98,10 @@ var RunID = uuid.NewUUID() // CreateKubeNamespace creates a new namespace in the cluster func CreateKubeNamespace(baseName string, c kubernetes.Interface) (*v1.Namespace, error) { + ts := time.Now().UnixNano() ns := &v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("e2e-tests-%v-", baseName), + GenerateName: fmt.Sprintf("e2e-tests-%v-%v-", baseName, ts), }, } // Be robust about making the namespace creation call. @@ -207,6 +209,30 @@ func secretInNamespace(c kubernetes.Interface, namespace, name string) wait.Cond } } +// WaitForFileInFS waits a default amount of time for the specified file is present in the filesystem +func WaitForFileInFS(file string, fs file.Filesystem) error { + return wait.PollImmediate(1*time.Second, time.Minute*2, fileInFS(file, fs)) +} + +func fileInFS(file string, fs file.Filesystem) wait.ConditionFunc { + return func() (bool, error) { + stat, err := fs.Stat(file) + if err != nil { + return false, err + } + + if stat == nil { + return false, fmt.Errorf("file %v does not exists", file) + } + + if stat.Size() > 0 { + return true, nil + } + + return false, fmt.Errorf("the file %v exists but it is empty", file) + } +} + // WaitForNoIngressInNamespace waits until there is no ingress object in a particular namespace func WaitForNoIngressInNamespace(c kubernetes.Interface, namespace, name string) error { return wait.PollImmediate(1*time.Second, time.Minute*2, noIngressInNamespace(c, namespace, name))