From fec3ddc6cc117607a641932f32ef44c8e8fcb20a Mon Sep 17 00:00:00 2001 From: Antoine Cotten Date: Fri, 13 Apr 2018 00:26:10 +0200 Subject: [PATCH 1/3] Sync secrets (SSL certificates) on events Remove scheduled check for missing secrets. --- internal/ingress/controller/controller.go | 15 +- .../ingress/controller/store/backend_ssl.go | 51 +-- .../ingress/controller/store/objectref.go | 130 +++++++ .../controller/store/objectref_test.go | 65 ++++ internal/ingress/controller/store/store.go | 335 ++++++++++-------- .../ingress/controller/store/store_test.go | 4 +- .../ingress/{sort_ingress.go => sslcert.go} | 0 .../{sort_ingress_test.go => sslcert_test.go} | 0 internal/task/queue.go | 4 +- 9 files changed, 395 insertions(+), 209 deletions(-) create mode 100644 internal/ingress/controller/store/objectref.go create mode 100644 internal/ingress/controller/store/objectref_test.go rename internal/ingress/{sort_ingress.go => sslcert.go} (100%) rename internal/ingress/{sort_ingress_test.go => sslcert_test.go} (100%) diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 400b49f1d..93acb2f53 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -40,7 +40,6 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/proxy" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/k8s" - "k8s.io/ingress-nginx/internal/task" ) const ( @@ -114,21 +113,13 @@ func (n NGINXController) GetPublishService() *apiv1.Service { // sync collects all the pieces required to assemble the configuration file and // then sends the content to the backend (OnUpdate) receiving the populated // template as response reloading the backend if is required. -func (n *NGINXController) syncIngress(item interface{}) error { +func (n *NGINXController) syncIngress(interface{}) error { n.syncRateLimiter.Accept() if n.syncQueue.IsShuttingDown() { return nil } - if element, ok := item.(task.Element); ok { - if name, ok := element.Key.(string); ok { - if ing, err := n.store.GetIngress(name); err == nil { - n.store.ReadSecrets(ing) - } - } - } - // Sort ingress rules using the ResourceVersion field ings := n.store.ListIngresses() sort.SliceStable(ings, func(i, j int) bool { @@ -869,7 +860,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, // Tries to fetch the default Certificate from nginx configuration. // If it does not exists, use the ones generated on Start() - defaultCertificate, err := n.store.GetLocalSecret(n.cfg.DefaultSSLCertificate) + defaultCertificate, err := n.store.GetLocalSSLCert(n.cfg.DefaultSSLCertificate) if err == nil { defaultPemFileName = defaultCertificate.PemFileName defaultPemSHA = defaultCertificate.PemSHA @@ -1039,7 +1030,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, } key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName) - cert, err := n.store.GetLocalSecret(key) + cert, err := n.store.GetLocalSSLCert(key) if err != nil { glog.Warningf("ssl certificate \"%v\" does not exist in local store", key) continue diff --git a/internal/ingress/controller/store/backend_ssl.go b/internal/ingress/controller/store/backend_ssl.go index 9f4fbc0ee..9ea86d77f 100644 --- a/internal/ingress/controller/store/backend_ssl.go +++ b/internal/ingress/controller/store/backend_ssl.go @@ -29,7 +29,6 @@ import ( "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/parser" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/net/ssl" ) @@ -51,7 +50,7 @@ func (s k8sStore) syncSecret(key string) { } // create certificates and add or update the item in the store - cur, err := s.GetLocalSecret(key) + cur, err := s.GetLocalSSLCert(key) if err == nil { if cur.Equal(cert) { // no need to update @@ -129,9 +128,9 @@ func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) } func (s k8sStore) checkSSLChainIssues() { - for _, item := range s.ListLocalSecrets() { + for _, item := range s.ListLocalSSLCerts() { secretName := k8s.MetaNamespaceKey(item) - secret, err := s.GetLocalSecret(secretName) + secret, err := s.GetLocalSSLCert(secretName) if err != nil { continue } @@ -179,50 +178,6 @@ func (s k8sStore) checkSSLChainIssues() { } } -// checkMissingSecrets verifies if one or more ingress rules contains -// a reference to a secret that is not present in the local secret store. -func (s k8sStore) checkMissingSecrets() { - for _, ing := range s.ListIngresses() { - for _, tls := range ing.Spec.TLS { - if tls.SecretName == "" { - continue - } - - key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - if _, ok := s.sslStore.Get(key); !ok { - s.syncSecret(key) - } - } - - key, _ := parser.GetStringAnnotation("auth-tls-secret", ing) - if key == "" { - continue - } - - if _, ok := s.sslStore.Get(key); !ok { - s.syncSecret(key) - } - } -} - -// ReadSecrets extracts information about secrets from an Ingress rule -func (s k8sStore) ReadSecrets(ing *extensions.Ingress) { - for _, tls := range ing.Spec.TLS { - if tls.SecretName == "" { - continue - } - - key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) - s.syncSecret(key) - } - - key, _ := parser.GetStringAnnotation("auth-tls-secret", ing) - if key == "" { - return - } - s.syncSecret(key) -} - // sendDummyEvent sends a dummy event to trigger an update // This is used in when a secret change func (s *k8sStore) sendDummyEvent() { diff --git a/internal/ingress/controller/store/objectref.go b/internal/ingress/controller/store/objectref.go new file mode 100644 index 000000000..9ef13bf07 --- /dev/null +++ b/internal/ingress/controller/store/objectref.go @@ -0,0 +1,130 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "sync" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// ObjectRefMap is a map of references from object(s) to object (1:n). It is +// used to keep track of which data objects (Secrets) are used within Ingress +// objects. +type ObjectRefMap interface { + Insert(consumer string, ref ...string) + Delete(consumer string) + Len() int + Has(ref string) bool + HasConsumer(consumer string) bool + Reference(ref string) []string + ReferencedBy(consumer string) []string +} + +type objectRefMap struct { + sync.Mutex + v map[string]sets.String +} + +// NewObjectRefMap returns a new ObjectRefMap. +func NewObjectRefMap() ObjectRefMap { + return &objectRefMap{ + v: make(map[string]sets.String), + } +} + +// Insert adds a consumer to one or more referenced objects. +func (o *objectRefMap) Insert(consumer string, ref ...string) { + o.Lock() + defer o.Unlock() + + for _, r := range ref { + if _, ok := o.v[r]; !ok { + o.v[r] = sets.NewString(consumer) + continue + } + o.v[r].Insert(consumer) + } +} + +// Delete deletes a consumer from all referenced objects. +func (o *objectRefMap) Delete(consumer string) { + o.Lock() + defer o.Unlock() + + for ref, consumers := range o.v { + consumers.Delete(consumer) + if consumers.Len() == 0 { + delete(o.v, ref) + } + } +} + +// Len returns the count of referenced objects. +func (o *objectRefMap) Len() int { + return len(o.v) +} + +// Has returns whether the given object is referenced by any other object. +func (o *objectRefMap) Has(ref string) bool { + o.Lock() + defer o.Unlock() + + if _, ok := o.v[ref]; ok { + return true + } + return false +} + +// HasConsumer returns whether the store contains the given consumer. +func (o *objectRefMap) HasConsumer(consumer string) bool { + o.Lock() + defer o.Unlock() + + for _, consumers := range o.v { + if consumers.Has(consumer) { + return true + } + } + return false +} + +// Reference returns all objects referencing the given object. +func (o *objectRefMap) Reference(ref string) []string { + o.Lock() + defer o.Unlock() + + consumers, ok := o.v[ref] + if !ok { + return make([]string, 0) + } + return consumers.List() +} + +// ReferencedBy returns all objects referenced by the given object. +func (o *objectRefMap) ReferencedBy(consumer string) []string { + o.Lock() + defer o.Unlock() + + refs := make([]string, 0) + for ref, consumers := range o.v { + if consumers.Has(consumer) { + refs = append(refs, ref) + } + } + return refs +} diff --git a/internal/ingress/controller/store/objectref_test.go b/internal/ingress/controller/store/objectref_test.go new file mode 100644 index 000000000..06eed4282 --- /dev/null +++ b/internal/ingress/controller/store/objectref_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import "testing" + +func TestObjectRefMapOperations(t *testing.T) { + orm := NewObjectRefMap() + + items := []struct { + consumer string + ref []string + }{ + {"ns/ingress1", []string{"ns/tls1"}}, + {"ns/ingress2", []string{"ns/tls1", "ns/tls2"}}, + {"ns/ingress3", []string{"ns/tls1", "ns/tls2", "ns/tls3"}}, + } + + // populate map with test data + for _, i := range items { + orm.Insert(i.consumer, i.ref...) + } + if l := orm.Len(); l != 3 { + t.Fatalf("Expected 3 referenced objects (got %d)", l) + } + + // add already existing item + orm.Insert("ns/ingress1", "ns/tls1") + if l := len(orm.ReferencedBy("ns/ingress1")); l != 1 { + t.Error("Expected existing item not to be added again") + } + + // find consumer by name + if !orm.HasConsumer("ns/ingress1") { + t.Error("Expected the \"ns/ingress1\" consumer to exist in the map") + } + + // count references to object + if l := len(orm.Reference("ns/tls1")); l != 3 { + t.Errorf("Expected \"ns/tls1\" to be referenced by 3 objects (got %d)", l) + } + + // delete consumer + orm.Delete("ns/ingress3") + if l := orm.Len(); l != 2 { + t.Errorf("Expected 2 referenced objects (got %d)", l) + } + if orm.Has("ns/tls3") { + t.Error("Expected \"ns/tls3\" not to be referenced") + } +} diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index c83e71a7c..7a0f61437 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -27,16 +27,15 @@ import ( "github.com/eapache/channels" "github.com/golang/glog" - apiv1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -59,15 +58,15 @@ type Storer interface { GetBackendConfiguration() ngx_config.Configuration // GetConfigMap returns a ConfigmMap using the namespace and name as key - GetConfigMap(key string) (*apiv1.ConfigMap, error) + GetConfigMap(key string) (*corev1.ConfigMap, error) // GetSecret returns a Secret using the namespace and name as key - GetSecret(key string) (*apiv1.Secret, error) + GetSecret(key string) (*corev1.Secret, error) // GetService returns a Service using the namespace and name as key - GetService(key string) (*apiv1.Service, error) + GetService(key string) (*corev1.Service, error) - GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) + GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error) // GetSecret returns an Ingress using the namespace and name as key GetIngress(key string) (*extensions.Ingress, error) @@ -78,11 +77,11 @@ type Storer interface { // GetIngressAnnotations returns the annotations associated to an Ingress GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) - // GetLocalSecret returns the local copy of a Secret - GetLocalSecret(name string) (*ingress.SSLCert, error) + // GetLocalSSLCert returns the local copy of a SSLCert + GetLocalSSLCert(name string) (*ingress.SSLCert, error) - // ListLocalSecrets returns the list of local Secrets - ListLocalSecrets() []*ingress.SSLCert + // ListLocalSSLCerts returns the list of local SSLCerts + ListLocalSSLCerts() []*ingress.SSLCert // GetAuthCertificate resolves a given secret name into an SSL certificate. // The secret must contain 3 keys named: @@ -94,9 +93,6 @@ type Storer interface { // Run initiates the synchronization of the controllers Run(stopCh chan struct{}) - - // ReadSecrets extracts information about secrets from an Ingress rule - ReadSecrets(*extensions.Ingress) } // EventType type of event associated with an informer @@ -109,9 +105,8 @@ const ( UpdateEvent EventType = "UPDATE" // DeleteEvent event associated when an object is removed from an informer DeleteEvent EventType = "DELETE" - // ConfigurationEvent event associated when a configuration object is created or updated + // ConfigurationEvent event associated when a controller configuration object is created or updated ConfigurationEvent EventType = "CONFIGURATION" - slash = "/" ) // Event holds the context of an event @@ -196,14 +191,14 @@ type k8sStore struct { // secretIngressMap contains information about which ingress references a // secret in the annotations. - secretIngressMap map[string]sets.String + secretIngressMap ObjectRefMap filesystem file.Filesystem // updateCh updateCh *channels.RingChannel - // mu mutex used to avoid simultaneous incovations to syncSecret + // mu protects against simultaneous invocations of syncSecret mu *sync.Mutex defaultSSLCertificate string @@ -226,16 +221,16 @@ func New(checkOCSP bool, updateCh: updateCh, backendConfig: ngx_config.NewDefault(), mu: &sync.Mutex{}, - secretIngressMap: make(map[string]sets.String), + secretIngressMap: NewObjectRefMap(), defaultSSLCertificate: defaultSSLCertificate, } eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ + eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{ Interface: client.CoreV1().Events(namespace), }) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{ Component: "nginx-ingress-controller", }) @@ -264,22 +259,25 @@ func New(checkOCSP bool, ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - addIng := obj.(*extensions.Ingress) - if !class.IsValid(addIng) { - a, _ := parser.GetStringAnnotation(class.IngressKey, addIng) - glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a) + ing := obj.(*extensions.Ingress) + if !class.IsValid(ing) { + a, _ := parser.GetStringAnnotation(class.IngressKey, ing) + glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) return } - store.extractAnnotations(addIng) - recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) + store.extractAnnotations(ing) + store.updateSecretIngressMap(ing) + store.syncSecrets(ing) + + recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: func(obj interface{}) { - delIng, ok := obj.(*extensions.Ingress) + ing, ok := obj.(*extensions.Ingress) if !ok { // If we reached here it means the ingress was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -287,18 +285,23 @@ func New(checkOCSP bool, glog.Errorf("couldn't get object from tombstone %#v", obj) return } - delIng, ok = tombstone.Obj.(*extensions.Ingress) + ing, ok = tombstone.Obj.(*extensions.Ingress) if !ok { glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj) return } } - if !class.IsValid(delIng) { - glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey) + if !class.IsValid(ing) { + glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey) return } - recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name)) - store.listers.IngressAnnotation.Delete(delIng) + + store.listers.IngressAnnotation.Delete(ing) + + key := k8s.MetaNamespaceKey(ing) + store.secretIngressMap.Delete(key) + + recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, @@ -311,15 +314,18 @@ func New(checkOCSP bool, validCur := class.IsValid(curIng) if !validOld && validCur { glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey) - recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validOld && !validCur { glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey) - recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validCur && !reflect.DeepEqual(old, cur) { - recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) + recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } store.extractAnnotations(curIng) + store.updateSecretIngressMap(curIng) + store.syncSecrets(curIng) + updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, @@ -328,39 +334,64 @@ func New(checkOCSP bool, } secrEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + sec := obj.(*corev1.Secret) + key := k8s.MetaNamespaceKey(sec) + + if store.defaultSSLCertificate == key { + store.syncSecret(store.defaultSSLCertificate) + } + + // find references in ingresses and update local ssl certs + if ings := store.secretIngressMap.Reference(key); len(ings) > 0 { + glog.Infof("secret %v was added and it is used in ingress annotations. Parsing...", key) + for _, ingKey := range ings { + ing, err := store.GetIngress(ingKey) + if err != nil { + glog.Errorf("could not find Ingress %v in local store", ingKey) + continue + } + store.extractAnnotations(ing) + store.updateSecretIngressMap(ing) + store.syncSecrets(ing) + } + updateCh.In() <- Event{ + Type: UpdateEvent, + Obj: obj, + } + } + }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - sec := cur.(*apiv1.Secret) - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) + sec := cur.(*corev1.Secret) + key := k8s.MetaNamespaceKey(sec) - // parse the ingress annotations (again) - if set, ok := store.secretIngressMap[key]; ok { - glog.Infof("secret %v changed and it is used in ingress annotations. Parsing...", key) - _, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec)) - if err == nil { - store.syncSecret(key) - updateCh.In() <- Event{ - Type: UpdateEvent, - Obj: cur, + if store.defaultSSLCertificate == key { + store.syncSecret(store.defaultSSLCertificate) + } + + // find references in ingresses and update local ssl certs + if ings := store.secretIngressMap.Reference(key); len(ings) > 0 { + glog.Infof("secret %v was updated and it is used in ingress annotations. Parsing...", key) + for _, ingKey := range ings { + ing, err := store.GetIngress(ingKey) + if err != nil { + glog.Errorf("could not find Ingress %v in local store", ingKey) + continue } + store.extractAnnotations(ing) + store.updateSecretIngressMap(ing) + store.syncSecrets(ing) } - - for _, name := range set.List() { - ing, _ := store.GetIngress(name) - if ing != nil { - store.extractAnnotations(ing) - } - } - updateCh.In() <- Event{ - Type: ConfigurationEvent, + Type: UpdateEvent, Obj: cur, } } } }, DeleteFunc: func(obj interface{}) { - sec, ok := obj.(*apiv1.Secret) + sec, ok := obj.(*corev1.Secret) if !ok { // If we reached here it means the secret was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -368,32 +399,32 @@ func New(checkOCSP bool, glog.Errorf("couldn't get object from tombstone %#v", obj) return } - sec, ok = tombstone.Obj.(*apiv1.Secret) + sec, ok = tombstone.Obj.(*corev1.Secret) if !ok { glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj) return } } + store.sslStore.Delete(k8s.MetaNamespaceKey(sec)) - updateCh.In() <- Event{ - Type: DeleteEvent, - Obj: obj, - } - // parse the ingress annotations (again)c - key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name) - if set, ok := store.secretIngressMap[key]; ok { - glog.Infof("secret %v was removed and it is used in ingress annotations. Parsing...", key) - for _, name := range set.List() { - ing, _ := store.GetIngress(name) - if ing != nil { - store.extractAnnotations(ing) + key := k8s.MetaNamespaceKey(sec) + + // find references in ingresses + if ings := store.secretIngressMap.Reference(key); len(ings) > 0 { + glog.Infof("secret %v was deleted and it is used in ingress annotations. Parsing...", key) + for _, ingKey := range ings { + ing, err := store.GetIngress(ingKey) + if err != nil { + glog.Errorf("could not find Ingress %v in local store", ingKey) + continue } + store.extractAnnotations(ing) + store.updateSecretIngressMap(ing) } - updateCh.In() <- Event{ - Type: ConfigurationEvent, - Obj: sec, + Type: DeleteEvent, + Obj: obj, } } }, @@ -413,9 +444,9 @@ func New(checkOCSP bool, } }, UpdateFunc: func(old, cur interface{}) { - oep := old.(*apiv1.Endpoints) - ocur := cur.(*apiv1.Endpoints) - if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) { + oep := old.(*corev1.Endpoints) + cep := cur.(*corev1.Endpoints) + if !reflect.DeepEqual(cep.Subsets, oep.Subsets) { updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, @@ -424,13 +455,17 @@ func New(checkOCSP bool, }, } - mapEventHandler := cache.ResourceEventHandlerFuncs{ + cmEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - m := obj.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name) - if mapKey == configmap { - glog.V(2).Infof("adding configmap %v to backend", mapKey) - store.setConfig(m) + cm := obj.(*corev1.ConfigMap) + key := k8s.MetaNamespaceKey(cm) + // updates to configuration configmaps can trigger an update + if key == configmap || key == tcp || key == udp { + glog.V(2).Infof("adding configmap %v to backend", key) + recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key)) + if key == configmap { + store.setConfig(cm) + } updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: obj, @@ -439,19 +474,14 @@ func New(checkOCSP bool, }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - m := cur.(*apiv1.ConfigMap) - mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name) - if mapKey == configmap { - recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) - store.setConfig(m) - updateCh.In() <- Event{ - Type: ConfigurationEvent, - Obj: cur, - } - } + cm := cur.(*corev1.ConfigMap) + key := k8s.MetaNamespaceKey(cm) // updates to configuration configmaps can trigger an update - if mapKey == tcp || mapKey == udp { - recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey)) + if key == configmap || key == tcp || key == udp { + recorder.Eventf(cm, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", key)) + if key == configmap { + store.setConfig(cm) + } updateCh.In() <- Event{ Type: ConfigurationEvent, Obj: cur, @@ -464,7 +494,7 @@ func New(checkOCSP bool, store.informers.Ingress.AddEventHandler(ingEventHandler) store.informers.Endpoint.AddEventHandler(epEventHandler) store.informers.Secret.AddEventHandler(secrEventHandler) - store.informers.ConfigMap.AddEventHandler(mapEventHandler) + store.informers.ConfigMap.AddEventHandler(cmEventHandler) store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{}) return store @@ -473,46 +503,74 @@ func New(checkOCSP bool, // extractAnnotations parses ingress annotations converting the value of the // annotation to a go struct and also information about the referenced secrets func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) { - key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) - glog.V(3).Infof("updating annotations information for ingres %v", key) + key := k8s.MetaNamespaceKey(ing) + glog.V(3).Infof("updating annotations information for ingress %v", key) anns := s.annotations.Extract(ing) - secName := anns.BasicDigestAuth.Secret - if secName != "" { - if _, ok := s.secretIngressMap[secName]; !ok { - s.secretIngressMap[secName] = sets.String{} - } - v := s.secretIngressMap[secName] - if !v.Has(key) { - v.Insert(key) - } - } - - secName = anns.CertificateAuth.Secret - if secName != "" { - if _, ok := s.secretIngressMap[secName]; !ok { - s.secretIngressMap[secName] = sets.String{} - } - v := s.secretIngressMap[secName] - if !v.Has(key) { - v.Insert(key) - } - } - err := s.listers.IngressAnnotation.Update(anns) if err != nil { glog.Error(err) } } +// updateSecretIngressMap takes an Ingress and updates all Secret objects it +// references in secretIngressMap. +func (s *k8sStore) updateSecretIngressMap(ing *extensions.Ingress) { + key := k8s.MetaNamespaceKey(ing) + glog.V(3).Infof("updating references to secrets for ingress %v", key) + + // delete all existing references first + s.secretIngressMap.Delete(key) + + var refSecrets []string + + for _, tls := range ing.Spec.TLS { + secrName := tls.SecretName + if secrName != "" { + secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName) + refSecrets = append(refSecrets, secrKey) + } + } + + anns, err := s.GetIngressAnnotations(ing) + if err != nil { + glog.Errorf("Error reading Ingress annotations: %v", err) + return + } + + secretAnnotations := []string{ + anns.BasicDigestAuth.Secret, + anns.CertificateAuth.Secret, + } + + for _, secrName := range secretAnnotations { + if secrName != "" { + secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName) + refSecrets = append(refSecrets, secrKey) + } + } + + // populate map with all secret references + s.secretIngressMap.Insert(key, refSecrets...) +} + +// syncSecrets synchronizes data from all Secrets referenced by the given +// Ingress with the local store and file system. +func (s k8sStore) syncSecrets(ing *extensions.Ingress) { + key := k8s.MetaNamespaceKey(ing) + for _, secrKey := range s.secretIngressMap.ReferencedBy(key) { + s.syncSecret(secrKey) + } +} + // GetSecret returns a Secret using the namespace and name as key -func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) { +func (s k8sStore) GetSecret(key string) (*corev1.Secret, error) { return s.listers.Secret.ByKey(key) } -// ListLocalSecrets returns the list of local Secrets -func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert { +// ListLocalSSLCerts returns the list of local SSLCerts +func (s k8sStore) ListLocalSSLCerts() []*ingress.SSLCert { var certs []*ingress.SSLCert for _, item := range s.sslStore.List() { if s, ok := item.(*ingress.SSLCert); ok { @@ -524,7 +582,7 @@ func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert { } // GetService returns a Service using the namespace and name as key -func (s k8sStore) GetService(key string) (*apiv1.Service, error) { +func (s k8sStore) GetService(key string) (*corev1.Service, error) { return s.listers.Service.ByKey(key) } @@ -545,7 +603,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress { for ri, rule := range ing.Spec.Rules { for pi, path := range rule.HTTP.Paths { if path.Path == "" { - ing.Spec.Rules[ri].HTTP.Paths[pi].Path = slash + ing.Spec.Rules[ri].HTTP.Paths[pi].Path = "/" } } } @@ -557,7 +615,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress { // GetIngressAnnotations returns the annotations associated to an Ingress func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) { - key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name) + key := k8s.MetaNamespaceKey(ing) item, exists, err := s.listers.IngressAnnotation.GetByKey(key) if err != nil { return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err) @@ -568,26 +626,26 @@ func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.I return item.(*annotations.Ingress), nil } -// GetLocalSecret returns the local copy of a Secret -func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) { +// GetLocalSSLCert returns the local copy of a SSLCert +func (s k8sStore) GetLocalSSLCert(key string) (*ingress.SSLCert, error) { return s.sslStore.ByKey(key) } -func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) { +func (s k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) { return s.listers.ConfigMap.ByKey(key) } -func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) { +func (s k8sStore) GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error) { return s.listers.Endpoint.GetServiceEndpoints(svc) } // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { - if _, err := s.GetLocalSecret(name); err != nil { + if _, err := s.GetLocalSSLCert(name); err != nil { s.syncSecret(name) } - cert, err := s.GetLocalSecret(name) + cert, err := s.GetLocalSSLCert(name) if err != nil { return nil, err } @@ -608,7 +666,7 @@ func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration { return s.backendConfig } -func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) { +func (s *k8sStore) setConfig(cmap *corev1.ConfigMap) { s.backendConfig = ngx_template.ReadConfig(cmap.Data) // TODO: this should not be done here @@ -628,19 +686,6 @@ func (s k8sStore) Run(stopCh chan struct{}) { // start informers s.informers.Run(stopCh) - // initial sync of secrets to avoid unnecessary reloads - glog.Info("running initial sync of secrets") - for _, ing := range s.ListIngresses() { - s.ReadSecrets(ing) - } - - if s.defaultSSLCertificate != "" { - s.syncSecret(s.defaultSSLCertificate) - } - - // start goroutine to check for missing local secrets - go wait.Until(s.checkMissingSecrets, 10*time.Second, stopCh) - if s.isOCSPCheckEnabled { go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh) } diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go index 9f506aa7a..7ce9923e0 100644 --- a/internal/ingress/controller/store/store_test.go +++ b/internal/ingress/controller/store/store_test.go @@ -88,7 +88,7 @@ func TestStore(t *testing.T) { t.Errorf("expected an Ingres but none returned") } - ls, err := storer.GetLocalSecret(key) + ls, err := storer.GetLocalSSLCert(key) if err == nil { t.Errorf("expected an error but none returned") } @@ -467,7 +467,7 @@ func TestStore(t *testing.T) { } secretName := fmt.Sprintf("%v/%v", ns.Name, name) - sslCert, err := storer.GetLocalSecret(secretName) + sslCert, err := storer.GetLocalSSLCert(secretName) if err != nil { t.Errorf("unexpected error reading local secret %v: %v", secretName, err) } diff --git a/internal/ingress/sort_ingress.go b/internal/ingress/sslcert.go similarity index 100% rename from internal/ingress/sort_ingress.go rename to internal/ingress/sslcert.go diff --git a/internal/ingress/sort_ingress_test.go b/internal/ingress/sslcert_test.go similarity index 100% rename from internal/ingress/sort_ingress_test.go rename to internal/ingress/sslcert_test.go diff --git a/internal/task/queue.go b/internal/task/queue.go index 9b05f1653..3b4c0e41c 100644 --- a/internal/task/queue.go +++ b/internal/task/queue.go @@ -42,9 +42,9 @@ type Queue struct { sync func(interface{}) error // workerDone is closed when the worker exits workerDone chan bool - + // fn makes a key for an API object fn func(obj interface{}) (interface{}, error) - + // lastSync is the Unix epoch time of the last execution of 'sync' lastSync int64 } From c786f55336c3868067adf96465e0305f4752bc0d Mon Sep 17 00:00:00 2001 From: Antoine Cotten Date: Fri, 13 Apr 2018 10:44:34 +0200 Subject: [PATCH 2/3] Include missing secrets in secretIngressMap Update secretIngressMap independently from stored annotations, which may miss some secret references. --- internal/ingress/controller/store/store.go | 24 ++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 7a0f61437..6df0520b1 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -352,7 +352,6 @@ func New(checkOCSP bool, continue } store.extractAnnotations(ing) - store.updateSecretIngressMap(ing) store.syncSecrets(ing) } updateCh.In() <- Event{ @@ -380,7 +379,6 @@ func New(checkOCSP bool, continue } store.extractAnnotations(ing) - store.updateSecretIngressMap(ing) store.syncSecrets(ing) } updateCh.In() <- Event{ @@ -420,7 +418,6 @@ func New(checkOCSP bool, continue } store.extractAnnotations(ing) - store.updateSecretIngressMap(ing) } updateCh.In() <- Event{ Type: DeleteEvent, @@ -533,18 +530,19 @@ func (s *k8sStore) updateSecretIngressMap(ing *extensions.Ingress) { } } - anns, err := s.GetIngressAnnotations(ing) - if err != nil { - glog.Errorf("Error reading Ingress annotations: %v", err) - return - } - + // We can not rely on cached ingress annotations because these are + // discarded when the referenced secret does not exist in the local + // store. As a result, adding a secret *after* the ingress(es) which + // references it would not trigger a resync of that secret. secretAnnotations := []string{ - anns.BasicDigestAuth.Secret, - anns.CertificateAuth.Secret, + "auth-secret", + "auth-tls-secret", } - - for _, secrName := range secretAnnotations { + for _, ann := range secretAnnotations { + secrName, err := parser.GetStringAnnotation(ann, ing) + if err != nil { + continue + } if secrName != "" { secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName) refSecrets = append(refSecrets, secrKey) From 0a563651db7a3e7c92080a48f6f2912d246f0785 Mon Sep 17 00:00:00 2001 From: Antoine Cotten Date: Fri, 13 Apr 2018 15:34:42 +0200 Subject: [PATCH 3/3] Add test for channel events with referenced secret --- internal/ingress/controller/store/store.go | 7 +- .../ingress/controller/store/store_test.go | 137 +++++++++++++++++- 2 files changed, 132 insertions(+), 12 deletions(-) diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 6df0520b1..5bb978dfd 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -265,12 +265,12 @@ func New(checkOCSP bool, glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a) return } + recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) store.extractAnnotations(ing) store.updateSecretIngressMap(ing) store.syncSecrets(ing) - recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, @@ -295,13 +295,13 @@ func New(checkOCSP bool, glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey) return } + recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) store.listers.IngressAnnotation.Delete(ing) key := k8s.MetaNamespaceKey(ing) store.secretIngressMap.Delete(key) - recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, @@ -355,7 +355,7 @@ func New(checkOCSP bool, store.syncSecrets(ing) } updateCh.In() <- Event{ - Type: UpdateEvent, + Type: CreateEvent, Obj: obj, } } @@ -458,7 +458,6 @@ func New(checkOCSP bool, key := k8s.MetaNamespaceKey(cm) // updates to configuration configmaps can trigger an update if key == configmap || key == tcp || key == udp { - glog.V(2).Infof("adding configmap %v to backend", key) recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key)) if key == configmap { store.setConfig(cm) diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go index 7ce9923e0..3549197db 100644 --- a/internal/ingress/controller/store/store_test.go +++ b/internal/ingress/controller/store/store_test.go @@ -116,7 +116,7 @@ func TestStore(t *testing.T) { close(stopCh) }) - t.Run("should return ingress one event for add, update and delete", func(t *testing.T) { + t.Run("should return one event for add, update and delete of ingress", func(t *testing.T) { ns := createNamespace(clientSet, t) defer deleteNamespace(ns, clientSet, t) @@ -260,7 +260,7 @@ func TestStore(t *testing.T) { close(stopCh) }) - t.Run("should not receive events from new secret no referenced from ingress", func(t *testing.T) { + t.Run("should not receive events from secret not referenced from ingress", func(t *testing.T) { ns := createNamespace(clientSet, t) defer deleteNamespace(ns, clientSet, t) @@ -307,13 +307,16 @@ func TestStore(t *testing.T) { storer.Run(stopCh) - secretName := "no-referenced" + secretName := "not-referenced" _, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name) if err != nil { t.Errorf("unexpected error creating secret: %v", err) } - time.Sleep(1 * time.Second) + err = framework.WaitForSecretInNamespace(clientSet, ns.Name, secretName) + if err != nil { + t.Errorf("unexpected error waiting for secret: %v", err) + } if atomic.LoadUint64(&add) != 0 { t.Errorf("expected 0 events of type Create but %v occurred", add) @@ -338,6 +341,118 @@ func TestStore(t *testing.T) { if atomic.LoadUint64(&upd) != 0 { t.Errorf("expected 0 events of type Update but %v occurred", upd) } + if atomic.LoadUint64(&del) != 0 { + t.Errorf("expected 0 events of type Delete but %v occurred", del) + } + + updateCh.Close() + close(stopCh) + }) + + t.Run("should receive events from secret referenced from ingress", func(t *testing.T) { + ns := createNamespace(clientSet, t) + defer deleteNamespace(ns, clientSet, t) + + stopCh := make(chan struct{}) + updateCh := channels.NewRingChannel(1024) + + var add uint64 + var upd uint64 + var del uint64 + + go func(ch *channels.RingChannel) { + for { + evt, ok := <-ch.Out() + if !ok { + return + } + + e := evt.(Event) + if e.Obj == nil { + continue + } + switch e.Type { + case CreateEvent: + atomic.AddUint64(&add, 1) + case UpdateEvent: + atomic.AddUint64(&upd, 1) + case DeleteEvent: + atomic.AddUint64(&del, 1) + } + } + }(updateCh) + + fs := newFS(t) + storer := New(true, + ns.Name, + fmt.Sprintf("%v/config", ns.Name), + fmt.Sprintf("%v/tcp", ns.Name), + fmt.Sprintf("%v/udp", ns.Name), + "", + 10*time.Minute, + clientSet, + fs, + updateCh) + + storer.Run(stopCh) + + ingressName := "ingress-with-secret" + secretName := "referenced" + + _, err := ensureIngress(&v1beta1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: ingressName, + Namespace: ns.Name, + }, + Spec: v1beta1.IngressSpec{ + TLS: []v1beta1.IngressTLS{ + { + SecretName: secretName, + }, + }, + Backend: &v1beta1.IngressBackend{ + ServiceName: "http-svc", + ServicePort: intstr.FromInt(80), + }, + }, + }, clientSet) + if err != nil { + t.Errorf("unexpected error creating ingress: %v", err) + } + + err = framework.WaitForIngressInNamespace(clientSet, ns.Name, ingressName) + if err != nil { + t.Errorf("unexpected error waiting for secret: %v", err) + } + + _, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name) + if err != nil { + t.Errorf("unexpected error creating secret: %v", err) + } + + err = framework.WaitForSecretInNamespace(clientSet, ns.Name, secretName) + if err != nil { + t.Errorf("unexpected error waiting for secret: %v", err) + } + + // take into account secret sync + time.Sleep(3 * time.Second) + + if atomic.LoadUint64(&add) != 2 { + t.Errorf("expected 2 events of type Create but %v occurred", add) + } + // secret sync triggers a dummy event + if atomic.LoadUint64(&upd) != 1 { + t.Errorf("expected 1 events of type Update but %v occurred", upd) + } + + err = clientSet.CoreV1().Secrets(ns.Name).Delete(secretName, &metav1.DeleteOptions{}) + if err != nil { + t.Errorf("unexpected error deleting secret: %v", err) + } + + time.Sleep(1 * time.Second) + if atomic.LoadUint64(&del) != 1 { t.Errorf("expected 1 events of type Delete but %v occurred", del) } @@ -346,7 +461,7 @@ func TestStore(t *testing.T) { close(stopCh) }) - t.Run("should create an ingress with a secret it doesn't exists", func(t *testing.T) { + t.Run("should create an ingress with a secret which does not exist", func(t *testing.T) { ns := createNamespace(clientSet, t) defer deleteNamespace(ns, clientSet, t) @@ -434,9 +549,15 @@ func TestStore(t *testing.T) { err = framework.WaitForIngressInNamespace(clientSet, ns.Name, name) if err != nil { - t.Errorf("unexpected error waiting for secret: %v", err) + t.Errorf("unexpected error waiting for ingress: %v", err) } + // take into account delay caused by: + // * ingress annotations extraction + // * secretIngressMap update + // * secrets sync + time.Sleep(3 * time.Second) + if atomic.LoadUint64(&add) != 1 { t.Errorf("expected 1 events of type Create but %v occurred", add) } @@ -458,12 +579,12 @@ func TestStore(t *testing.T) { t.Errorf("unexpected error waiting for secret: %v", err) } - time.Sleep(30 * time.Second) + time.Sleep(5 * time.Second) pemFile := fmt.Sprintf("%v/%v-%v.pem", file.DefaultSSLDirectory, ns.Name, name) err = framework.WaitForFileInFS(pemFile, fs) if err != nil { - t.Errorf("unexpected error waiting for file to exists in the filesystem: %v", err) + t.Errorf("unexpected error waiting for file to exist on the file system: %v", err) } secretName := fmt.Sprintf("%v/%v", ns.Name, name)