From cd288b99931f4c1aef90cf8b94070909e9ea34ce Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Mon, 18 Sep 2017 20:53:26 -0300 Subject: [PATCH] Improve resource usage in nginx controller --- controllers/nginx/pkg/cmd/controller/nginx.go | 4 +- controllers/nginx/pkg/template/template.go | 38 +- .../rootfs/etc/nginx/template/nginx.tmpl | 2 + core/pkg/base64/base64.go | 12 - .../pkg/ingress/annotations/ratelimit/main.go | 9 +- core/pkg/ingress/controller/backend_ssl.go | 6 +- .../ingress/controller/backend_ssl_test.go | 14 +- core/pkg/ingress/controller/controller.go | 363 +++++------------- core/pkg/ingress/controller/listers.go | 200 ++++++++++ core/pkg/ingress/controller/util_test.go | 2 +- core/pkg/ingress/sort_ingress.go | 46 --- core/pkg/ingress/sort_ingress_test.go | 341 ---------------- core/pkg/ingress/status/status.go | 27 +- core/pkg/ingress/status/status_test.go | 63 +-- core/pkg/ingress/store/main.go | 49 ++- core/pkg/ingress/types.go | 4 +- core/pkg/ingress/types_equals.go | 2 +- 17 files changed, 388 insertions(+), 794 deletions(-) delete mode 100644 core/pkg/base64/base64.go create mode 100644 core/pkg/ingress/controller/listers.go diff --git a/controllers/nginx/pkg/cmd/controller/nginx.go b/controllers/nginx/pkg/cmd/controller/nginx.go index 1562fb7e8..29e647375 100644 --- a/controllers/nginx/pkg/cmd/controller/nginx.go +++ b/controllers/nginx/pkg/cmd/controller/nginx.go @@ -124,7 +124,7 @@ type NGINXController struct { configmap *apiv1.ConfigMap - storeLister ingress.StoreLister + storeLister *ingress.StoreLister binary string resolver []net.IP @@ -463,7 +463,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) { } // SetListers sets the configured store listers in the generic ingress controller -func (n *NGINXController) SetListers(lister ingress.StoreLister) { +func (n *NGINXController) SetListers(lister *ingress.StoreLister) { n.storeLister = lister } diff --git a/controllers/nginx/pkg/template/template.go b/controllers/nginx/pkg/template/template.go index 9a250a10f..b3762586e 100644 --- a/controllers/nginx/pkg/template/template.go +++ b/controllers/nginx/pkg/template/template.go @@ -49,11 +49,9 @@ const ( // Template ... type Template struct { - tmpl *text_template.Template - fw watch.FileWatcher - s int - tmplBuf *bytes.Buffer - outCmdBuf *bytes.Buffer + tmpl *text_template.Template + fw watch.FileWatcher + s int } //NewTemplate returns a new Template instance or an @@ -69,11 +67,9 @@ func NewTemplate(file string, onChange func()) (*Template, error) { } return &Template{ - tmpl: tmpl, - fw: fw, - s: defBufferSize, - tmplBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)), - outCmdBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)), + tmpl: tmpl, + fw: fw, + s: defBufferSize, }, nil } @@ -85,15 +81,13 @@ func (t *Template) Close() { // Write populates a buffer using a template with NGINX configuration // and the servers and upstreams created by Ingress rules func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) { - defer t.tmplBuf.Reset() - defer t.outCmdBuf.Reset() + tmplBuf := bytes.NewBuffer(make([]byte, 0, t.s)) + outCmdBuf := bytes.NewBuffer(make([]byte, 0, t.s)) defer func() { - if t.s < t.tmplBuf.Cap() { - glog.V(2).Infof("adjusting template buffer size from %v to %v", t.s, t.tmplBuf.Cap()) - t.s = t.tmplBuf.Cap() - t.tmplBuf = bytes.NewBuffer(make([]byte, 0, t.tmplBuf.Cap())) - t.outCmdBuf = bytes.NewBuffer(make([]byte, 0, t.outCmdBuf.Cap())) + if t.s < tmplBuf.Cap() { + glog.V(2).Infof("adjusting template buffer size from %v to %v", t.s, tmplBuf.Cap()) + t.s = tmplBuf.Cap() } }() @@ -105,7 +99,7 @@ func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) { glog.Infof("NGINX configuration: %v", string(b)) } - err := t.tmpl.Execute(t.tmplBuf, conf) + err := t.tmpl.Execute(tmplBuf, conf) if err != nil { return nil, err } @@ -113,14 +107,14 @@ func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) { // squeezes multiple adjacent empty lines to be single // spaced this is to avoid the use of regular expressions cmd := exec.Command("/ingress-controller/clean-nginx-conf.sh") - cmd.Stdin = t.tmplBuf - cmd.Stdout = t.outCmdBuf + cmd.Stdin = tmplBuf + cmd.Stdout = outCmdBuf if err := cmd.Run(); err != nil { glog.Warningf("unexpected error cleaning template: %v", err) - return t.tmplBuf.Bytes(), nil + return tmplBuf.Bytes(), nil } - return t.outCmdBuf.Bytes(), nil + return outCmdBuf.Bytes(), nil } var ( diff --git a/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl b/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl index cfa721acd..4fc6f95a6 100644 --- a/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl +++ b/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl @@ -348,6 +348,7 @@ http { {{ end }} {{ range $index, $server := $servers }} + server { server_name {{ $server.Hostname }}; {{ template "SERVER" serverConfig $all $server }} @@ -355,6 +356,7 @@ http { {{ template "CUSTOM_ERRORS" $all }} } + {{ if $server.Alias }} server { server_name {{ $server.Alias }}; diff --git a/core/pkg/base64/base64.go b/core/pkg/base64/base64.go deleted file mode 100644 index 6c4480148..000000000 --- a/core/pkg/base64/base64.go +++ /dev/null @@ -1,12 +0,0 @@ -package base64 - -import ( - "encoding/base64" - "strings" -) - -// Encode encodes a string to base64 removing the equals character -func Encode(s string) string { - str := base64.URLEncoding.EncodeToString([]byte(s)) - return strings.Replace(str, "=", "", -1) -} diff --git a/core/pkg/ingress/annotations/ratelimit/main.go b/core/pkg/ingress/annotations/ratelimit/main.go index 1c173ec48..b2d8979f1 100644 --- a/core/pkg/ingress/annotations/ratelimit/main.go +++ b/core/pkg/ingress/annotations/ratelimit/main.go @@ -17,13 +17,13 @@ limitations under the License. package ratelimit import ( + "encoding/base64" "fmt" "sort" "strings" extensions "k8s.io/api/extensions/v1beta1" - "k8s.io/ingress/core/pkg/base64" "k8s.io/ingress/core/pkg/ingress/annotations/parser" "k8s.io/ingress/core/pkg/ingress/resolver" "k8s.io/ingress/core/pkg/net" @@ -218,7 +218,7 @@ func (a ratelimit) Parse(ing *extensions.Ingress) (interface{}, error) { LimitRate: lr, LimitRateAfter: lra, Name: zoneName, - ID: base64.Encode(zoneName), + ID: encode(zoneName), Whitelist: cidrs, }, nil } @@ -248,3 +248,8 @@ func parseCIDRs(s string) ([]string, error) { return cidrs, nil } + +func encode(s string) string { + str := base64.URLEncoding.EncodeToString([]byte(s)) + return strings.Replace(str, "=", "", -1) +} diff --git a/core/pkg/ingress/controller/backend_ssl.go b/core/pkg/ingress/controller/backend_ssl.go index d9be39ac5..3b3bb48ba 100644 --- a/core/pkg/ingress/controller/backend_ssl.go +++ b/core/pkg/ingress/controller/backend_ssl.go @@ -67,15 +67,11 @@ func (ic *GenericController) syncSecret(key string) { // getPemCertificate receives a secret, and creates a ingress.SSLCert as return. // It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only. func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { - secretInterface, exists, err := ic.secrLister.Store.GetByKey(secretName) + secret, err := ic.listers.Secret.GetByName(secretName) if err != nil { return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err) } - if !exists { - return nil, fmt.Errorf("secret named %v does not exist", secretName) - } - secret := secretInterface.(*apiv1.Secret) cert, okcert := secret.Data[apiv1.TLSCertKey] key, okkey := secret.Data[apiv1.TLSPrivateKeyKey] diff --git a/core/pkg/ingress/controller/backend_ssl_test.go b/core/pkg/ingress/controller/backend_ssl_test.go index b2756b3df..e2386b1a5 100644 --- a/core/pkg/ingress/controller/backend_ssl_test.go +++ b/core/pkg/ingress/controller/backend_ssl_test.go @@ -86,6 +86,13 @@ func buildSecrListerForBackendSSL() store.SecretLister { return secrLister } +func buildListers() *ingress.StoreLister { + sl := &ingress.StoreLister{} + sl.Ingress.Store = buildIngListenerForBackendSSL() + sl.Secret.Store = buildSecrListerForBackendSSL() + return sl +} + func buildControllerForBackendSSL() cache_client.Controller { cfg := &cache_client.Config{ Queue: &MockQueue{Synced: true}, @@ -99,8 +106,7 @@ func buildGenericControllerForBackendSSL() *GenericController { cfg: &Configuration{ Client: buildSimpleClientSetForBackendSSL(), }, - ingLister: buildIngListenerForBackendSSL(), - secrLister: buildSecrListerForBackendSSL(), + listers: buildListers(), ingController: buildControllerForBackendSSL(), endpController: buildControllerForBackendSSL(), @@ -162,7 +168,7 @@ func TestSyncSecret(t *testing.T) { secret.SetNamespace("default") secret.SetName("foo_secret") secret.Data = foo.Data - ic.secrLister.Add(secret) + ic.listers.Secret.Add(secret) key := "default/foo_secret" // for add @@ -209,7 +215,7 @@ func TestGetPemCertificate(t *testing.T) { ic := buildGenericControllerForBackendSSL() secret := buildSecretForBackendSSL() secret.Data = foo.Data - ic.secrLister.Add(secret) + ic.listers.Secret.Add(secret) sslCert, err := ic.getPemCertificate(foo.secretName) if foo.eErr { diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index e3ca01116..2f703234d 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -31,7 +31,6 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/conversion" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -39,7 +38,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" - fcache "k8s.io/client-go/tools/cache/testing" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" @@ -51,7 +49,6 @@ import ( "k8s.io/ingress/core/pkg/ingress/defaults" "k8s.io/ingress/core/pkg/ingress/resolver" "k8s.io/ingress/core/pkg/ingress/status" - "k8s.io/ingress/core/pkg/ingress/store" "k8s.io/ingress/core/pkg/k8s" "k8s.io/ingress/core/pkg/net/ssl" local_strings "k8s.io/ingress/core/pkg/strings" @@ -87,12 +84,7 @@ type GenericController struct { secrController cache.Controller mapController cache.Controller - ingLister store.IngressLister - svcLister store.ServiceLister - nodeLister store.NodeLister - endpLister store.EndpointLister - secrLister store.SecretLister - mapLister store.ConfigMapLister + listers *ingress.StoreLister annotations annotationExtractor @@ -119,6 +111,8 @@ type GenericController struct { runningConfig *ingress.Configuration forceReload bool + + initialSyncDone bool } // Configuration contains all the settings required by an Ingress controller @@ -171,177 +165,18 @@ func newIngressController(config *Configuration) *GenericController { Component: "ingress-controller", }), sslCertTracker: newSSLCertTracker(), + listers: &ingress.StoreLister{}, } ic.syncQueue = task.NewTaskQueue(ic.syncIngress) - // from here to the end of the method all the code is just boilerplate - // required to watch Ingress, Secrets, ConfigMaps and Endoints. - // This is used to detect new content, updates or removals and act accordingly - ingEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - addIng := obj.(*extensions.Ingress) - if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { - a, _ := parser.GetStringAnnotation(class.IngressKey, addIng) - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) - return - } - ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) - ic.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - delIng, ok := obj.(*extensions.Ingress) - if !ok { - // If we reached here it means the ingress was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - delIng, ok = tombstone.Obj.(*extensions.Ingress) - if !ok { - glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) - return - } - } - if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { - glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) - return - } - ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) - ic.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oldIng := old.(*extensions.Ingress) - curIng := cur.(*extensions.Ingress) - validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) - validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) - if !validOld && validCur { - glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validOld && !validCur { - glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } else if validCur && !reflect.DeepEqual(old, cur) { - ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) - } - - ic.syncQueue.Enqueue(cur) - }, - } - - secrEventHandler := cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - sec := cur.(*apiv1.Secret) - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - ic.syncSecret(key) - } - }, - DeleteFunc: func(obj interface{}) { - sec, ok := obj.(*apiv1.Secret) - if !ok { - // If we reached here it means the secret was deleted but its final state is unrecorded. - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - glog.Errorf("couldn't get object from tombstone %#v", obj) - return - } - sec, ok = tombstone.Obj.(*apiv1.Secret) - if !ok { - glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) - return - } - } - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - ic.sslCertTracker.DeleteAll(key) - }, - } - - eventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - ic.syncQueue.Enqueue(obj) - }, - DeleteFunc: func(obj interface{}) { - ic.syncQueue.Enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - oep := old.(*apiv1.Endpoints) - ocur := cur.(*apiv1.Endpoints) - if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { - ic.syncQueue.Enqueue(cur) - } - }, - } - - mapEventHandler := cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - upCmap := obj.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == ic.cfg.ConfigMapName { - glog.V(2).Infof("adding configmap %v to backend", mapKey) - ic.cfg.Backend.SetConfig(upCmap) - ic.forceReload = true - } - }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - upCmap := cur.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) - if mapKey == ic.cfg.ConfigMapName { - glog.V(2).Infof("updating configmap backend (%v)", mapKey) - ic.cfg.Backend.SetConfig(upCmap) - ic.forceReload = true - } - // updates to configuration configmaps can trigger an update - if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName { - ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - ic.syncQueue.Enqueue(cur) - } - } - }, - } - - watchNs := apiv1.NamespaceAll - if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll { - watchNs = ic.cfg.Namespace - } - - ic.ingLister.Store, ic.ingController = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()), - &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler) - - ic.endpLister.Store, ic.endpController = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()), - &apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler) - - ic.secrLister.Store, ic.secrController = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), - &apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler) - - ic.mapLister.Store, ic.mapController = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), - &apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler) - - ic.svcLister.Store, ic.svcController = cache.NewInformer( - cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()), - &apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) - - var nodeListerWatcher cache.ListerWatcher - if config.DisableNodeList { - nodeListerWatcher = fcache.NewFakeControllerSource() - } else { - nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) - } - ic.nodeLister.Store, ic.nodeController = cache.NewInformer( - nodeListerWatcher, - &apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + ic.createListers(config.DisableNodeList) if config.UpdateStatus { ic.syncStatus = status.NewStatusSyncer(status.Config{ Client: config.Client, PublishService: ic.cfg.PublishService, - IngressLister: ic.ingLister, + IngressLister: ic.listers.Ingress, ElectionID: config.ElectionID, IngressClass: config.IngressClass, DefaultIngressClass: config.DefaultIngressClass, @@ -353,14 +188,7 @@ func newIngressController(config *Configuration) *GenericController { } ic.annotations = newAnnotationExtractor(ic) - ic.cfg.Backend.SetListers(ingress.StoreLister{ - Ingress: ic.ingLister, - Service: ic.svcLister, - Node: ic.nodeLister, - Endpoint: ic.endpLister, - Secret: ic.secrLister, - ConfigMap: ic.mapLister, - }) + ic.cfg.Backend.SetListers(ic.listers) cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs) @@ -384,7 +212,7 @@ func (ic GenericController) GetDefaultBackend() defaults.Backend { // GetPublishService returns the configured service used to set ingress status func (ic GenericController) GetPublishService() *apiv1.Service { - s, err := ic.GetService(ic.cfg.PublishService) + s, err := ic.listers.Service.GetByName(ic.cfg.PublishService) if err != nil { return nil } @@ -399,37 +227,12 @@ func (ic GenericController) GetRecorder() record.EventRecorder { // GetSecret searches for a secret in the local secrets Store func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) { - s, exists, err := ic.secrLister.Store.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("secret %v was not found", name) - } - return s.(*apiv1.Secret), nil + return ic.listers.Secret.GetByName(name) } // GetService searches for a service in the local secrets Store func (ic GenericController) GetService(name string) (*apiv1.Service, error) { - s, exists, err := ic.svcLister.Store.GetByKey(name) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("service %v was not found", name) - } - return s.(*apiv1.Service), nil -} - -func (ic *GenericController) getConfigMap(ns, name string) (*apiv1.ConfigMap, error) { - s, exists, err := ic.mapLister.Store.GetByKey(fmt.Sprintf("%v/%v", ns, name)) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("configmap %v was not found", name) - } - return s.(*apiv1.ConfigMap), nil + return ic.listers.Service.GetByName(name) } // sync collects all the pieces required to assemble the configuration file and @@ -443,7 +246,7 @@ func (ic *GenericController) syncIngress(key interface{}) error { } if name, ok := key.(string); ok { - if obj, exists, _ := ic.ingLister.GetByKey(name); exists { + if obj, exists, _ := ic.listers.Ingress.GetByKey(name); exists { ing := obj.(*extensions.Ingress) ic.readSecrets(ing) } @@ -511,15 +314,15 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 return []ingress.L4Service{} } - ns, name, err := k8s.ParseNameNS(configmapName) + _, _, err := k8s.ParseNameNS(configmapName) if err != nil { - glog.Errorf("unexpected error reading configmap %v: %v", name, err) + glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) return []ingress.L4Service{} } - configmap, err := ic.getConfigMap(ns, name) + configmap, err := ic.listers.ConfigMap.GetByName(configmapName) if err != nil { - glog.Errorf("unexpected error reading configmap %v: %v", name, err) + glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) return []ingress.L4Service{} } @@ -562,7 +365,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 continue } - svcObj, svcExists, err := ic.svcLister.Store.GetByKey(nsName) + svcObj, svcExists, err := ic.listers.Service.GetByKey(nsName) if err != nil { glog.Warningf("error getting service %v: %v", nsName, err) continue @@ -578,7 +381,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1 var endps []ingress.Endpoint targetPort, err := strconv.Atoi(svcPort) if err != nil { - glog.V(3).Infof("searching service %v/%v endpoints using the name '%v'", svcNs, svcName, svcPort) + glog.V(3).Infof("searching service %v endpoints using the name '%v'", svcNs, svcName, svcPort) for _, sp := range svc.Spec.Ports { if sp.Name == svcPort { if sp.Protocol == proto { @@ -631,7 +434,7 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend { Name: defUpstreamName, } svcKey := ic.cfg.DefaultService - svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey) + svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey) if err != nil { glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err) upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint()) @@ -656,36 +459,19 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend { return upstream } -type ingressByRevision []interface{} - -func (c ingressByRevision) Len() int { return len(c) } -func (c ingressByRevision) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c ingressByRevision) Less(i, j int) bool { - ir := c[i].(*extensions.Ingress).ResourceVersion - jr := c[j].(*extensions.Ingress).ResourceVersion - return ir < jr -} - // getBackendServers returns a list of Upstream and Server to be used by the backend // An upstream can be used in multiple servers if the namespace, service name and port are the same func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress.Server) { - ings := ic.ingLister.Store.List() - sort.Sort(ingressByRevision(ings)) + ings := ic.listers.Ingress.List() + sort.Slice(ings, func(i, j int) bool { + ir := ings[i].(*extensions.Ingress).ResourceVersion + jr := ings[j].(*extensions.Ingress).ResourceVersion + return ir < jr + }) - upstreams := ic.createUpstreams(ings) - servers := ic.createServers(ings, upstreams) - - // If a server has a hostname equivalent to a pre-existing alias, then we - // remove the alias to avoid conflicts. - for _, server := range servers { - for j, alias := range servers { - if server.Hostname == alias.Alias { - glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.", - server.Hostname, alias.Hostname, alias.Hostname) - servers[j].Alias = "" - } - } - } + du := ic.getDefaultUpstream() + upstreams := ic.createUpstreams(ings, du) + servers := ic.createServers(ings, upstreams, du) for _, ingIf := range ings { ing := ingIf.(*extensions.Ingress) @@ -860,15 +646,22 @@ func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress } if ic.cfg.SortBackends { - sort.Sort(ingress.BackendByNameServers(aUpstreams)) + sort.Slice(aUpstreams, func(a, b int) bool { + return aUpstreams[a].Name < aUpstreams[b].Name + }) } aServers := make([]*ingress.Server, 0, len(servers)) for _, value := range servers { - sort.Sort(ingress.LocationByPath(value.Locations)) + sort.Slice(value.Locations, func(i, j int) bool { + return value.Locations[i].Path > value.Locations[j].Path + }) aServers = append(aServers, value) } - sort.Sort(ingress.ServerByName(aServers)) + + sort.Slice(aServers, func(i, j int) bool { + return aServers[i].Hostname < aServers[j].Hostname + }) return aUpstreams, aServers } @@ -879,7 +672,7 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut ic.syncSecret(secretName) } - _, err := ic.GetSecret(secretName) + _, err := ic.listers.Secret.GetByName(secretName) if err != nil { return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) } @@ -898,9 +691,9 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut // createUpstreams creates the NGINX upstreams for each service referenced in // Ingress rules. The servers inside the upstream are endpoints. -func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ingress.Backend { +func (ic *GenericController) createUpstreams(data []interface{}, du *ingress.Backend) map[string]*ingress.Backend { upstreams := make(map[string]*ingress.Backend) - upstreams[defUpstreamName] = ic.getDefaultUpstream() + upstreams[defUpstreamName] = du for _, ingIf := range data { ing := ingIf.(*extensions.Ingress) @@ -994,18 +787,13 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing upstreams[name].Endpoints = endp } - s, exists, err := ic.svcLister.Store.GetByKey(svcKey) + s, err := ic.listers.Service.GetByName(svcKey) if err != nil { glog.Warningf("error obtaining service: %v", err) continue } - if !exists { - glog.Warningf("service %v does not exists", svcKey) - continue - } - - upstreams[name].Service = s.(*apiv1.Service) + upstreams[name].Service = s } } } @@ -1014,7 +802,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing } func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { - svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey) + svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey) if !svcExists { return endpoint, fmt.Errorf("service %v does not exist", svcKey) @@ -1035,19 +823,13 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e // to a service. func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, hz *healthcheck.Upstream) ([]ingress.Endpoint, error) { - svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey) + svc, err := ic.listers.Service.GetByName(svcKey) var upstreams []ingress.Endpoint if err != nil { return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err) } - if !svcExists { - err = fmt.Errorf("service %v does not exist", svcKey) - return upstreams, err - } - - svc := svcObj.(*apiv1.Service) glog.V(3).Infof("obtaining port information for service %v", svcKey) for _, servicePort := range svc.Spec.Ports { // targetPort could be a string, use the name or the port (int) @@ -1061,7 +843,15 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, } if ic.cfg.SortBackends { - sort.Sort(ingress.EndpointByAddrPort(endps)) + sort.Slice(endps, func(i, j int) bool { + iName := endps[i].Address + jName := endps[j].Address + if iName != jName { + return iName < jName + } + + return endps[i].Port < endps[j].Port + }) } upstreams = append(upstreams, endps...) break @@ -1084,11 +874,16 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, // SSL certificates. Each server is configured with location / using a default // backend specified by the user or the one inside the ingress spec. func (ic *GenericController) createServers(data []interface{}, - upstreams map[string]*ingress.Backend) map[string]*ingress.Server { - servers := make(map[string]*ingress.Server) + upstreams map[string]*ingress.Backend, + du *ingress.Backend) map[string]*ingress.Server { + + servers := make(map[string]*ingress.Server, len(data)) + // If a server has a hostname equivalent to a pre-existing alias, then we + // remove the alias to avoid conflicts. + aliases := make(map[string]string, len(data)) bdef := ic.GetDefaultBackend() - ngxProxy := proxy.Configuration{ + ngxProxy := &proxy.Configuration{ BodySize: bdef.ProxyBodySize, ConnectTimeout: bdef.ProxyConnectTimeout, SendTimeout: bdef.ProxySendTimeout, @@ -1111,7 +906,6 @@ func (ic *GenericController) createServers(data []interface{}, } // initialize the default server - du := ic.getDefaultUpstream() servers[defServerName] = &ingress.Server{ Hostname: defServerName, SSLCertificate: defaultPemFileName, @@ -1137,7 +931,6 @@ func (ic *GenericController) createServers(data []interface{}, sslpt := ic.annotations.SSLPassthrough(ing) // default upstream server - du := ic.getDefaultUpstream() un := du.Name if ing.Spec.Backend != nil { @@ -1178,7 +971,9 @@ func (ic *GenericController) createServers(data []interface{}, Proxy: ngxProxy, Service: &apiv1.Service{}, }, - }, SSLPassthrough: sslpt} + }, + SSLPassthrough: sslpt, + } } } @@ -1200,6 +995,11 @@ func (ic *GenericController) createServers(data []interface{}, // setup server aliases servers[host].Alias = aliasAnnotation + if aliasAnnotation != "" { + if _, ok := aliases[aliasAnnotation]; !ok { + aliases[aliasAnnotation] = host + } + } // only add a certificate if the server does not have one previously configured if servers[host].SSLCertificate != "" { @@ -1258,6 +1058,12 @@ func (ic *GenericController) createServers(data []interface{}, } } + for alias, host := range aliases { + if _, ok := servers[alias]; ok { + glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.", alias, host) + servers[host].Alias = "" + } + } return servers } @@ -1292,7 +1098,7 @@ func (ic *GenericController) getEndpoints( } glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) - ep, err := ic.endpLister.GetServiceEndpoints(s) + ep, err := ic.listers.Endpoint.GetServiceEndpoints(s) if err != nil { glog.Warningf("unexpected error obtaining service endpoints: %v", err) return upsServers @@ -1402,9 +1208,16 @@ func (ic GenericController) Start() { } // initial sync of secrets to avoid unnecessary reloads - for _, key := range ic.ingLister.ListKeys() { - if obj, exists, _ := ic.ingLister.GetByKey(key); exists { + for _, key := range ic.listers.Ingress.ListKeys() { + if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists { ing := obj.(*extensions.Ingress) + + if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + a, _ := parser.GetStringAnnotation(class.IngressKey, ing) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) + continue + } + ic.readSecrets(ing) } } @@ -1417,6 +1230,12 @@ func (ic GenericController) Start() { go ic.syncStatus.Run(ic.stopCh) } + ic.initialSyncDone = true + + time.Sleep(5 * time.Second) + // force initial sync + ic.syncQueue.Enqueue(&extensions.Ingress{}) + <-ic.stopCh } diff --git a/core/pkg/ingress/controller/listers.go b/core/pkg/ingress/controller/listers.go new file mode 100644 index 000000000..60cf3a92c --- /dev/null +++ b/core/pkg/ingress/controller/listers.go @@ -0,0 +1,200 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "reflect" + + "github.com/golang/glog" + + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/tools/cache" + fcache "k8s.io/client-go/tools/cache/testing" + + "k8s.io/ingress/core/pkg/ingress/annotations/class" + "k8s.io/ingress/core/pkg/ingress/annotations/parser" +) + +func (ic *GenericController) createListers(disableNodeLister bool) { + // from here to the end of the method all the code is just boilerplate + // required to watch Ingress, Secrets, ConfigMaps and Endoints. + // This is used to detect new content, updates or removals and act accordingly + ingEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*extensions.Ingress) + if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + a, _ := parser.GetStringAnnotation(class.IngressKey, addIng) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) + return + } + ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) + + if ic.initialSyncDone { + ic.syncQueue.Enqueue(obj) + } + }, + DeleteFunc: func(obj interface{}) { + delIng, ok := obj.(*extensions.Ingress) + if !ok { + // If we reached here it means the ingress was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + delIng, ok = tombstone.Obj.(*extensions.Ingress) + if !ok { + glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) + return + } + } + if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) { + glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) + return + } + ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) + ic.syncQueue.Enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + oldIng := old.(*extensions.Ingress) + curIng := cur.(*extensions.Ingress) + validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) + validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) + if !validOld && validCur { + glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) + ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validOld && !validCur { + glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) + ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } else if validCur && !reflect.DeepEqual(old, cur) { + ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + } + + ic.syncQueue.Enqueue(cur) + }, + } + + secrEventHandler := cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + sec := cur.(*apiv1.Secret) + key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) + ic.syncSecret(key) + } + }, + DeleteFunc: func(obj interface{}) { + sec, ok := obj.(*apiv1.Secret) + if !ok { + // If we reached here it means the secret was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("couldn't get object from tombstone %#v", obj) + return + } + sec, ok = tombstone.Obj.(*apiv1.Secret) + if !ok { + glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) + return + } + } + key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) + ic.sslCertTracker.DeleteAll(key) + }, + } + + eventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ic.syncQueue.Enqueue(obj) + }, + DeleteFunc: func(obj interface{}) { + ic.syncQueue.Enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + oep := old.(*apiv1.Endpoints) + ocur := cur.(*apiv1.Endpoints) + if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { + ic.syncQueue.Enqueue(cur) + } + }, + } + + mapEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + upCmap := obj.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) + if mapKey == ic.cfg.ConfigMapName { + glog.V(2).Infof("adding configmap %v to backend", mapKey) + ic.cfg.Backend.SetConfig(upCmap) + ic.forceReload = true + } + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + upCmap := cur.(*apiv1.ConfigMap) + mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name) + if mapKey == ic.cfg.ConfigMapName { + glog.V(2).Infof("updating configmap backend (%v)", mapKey) + ic.cfg.Backend.SetConfig(upCmap) + ic.forceReload = true + } + // updates to configuration configmaps can trigger an update + if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName { + ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) + ic.syncQueue.Enqueue(cur) + } + } + }, + } + + watchNs := apiv1.NamespaceAll + if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll { + watchNs = ic.cfg.Namespace + } + + ic.listers.Ingress.Store, ic.ingController = cache.NewInformer( + cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()), + &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler) + + ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer( + cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()), + &apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler) + + ic.listers.Secret.Store, ic.secrController = cache.NewInformer( + cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()), + &apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler) + + ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer( + cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()), + &apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler) + + ic.listers.Service.Store, ic.svcController = cache.NewInformer( + cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()), + &apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) + + var nodeListerWatcher cache.ListerWatcher + if disableNodeLister { + nodeListerWatcher = fcache.NewFakeControllerSource() + } else { + nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) + } + ic.listers.Node.Store, ic.nodeController = cache.NewInformer( + nodeListerWatcher, + &apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{}) +} diff --git a/core/pkg/ingress/controller/util_test.go b/core/pkg/ingress/controller/util_test.go index 3847cb2ba..3a785e5cc 100644 --- a/core/pkg/ingress/controller/util_test.go +++ b/core/pkg/ingress/controller/util_test.go @@ -51,7 +51,7 @@ func TestMergeLocationAnnotations(t *testing.T) { "Redirect": redirect.Redirect{}, "Rewrite": rewrite.Redirect{}, "Whitelist": ipwhitelist.SourceRange{}, - "Proxy": proxy.Configuration{}, + "Proxy": &proxy.Configuration{}, "UsePortInRedirects": true, } diff --git a/core/pkg/ingress/sort_ingress.go b/core/pkg/ingress/sort_ingress.go index 96147ffd6..6d13f861b 100644 --- a/core/pkg/ingress/sort_ingress.go +++ b/core/pkg/ingress/sort_ingress.go @@ -24,52 +24,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -// BackendByNameServers sorts upstreams by name -type BackendByNameServers []*Backend - -func (c BackendByNameServers) Len() int { return len(c) } -func (c BackendByNameServers) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c BackendByNameServers) Less(i, j int) bool { - - return c[i].Name < c[j].Name -} - -// EndpointByAddrPort sorts endpoints by address and port -type EndpointByAddrPort []Endpoint - -func (c EndpointByAddrPort) Len() int { return len(c) } -func (c EndpointByAddrPort) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c EndpointByAddrPort) Less(i, j int) bool { - iName := c[i].Address - jName := c[j].Address - if iName != jName { - return iName < jName - } - - iU := c[i].Port - jU := c[j].Port - return iU < jU -} - -// ServerByName sorts servers by name -type ServerByName []*Server - -func (c ServerByName) Len() int { return len(c) } -func (c ServerByName) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c ServerByName) Less(i, j int) bool { - return c[i].Hostname < c[j].Hostname -} - -// LocationByPath sorts location by path in descending order -// Location / is the last one -type LocationByPath []*Location - -func (c LocationByPath) Len() int { return len(c) } -func (c LocationByPath) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c LocationByPath) Less(i, j int) bool { - return c[i].Path > c[j].Path -} - // SSLCert describes a SSL certificate to be used in a server type SSLCert struct { metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/core/pkg/ingress/sort_ingress_test.go b/core/pkg/ingress/sort_ingress_test.go index b148e706a..52f00adee 100644 --- a/core/pkg/ingress/sort_ingress_test.go +++ b/core/pkg/ingress/sort_ingress_test.go @@ -22,347 +22,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func buildBackendByNameServers() BackendByNameServers { - return []*Backend{ - { - Name: "foo1", - Secure: true, - Endpoints: []Endpoint{}, - }, - { - Name: "foo2", - Secure: false, - Endpoints: []Endpoint{}, - }, - { - Name: "foo3", - Secure: true, - Endpoints: []Endpoint{}, - }, - } -} - -func TestBackendByNameServersLen(t *testing.T) { - fooTests := []struct { - backends BackendByNameServers - el int - }{ - {[]*Backend{}, 0}, - {buildBackendByNameServers(), 3}, - {nil, 0}, - } - - for _, fooTest := range fooTests { - r := fooTest.backends.Len() - if r != fooTest.el { - t.Errorf("returned %v but expected %v for the len of BackendByNameServers: %v", r, fooTest.el, fooTest.backends) - } - } -} - -func TestBackendByNameServersSwap(t *testing.T) { - fooTests := []struct { - backends BackendByNameServers - i int - j int - }{ - {buildBackendByNameServers(), 0, 1}, - {buildBackendByNameServers(), 2, 1}, - } - - for _, fooTest := range fooTests { - fooi := fooTest.backends[fooTest.i] - fooj := fooTest.backends[fooTest.j] - fooTest.backends.Swap(fooTest.i, fooTest.j) - if fooi.Name != fooTest.backends[fooTest.j].Name || fooj.Name != fooTest.backends[fooTest.i].Name { - t.Errorf("failed to swap for ByNameServers, foo: %v", fooTest) - } - } -} - -func TestBackendByNameServersLess(t *testing.T) { - fooTests := []struct { - backends BackendByNameServers - i int - j int - er bool - }{ - // order by name - {buildBackendByNameServers(), 0, 2, true}, - {buildBackendByNameServers(), 1, 0, false}, - } - - for _, fooTest := range fooTests { - r := fooTest.backends.Less(fooTest.i, fooTest.j) - if r != fooTest.er { - t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest) - } - } -} - -func buildEndpointByAddrPort() EndpointByAddrPort { - return []Endpoint{ - { - Address: "127.0.0.1", - Port: "8080", - MaxFails: 3, - FailTimeout: 10, - }, - { - Address: "127.0.0.1", - Port: "8081", - MaxFails: 3, - FailTimeout: 10, - }, - { - Address: "127.0.0.1", - Port: "8082", - MaxFails: 3, - FailTimeout: 10, - }, - { - Address: "127.0.0.2", - Port: "8082", - MaxFails: 3, - FailTimeout: 10, - }, - } -} - -func TestEndpointByAddrPortLen(t *testing.T) { - fooTests := []struct { - endpoints EndpointByAddrPort - el int - }{ - {[]Endpoint{}, 0}, - {buildEndpointByAddrPort(), 4}, - {nil, 0}, - } - - for _, fooTest := range fooTests { - r := fooTest.endpoints.Len() - if r != fooTest.el { - t.Errorf("returned %v but expected %v for the len of EndpointByAddrPort: %v", r, fooTest.el, fooTest.endpoints) - } - } -} - -func TestEndpointByAddrPortSwap(t *testing.T) { - fooTests := []struct { - endpoints EndpointByAddrPort - i int - j int - }{ - {buildEndpointByAddrPort(), 0, 1}, - {buildEndpointByAddrPort(), 2, 1}, - } - - for _, fooTest := range fooTests { - fooi := fooTest.endpoints[fooTest.i] - fooj := fooTest.endpoints[fooTest.j] - fooTest.endpoints.Swap(fooTest.i, fooTest.j) - if fooi.Port != fooTest.endpoints[fooTest.j].Port || - fooi.Address != fooTest.endpoints[fooTest.j].Address || - fooj.Port != fooTest.endpoints[fooTest.i].Port || - fooj.Address != fooTest.endpoints[fooTest.i].Address { - t.Errorf("failed to swap for EndpointByAddrPort, foo: %v", fooTest) - } - } -} - -func TestEndpointByAddrPortLess(t *testing.T) { - fooTests := []struct { - endpoints EndpointByAddrPort - i int - j int - er bool - }{ - // 1) order by name - // 2) order by port(if the name is the same one) - {buildEndpointByAddrPort(), 0, 1, true}, - {buildEndpointByAddrPort(), 2, 1, false}, - {buildEndpointByAddrPort(), 2, 3, true}, - } - - for _, fooTest := range fooTests { - r := fooTest.endpoints.Less(fooTest.i, fooTest.j) - if r != fooTest.er { - t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest) - } - } -} - -func buildServerByName() ServerByName { - return []*Server{ - { - Hostname: "foo1", - SSLPassthrough: true, - SSLCertificate: "foo1_cert", - SSLPemChecksum: "foo1_pem", - Locations: []*Location{}, - }, - { - Hostname: "foo2", - SSLPassthrough: true, - SSLCertificate: "foo2_cert", - SSLPemChecksum: "foo2_pem", - Locations: []*Location{}, - }, - { - Hostname: "foo3", - SSLPassthrough: false, - SSLCertificate: "foo3_cert", - SSLPemChecksum: "foo3_pem", - Locations: []*Location{}, - }, - { - Hostname: "_", - SSLPassthrough: false, - SSLCertificate: "foo4_cert", - SSLPemChecksum: "foo4_pem", - Locations: []*Location{}, - }, - } -} - -func TestServerByNameLen(t *testing.T) { - fooTests := []struct { - servers ServerByName - el int - }{ - {[]*Server{}, 0}, - {buildServerByName(), 4}, - {nil, 0}, - } - - for _, fooTest := range fooTests { - r := fooTest.servers.Len() - if r != fooTest.el { - t.Errorf("returned %v but expected %v for the len of ServerByName: %v", r, fooTest.el, fooTest.servers) - } - } -} - -func TestServerByNameSwap(t *testing.T) { - fooTests := []struct { - servers ServerByName - i int - j int - }{ - {buildServerByName(), 0, 1}, - {buildServerByName(), 2, 1}, - } - - for _, fooTest := range fooTests { - fooi := fooTest.servers[fooTest.i] - fooj := fooTest.servers[fooTest.j] - fooTest.servers.Swap(fooTest.i, fooTest.j) - if fooi.Hostname != fooTest.servers[fooTest.j].Hostname || - fooj.Hostname != fooTest.servers[fooTest.i].Hostname { - t.Errorf("failed to swap for ServerByName, foo: %v", fooTest) - } - } -} - -func TestServerByNameLess(t *testing.T) { - fooTests := []struct { - servers ServerByName - i int - j int - er bool - }{ - {buildServerByName(), 0, 1, true}, - {buildServerByName(), 2, 1, false}, - {buildServerByName(), 2, 3, false}, - } - - for _, fooTest := range fooTests { - r := fooTest.servers.Less(fooTest.i, fooTest.j) - if r != fooTest.er { - t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest) - } - } -} - -func buildLocationByPath() LocationByPath { - return []*Location{ - { - Path: "a", - IsDefBackend: true, - Backend: "a_back", - }, - { - Path: "b", - IsDefBackend: true, - Backend: "b_back", - }, - { - Path: "c", - IsDefBackend: true, - Backend: "c_back", - }, - } -} - -func TestLocationByPath(t *testing.T) { - fooTests := []struct { - locations LocationByPath - el int - }{ - {[]*Location{}, 0}, - {buildLocationByPath(), 3}, - {nil, 0}, - } - - for _, fooTest := range fooTests { - r := fooTest.locations.Len() - if r != fooTest.el { - t.Errorf("returned %v but expected %v for the len of LocationByPath: %v", r, fooTest.el, fooTest.locations) - } - } -} - -func TestLocationByPathSwap(t *testing.T) { - fooTests := []struct { - locations LocationByPath - i int - j int - }{ - {buildLocationByPath(), 0, 1}, - {buildLocationByPath(), 2, 1}, - } - - for _, fooTest := range fooTests { - fooi := fooTest.locations[fooTest.i] - fooj := fooTest.locations[fooTest.j] - fooTest.locations.Swap(fooTest.i, fooTest.j) - if fooi.Path != fooTest.locations[fooTest.j].Path || - fooj.Path != fooTest.locations[fooTest.i].Path { - t.Errorf("failed to swap for LocationByPath, foo: %v", fooTest) - } - } -} - -func TestLocationByPathLess(t *testing.T) { - fooTests := []struct { - locations LocationByPath - i int - j int - er bool - }{ - // sorts location by path in descending order - {buildLocationByPath(), 0, 1, false}, - {buildLocationByPath(), 2, 1, true}, - } - - for _, fooTest := range fooTests { - r := fooTest.locations.Less(fooTest.i, fooTest.j) - if r != fooTest.er { - t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest) - } - } -} - func TestGetObjectKindForSSLCert(t *testing.T) { fk := &SSLCert{ ObjectMeta: metav1.ObjectMeta{}, diff --git a/core/pkg/ingress/status/status.go b/core/pkg/ingress/status/status.go index ec0ad5d0e..9e571c2f4 100644 --- a/core/pkg/ingress/status/status.go +++ b/core/pkg/ingress/status/status.go @@ -45,7 +45,7 @@ import ( ) const ( - updateInterval = 30 * time.Second + updateInterval = 60 * time.Second ) // Sync ... @@ -56,14 +56,16 @@ type Sync interface { // Config ... type Config struct { - Client clientset.Interface + Client clientset.Interface + PublishService string - IngressLister store.IngressLister ElectionID string UpdateStatusOnShutdown bool + IngressLister store.IngressLister + DefaultIngressClass string IngressClass string @@ -293,7 +295,10 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress { } } - sort.Sort(loadBalancerIngressByIP(lbi)) + sort.Slice(lbi, func(a, b int) bool { + return lbi[a].IP < lbi[b].IP + }) + return lbi } @@ -328,7 +333,10 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) { } curIPs := currIng.Status.LoadBalancer.Ingress - sort.Sort(loadBalancerIngressByIP(curIPs)) + sort.Slice(curIPs, func(a, b int) bool { + return curIPs[a].IP < curIPs[b].IP + }) + if ingressSliceEqual(addrs, curIPs) { glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", currIng.Namespace, currIng.Name) return @@ -361,12 +369,3 @@ func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool { } return true } - -// loadBalancerIngressByIP sorts LoadBalancerIngress using the field IP -type loadBalancerIngressByIP []apiv1.LoadBalancerIngress - -func (c loadBalancerIngressByIP) Len() int { return len(c) } -func (c loadBalancerIngressByIP) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c loadBalancerIngressByIP) Less(i, j int) bool { - return c[i].IP < c[j].IP -} diff --git a/core/pkg/ingress/status/status_test.go b/core/pkg/ingress/status/status_test.go index c7353a30d..0bc2d8544 100644 --- a/core/pkg/ingress/status/status_test.go +++ b/core/pkg/ingress/status/status_test.go @@ -18,7 +18,6 @@ package status import ( "os" - "sort" "testing" "time" @@ -35,7 +34,7 @@ import ( "k8s.io/ingress/core/pkg/task" ) -func buildLoadBalancerIngressByIP() loadBalancerIngressByIP { +func buildLoadBalancerIngressByIP() []apiv1.LoadBalancerIngress { return []apiv1.LoadBalancerIngress{ { IP: "10.0.0.1", @@ -232,6 +231,7 @@ func buildIngressListener() store.IngressLister { }, }, }) + return store.IngressLister{Store: s} } @@ -376,7 +376,6 @@ func TestRunningAddresessWithPods(t *testing.T) { func TestUpdateStatus(t *testing.T) { fk := buildStatusSync() newIPs := buildLoadBalancerIngressByIP() - sort.Sort(loadBalancerIngressByIP(newIPs)) fk.updateStatus(newIPs) fooIngress1, err1 := fk.Client.Extensions().Ingresses(apiv1.NamespaceDefault).Get("foo_ingress_1", metav1.GetOptions{}) @@ -460,61 +459,3 @@ func TestIngressSliceEqual(t *testing.T) { } } } - -func TestLoadBalancerIngressByIPLen(t *testing.T) { - fooTests := []struct { - ips loadBalancerIngressByIP - el int - }{ - {[]apiv1.LoadBalancerIngress{}, 0}, - {buildLoadBalancerIngressByIP(), 4}, - {nil, 0}, - } - - for _, fooTest := range fooTests { - r := fooTest.ips.Len() - if r != fooTest.el { - t.Errorf("returned %v but expected %v ", r, fooTest.el) - } - } -} - -func TestLoadBalancerIngressByIPSwap(t *testing.T) { - fooTests := []struct { - ips loadBalancerIngressByIP - i int - j int - }{ - {buildLoadBalancerIngressByIP(), 0, 1}, - {buildLoadBalancerIngressByIP(), 2, 1}, - } - - for _, fooTest := range fooTests { - fooi := fooTest.ips[fooTest.i] - fooj := fooTest.ips[fooTest.j] - fooTest.ips.Swap(fooTest.i, fooTest.j) - if fooi.IP != fooTest.ips[fooTest.j].IP || - fooj.IP != fooTest.ips[fooTest.i].IP { - t.Errorf("failed to swap for loadBalancerIngressByIP") - } - } -} - -func TestLoadBalancerIngressByIPLess(t *testing.T) { - fooTests := []struct { - ips loadBalancerIngressByIP - i int - j int - er bool - }{ - {buildLoadBalancerIngressByIP(), 0, 1, true}, - {buildLoadBalancerIngressByIP(), 2, 1, false}, - } - - for _, fooTest := range fooTests { - r := fooTest.ips.Less(fooTest.i, fooTest.j) - if r != fooTest.er { - t.Errorf("returned %v but expected %v ", r, fooTest.er) - } - } -} diff --git a/core/pkg/ingress/store/main.go b/core/pkg/ingress/store/main.go index 182899499..d65421de8 100644 --- a/core/pkg/ingress/store/main.go +++ b/core/pkg/ingress/store/main.go @@ -19,7 +19,7 @@ package store import ( "fmt" - api "k8s.io/api/core/v1" + apiv1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" ) @@ -29,20 +29,56 @@ type IngressLister struct { } // SecretsLister makes a Store that lists Secrets. -type SecretsLister struct { +type SecretLister struct { cache.Store } +// GetByName searches for a secret in the local secrets Store +func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) { + s, exists, err := sl.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("secret %v was not found", name) + } + return s.(*apiv1.Secret), nil +} + // ConfigMapLister makes a Store that lists Configmaps. type ConfigMapLister struct { cache.Store } +// GetByName searches for a configmap in the local configmaps Store +func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) { + s, exists, err := cml.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("configmap %v was not found", name) + } + return s.(*apiv1.ConfigMap), nil +} + // ServiceLister makes a Store that lists Services. type ServiceLister struct { cache.Store } +// GetByName searches for a service in the local secrets Store +func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) { + s, exists, err := sl.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("service %v was not found", name) + } + return s.(*apiv1.Service), nil +} + // NodeLister makes a Store that lists Nodes. type NodeLister struct { cache.Store @@ -54,9 +90,9 @@ type EndpointLister struct { } // GetServiceEndpoints returns the endpoints of a service, matched on service name. -func (s *EndpointLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints, err error) { +func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (ep apiv1.Endpoints, err error) { for _, m := range s.Store.List() { - ep = *m.(*api.Endpoints) + ep = *m.(*apiv1.Endpoints) if svc.Name == ep.Name && svc.Namespace == ep.Namespace { return ep, nil } @@ -64,8 +100,3 @@ func (s *EndpointLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints err = fmt.Errorf("could not find endpoints for service: %v", svc.Name) return } - -// SecretLister makes a Store that lists Secres. -type SecretLister struct { - cache.Store -} diff --git a/core/pkg/ingress/types.go b/core/pkg/ingress/types.go index dceef00e6..a24ed5c5b 100644 --- a/core/pkg/ingress/types.go +++ b/core/pkg/ingress/types.go @@ -81,7 +81,7 @@ type Controller interface { SetConfig(*apiv1.ConfigMap) // SetListers allows the access of store listers present in the generic controller // This avoid the use of the kubernetes client. - SetListers(StoreLister) + SetListers(*StoreLister) // BackendDefaults returns the minimum settings required to configure the // communication to endpoints BackendDefaults() defaults.Backend @@ -309,7 +309,7 @@ type Location struct { // Proxy contains information about timeouts and buffer sizes // to be used in connections against endpoints // +optional - Proxy proxy.Configuration `json:"proxy,omitempty"` + Proxy *proxy.Configuration `json:"proxy,omitempty"` // UsePortInRedirects indicates if redirects must specify the port // +optional UsePortInRedirects bool `json:"usePortInRedirects"` diff --git a/core/pkg/ingress/types_equals.go b/core/pkg/ingress/types_equals.go index fc62def57..69a214d1e 100644 --- a/core/pkg/ingress/types_equals.go +++ b/core/pkg/ingress/types_equals.go @@ -382,7 +382,7 @@ func (l1 *Location) Equal(l2 *Location) bool { if !(&l1.Whitelist).Equal(&l2.Whitelist) { return false } - if !(&l1.Proxy).Equal(&l2.Proxy) { + if !(l1.Proxy).Equal(l2.Proxy) { return false } if l1.UsePortInRedirects != l2.UsePortInRedirects {