Merge pull request #2342 from antoineco/object-ref-map

Sync SSL certificates on events
This commit is contained in:
k8s-ci-robot 2018-04-13 08:32:00 -07:00 committed by GitHub
commit 3fba5c03dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 521 additions and 217 deletions

View file

@ -40,7 +40,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/task"
)
const (
@ -114,21 +113,13 @@ func (n NGINXController) GetPublishService() *apiv1.Service {
// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
func (n *NGINXController) syncIngress(item interface{}) error {
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncQueue.IsShuttingDown() {
return nil
}
if element, ok := item.(task.Element); ok {
if name, ok := element.Key.(string); ok {
if ing, err := n.store.GetIngress(name); err == nil {
n.store.ReadSecrets(ing)
}
}
}
// Sort ingress rules using the ResourceVersion field
ings := n.store.ListIngresses()
sort.SliceStable(ings, func(i, j int) bool {
@ -869,7 +860,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// Tries to fetch the default Certificate from nginx configuration.
// If it does not exists, use the ones generated on Start()
defaultCertificate, err := n.store.GetLocalSecret(n.cfg.DefaultSSLCertificate)
defaultCertificate, err := n.store.GetLocalSSLCert(n.cfg.DefaultSSLCertificate)
if err == nil {
defaultPemFileName = defaultCertificate.PemFileName
defaultPemSHA = defaultCertificate.PemSHA
@ -1039,7 +1030,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
cert, err := n.store.GetLocalSecret(key)
cert, err := n.store.GetLocalSSLCert(key)
if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
continue

View file

@ -29,7 +29,6 @@ import (
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
)
@ -51,7 +50,7 @@ func (s k8sStore) syncSecret(key string) {
}
// create certificates and add or update the item in the store
cur, err := s.GetLocalSecret(key)
cur, err := s.GetLocalSSLCert(key)
if err == nil {
if cur.Equal(cert) {
// no need to update
@ -129,9 +128,9 @@ func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error)
}
func (s k8sStore) checkSSLChainIssues() {
for _, item := range s.ListLocalSecrets() {
for _, item := range s.ListLocalSSLCerts() {
secretName := k8s.MetaNamespaceKey(item)
secret, err := s.GetLocalSecret(secretName)
secret, err := s.GetLocalSSLCert(secretName)
if err != nil {
continue
}
@ -179,50 +178,6 @@ func (s k8sStore) checkSSLChainIssues() {
}
}
// checkMissingSecrets verifies if one or more ingress rules contains
// a reference to a secret that is not present in the local secret store.
func (s k8sStore) checkMissingSecrets() {
for _, ing := range s.ListIngresses() {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
if _, ok := s.sslStore.Get(key); !ok {
s.syncSecret(key)
}
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
continue
}
if _, ok := s.sslStore.Get(key); !ok {
s.syncSecret(key)
}
}
}
// ReadSecrets extracts information about secrets from an Ingress rule
func (s k8sStore) ReadSecrets(ing *extensions.Ingress) {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
s.syncSecret(key)
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
return
}
s.syncSecret(key)
}
// sendDummyEvent sends a dummy event to trigger an update
// This is used in when a secret change
func (s *k8sStore) sendDummyEvent() {

View file

@ -0,0 +1,130 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"sync"
"k8s.io/apimachinery/pkg/util/sets"
)
// ObjectRefMap is a map of references from object(s) to object (1:n). It is
// used to keep track of which data objects (Secrets) are used within Ingress
// objects.
type ObjectRefMap interface {
Insert(consumer string, ref ...string)
Delete(consumer string)
Len() int
Has(ref string) bool
HasConsumer(consumer string) bool
Reference(ref string) []string
ReferencedBy(consumer string) []string
}
type objectRefMap struct {
sync.Mutex
v map[string]sets.String
}
// NewObjectRefMap returns a new ObjectRefMap.
func NewObjectRefMap() ObjectRefMap {
return &objectRefMap{
v: make(map[string]sets.String),
}
}
// Insert adds a consumer to one or more referenced objects.
func (o *objectRefMap) Insert(consumer string, ref ...string) {
o.Lock()
defer o.Unlock()
for _, r := range ref {
if _, ok := o.v[r]; !ok {
o.v[r] = sets.NewString(consumer)
continue
}
o.v[r].Insert(consumer)
}
}
// Delete deletes a consumer from all referenced objects.
func (o *objectRefMap) Delete(consumer string) {
o.Lock()
defer o.Unlock()
for ref, consumers := range o.v {
consumers.Delete(consumer)
if consumers.Len() == 0 {
delete(o.v, ref)
}
}
}
// Len returns the count of referenced objects.
func (o *objectRefMap) Len() int {
return len(o.v)
}
// Has returns whether the given object is referenced by any other object.
func (o *objectRefMap) Has(ref string) bool {
o.Lock()
defer o.Unlock()
if _, ok := o.v[ref]; ok {
return true
}
return false
}
// HasConsumer returns whether the store contains the given consumer.
func (o *objectRefMap) HasConsumer(consumer string) bool {
o.Lock()
defer o.Unlock()
for _, consumers := range o.v {
if consumers.Has(consumer) {
return true
}
}
return false
}
// Reference returns all objects referencing the given object.
func (o *objectRefMap) Reference(ref string) []string {
o.Lock()
defer o.Unlock()
consumers, ok := o.v[ref]
if !ok {
return make([]string, 0)
}
return consumers.List()
}
// ReferencedBy returns all objects referenced by the given object.
func (o *objectRefMap) ReferencedBy(consumer string) []string {
o.Lock()
defer o.Unlock()
refs := make([]string, 0)
for ref, consumers := range o.v {
if consumers.Has(consumer) {
refs = append(refs, ref)
}
}
return refs
}

View file

@ -0,0 +1,65 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import "testing"
func TestObjectRefMapOperations(t *testing.T) {
orm := NewObjectRefMap()
items := []struct {
consumer string
ref []string
}{
{"ns/ingress1", []string{"ns/tls1"}},
{"ns/ingress2", []string{"ns/tls1", "ns/tls2"}},
{"ns/ingress3", []string{"ns/tls1", "ns/tls2", "ns/tls3"}},
}
// populate map with test data
for _, i := range items {
orm.Insert(i.consumer, i.ref...)
}
if l := orm.Len(); l != 3 {
t.Fatalf("Expected 3 referenced objects (got %d)", l)
}
// add already existing item
orm.Insert("ns/ingress1", "ns/tls1")
if l := len(orm.ReferencedBy("ns/ingress1")); l != 1 {
t.Error("Expected existing item not to be added again")
}
// find consumer by name
if !orm.HasConsumer("ns/ingress1") {
t.Error("Expected the \"ns/ingress1\" consumer to exist in the map")
}
// count references to object
if l := len(orm.Reference("ns/tls1")); l != 3 {
t.Errorf("Expected \"ns/tls1\" to be referenced by 3 objects (got %d)", l)
}
// delete consumer
orm.Delete("ns/ingress3")
if l := orm.Len(); l != 2 {
t.Errorf("Expected 2 referenced objects (got %d)", l)
}
if orm.Has("ns/tls3") {
t.Error("Expected \"ns/tls3\" not to be referenced")
}
}

View file

@ -27,16 +27,15 @@ import (
"github.com/eapache/channels"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -59,15 +58,15 @@ type Storer interface {
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns a ConfigmMap using the namespace and name as key
GetConfigMap(key string) (*apiv1.ConfigMap, error)
GetConfigMap(key string) (*corev1.ConfigMap, error)
// GetSecret returns a Secret using the namespace and name as key
GetSecret(key string) (*apiv1.Secret, error)
GetSecret(key string) (*corev1.Secret, error)
// GetService returns a Service using the namespace and name as key
GetService(key string) (*apiv1.Service, error)
GetService(key string) (*corev1.Service, error)
GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error)
GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error)
// GetSecret returns an Ingress using the namespace and name as key
GetIngress(key string) (*extensions.Ingress, error)
@ -78,11 +77,11 @@ type Storer interface {
// GetIngressAnnotations returns the annotations associated to an Ingress
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
// GetLocalSecret returns the local copy of a Secret
GetLocalSecret(name string) (*ingress.SSLCert, error)
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
// ListLocalSecrets returns the list of local Secrets
ListLocalSecrets() []*ingress.SSLCert
// ListLocalSSLCerts returns the list of local SSLCerts
ListLocalSSLCerts() []*ingress.SSLCert
// GetAuthCertificate resolves a given secret name into an SSL certificate.
// The secret must contain 3 keys named:
@ -94,9 +93,6 @@ type Storer interface {
// Run initiates the synchronization of the controllers
Run(stopCh chan struct{})
// ReadSecrets extracts information about secrets from an Ingress rule
ReadSecrets(*extensions.Ingress)
}
// EventType type of event associated with an informer
@ -109,9 +105,8 @@ const (
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from an informer
DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a configuration object is created or updated
// ConfigurationEvent event associated when a controller configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION"
slash = "/"
)
// Event holds the context of an event
@ -196,14 +191,14 @@ type k8sStore struct {
// secretIngressMap contains information about which ingress references a
// secret in the annotations.
secretIngressMap map[string]sets.String
secretIngressMap ObjectRefMap
filesystem file.Filesystem
// updateCh
updateCh *channels.RingChannel
// mu mutex used to avoid simultaneous incovations to syncSecret
// mu protects against simultaneous invocations of syncSecret
mu *sync.Mutex
defaultSSLCertificate string
@ -226,16 +221,16 @@ func New(checkOCSP bool,
updateCh: updateCh,
backendConfig: ngx_config.NewDefault(),
mu: &sync.Mutex{},
secretIngressMap: make(map[string]sets.String),
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
Interface: client.CoreV1().Events(namespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
Component: "nginx-ingress-controller",
})
@ -264,22 +259,25 @@ func New(checkOCSP bool,
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.extractAnnotations(ing)
store.updateSecretIngressMap(ing)
store.syncSecrets(ing)
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
ing, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -287,18 +285,23 @@ func New(checkOCSP bool,
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
ing, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
if !class.IsValid(ing) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
return
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.listers.IngressAnnotation.Delete(ing)
key := k8s.MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
@ -311,15 +314,18 @@ func New(checkOCSP bool,
validCur := class.IsValid(curIng)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
@ -328,39 +334,62 @@ func New(checkOCSP bool,
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
sec := obj.(*corev1.Secret)
key := k8s.MetaNamespaceKey(sec)
if store.defaultSSLCertificate == key {
store.syncSecret(store.defaultSSLCertificate)
}
// find references in ingresses and update local ssl certs
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was added and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
store.syncSecrets(ing)
}
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
sec := cur.(*corev1.Secret)
key := k8s.MetaNamespaceKey(sec)
// parse the ingress annotations (again)
if set, ok := store.secretIngressMap[key]; ok {
glog.Infof("secret %v changed and it is used in ingress annotations. Parsing...", key)
_, err := store.GetLocalSecret(k8s.MetaNamespaceKey(sec))
if err == nil {
store.syncSecret(key)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
if store.defaultSSLCertificate == key {
store.syncSecret(store.defaultSSLCertificate)
}
// find references in ingresses and update local ssl certs
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was updated and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
store.syncSecrets(ing)
}
for _, name := range set.List() {
ing, _ := store.GetIngress(name)
if ing != nil {
store.extractAnnotations(ing)
}
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Type: UpdateEvent,
Obj: cur,
}
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
sec, ok := obj.(*corev1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -368,32 +397,31 @@ func New(checkOCSP bool,
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
sec, ok = tombstone.Obj.(*corev1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
// parse the ingress annotations (again)c
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
if set, ok := store.secretIngressMap[key]; ok {
glog.Infof("secret %v was removed and it is used in ingress annotations. Parsing...", key)
for _, name := range set.List() {
ing, _ := store.GetIngress(name)
if ing != nil {
store.extractAnnotations(ing)
key := k8s.MetaNamespaceKey(sec)
// find references in ingresses
if ings := store.secretIngressMap.Reference(key); len(ings) > 0 {
glog.Infof("secret %v was deleted and it is used in ingress annotations. Parsing...", key)
for _, ingKey := range ings {
ing, err := store.GetIngress(ingKey)
if err != nil {
glog.Errorf("could not find Ingress %v in local store", ingKey)
continue
}
store.extractAnnotations(ing)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: sec,
Type: DeleteEvent,
Obj: obj,
}
}
},
@ -413,9 +441,9 @@ func New(checkOCSP bool,
}
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
oep := old.(*corev1.Endpoints)
cep := cur.(*corev1.Endpoints)
if !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
@ -424,13 +452,16 @@ func New(checkOCSP bool,
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
cmEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
cm := obj.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if key == configmap || key == tcp || key == udp {
recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: obj,
@ -439,19 +470,14 @@ func New(checkOCSP bool,
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
m := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
store.setConfig(m)
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
}
cm := cur.(*corev1.ConfigMap)
key := k8s.MetaNamespaceKey(cm)
// updates to configuration configmaps can trigger an update
if mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
if key == configmap || key == tcp || key == udp {
recorder.Eventf(cm, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", key))
if key == configmap {
store.setConfig(cm)
}
updateCh.In() <- Event{
Type: ConfigurationEvent,
Obj: cur,
@ -464,7 +490,7 @@ func New(checkOCSP bool,
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(mapEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
return store
@ -473,46 +499,75 @@ func New(checkOCSP bool,
// extractAnnotations parses ingress annotations converting the value of the
// annotation to a go struct and also information about the referenced secrets
func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
glog.V(3).Infof("updating annotations information for ingres %v", key)
key := k8s.MetaNamespaceKey(ing)
glog.V(3).Infof("updating annotations information for ingress %v", key)
anns := s.annotations.Extract(ing)
secName := anns.BasicDigestAuth.Secret
if secName != "" {
if _, ok := s.secretIngressMap[secName]; !ok {
s.secretIngressMap[secName] = sets.String{}
}
v := s.secretIngressMap[secName]
if !v.Has(key) {
v.Insert(key)
}
}
secName = anns.CertificateAuth.Secret
if secName != "" {
if _, ok := s.secretIngressMap[secName]; !ok {
s.secretIngressMap[secName] = sets.String{}
}
v := s.secretIngressMap[secName]
if !v.Has(key) {
v.Insert(key)
}
}
err := s.listers.IngressAnnotation.Update(anns)
if err != nil {
glog.Error(err)
}
}
// updateSecretIngressMap takes an Ingress and updates all Secret objects it
// references in secretIngressMap.
func (s *k8sStore) updateSecretIngressMap(ing *extensions.Ingress) {
key := k8s.MetaNamespaceKey(ing)
glog.V(3).Infof("updating references to secrets for ingress %v", key)
// delete all existing references first
s.secretIngressMap.Delete(key)
var refSecrets []string
for _, tls := range ing.Spec.TLS {
secrName := tls.SecretName
if secrName != "" {
secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName)
refSecrets = append(refSecrets, secrKey)
}
}
// We can not rely on cached ingress annotations because these are
// discarded when the referenced secret does not exist in the local
// store. As a result, adding a secret *after* the ingress(es) which
// references it would not trigger a resync of that secret.
secretAnnotations := []string{
"auth-secret",
"auth-tls-secret",
}
for _, ann := range secretAnnotations {
secrName, err := parser.GetStringAnnotation(ann, ing)
if err != nil {
continue
}
if secrName != "" {
secrKey := fmt.Sprintf("%v/%v", ing.Namespace, secrName)
refSecrets = append(refSecrets, secrKey)
}
}
// populate map with all secret references
s.secretIngressMap.Insert(key, refSecrets...)
}
// syncSecrets synchronizes data from all Secrets referenced by the given
// Ingress with the local store and file system.
func (s k8sStore) syncSecrets(ing *extensions.Ingress) {
key := k8s.MetaNamespaceKey(ing)
for _, secrKey := range s.secretIngressMap.ReferencedBy(key) {
s.syncSecret(secrKey)
}
}
// GetSecret returns a Secret using the namespace and name as key
func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) {
func (s k8sStore) GetSecret(key string) (*corev1.Secret, error) {
return s.listers.Secret.ByKey(key)
}
// ListLocalSecrets returns the list of local Secrets
func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
// ListLocalSSLCerts returns the list of local SSLCerts
func (s k8sStore) ListLocalSSLCerts() []*ingress.SSLCert {
var certs []*ingress.SSLCert
for _, item := range s.sslStore.List() {
if s, ok := item.(*ingress.SSLCert); ok {
@ -524,7 +579,7 @@ func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
}
// GetService returns a Service using the namespace and name as key
func (s k8sStore) GetService(key string) (*apiv1.Service, error) {
func (s k8sStore) GetService(key string) (*corev1.Service, error) {
return s.listers.Service.ByKey(key)
}
@ -545,7 +600,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress {
for ri, rule := range ing.Spec.Rules {
for pi, path := range rule.HTTP.Paths {
if path.Path == "" {
ing.Spec.Rules[ri].HTTP.Paths[pi].Path = slash
ing.Spec.Rules[ri].HTTP.Paths[pi].Path = "/"
}
}
}
@ -557,7 +612,7 @@ func (s k8sStore) ListIngresses() []*extensions.Ingress {
// GetIngressAnnotations returns the annotations associated to an Ingress
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
key := k8s.MetaNamespaceKey(ing)
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
if err != nil {
return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
@ -568,26 +623,26 @@ func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.I
return item.(*annotations.Ingress), nil
}
// GetLocalSecret returns the local copy of a Secret
func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) {
// GetLocalSSLCert returns the local copy of a SSLCert
func (s k8sStore) GetLocalSSLCert(key string) (*ingress.SSLCert, error) {
return s.sslStore.ByKey(key)
}
func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) {
func (s k8sStore) GetConfigMap(key string) (*corev1.ConfigMap, error) {
return s.listers.ConfigMap.ByKey(key)
}
func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
func (s k8sStore) GetServiceEndpoints(svc *corev1.Service) (*corev1.Endpoints, error) {
return s.listers.Endpoint.GetServiceEndpoints(svc)
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, err := s.GetLocalSecret(name); err != nil {
if _, err := s.GetLocalSSLCert(name); err != nil {
s.syncSecret(name)
}
cert, err := s.GetLocalSecret(name)
cert, err := s.GetLocalSSLCert(name)
if err != nil {
return nil, err
}
@ -608,7 +663,7 @@ func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration {
return s.backendConfig
}
func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
func (s *k8sStore) setConfig(cmap *corev1.ConfigMap) {
s.backendConfig = ngx_template.ReadConfig(cmap.Data)
// TODO: this should not be done here
@ -628,19 +683,6 @@ func (s k8sStore) Run(stopCh chan struct{}) {
// start informers
s.informers.Run(stopCh)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, ing := range s.ListIngresses() {
s.ReadSecrets(ing)
}
if s.defaultSSLCertificate != "" {
s.syncSecret(s.defaultSSLCertificate)
}
// start goroutine to check for missing local secrets
go wait.Until(s.checkMissingSecrets, 10*time.Second, stopCh)
if s.isOCSPCheckEnabled {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}

View file

@ -88,7 +88,7 @@ func TestStore(t *testing.T) {
t.Errorf("expected an Ingres but none returned")
}
ls, err := storer.GetLocalSecret(key)
ls, err := storer.GetLocalSSLCert(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
@ -116,7 +116,7 @@ func TestStore(t *testing.T) {
close(stopCh)
})
t.Run("should return ingress one event for add, update and delete", func(t *testing.T) {
t.Run("should return one event for add, update and delete of ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
@ -260,7 +260,7 @@ func TestStore(t *testing.T) {
close(stopCh)
})
t.Run("should not receive events from new secret no referenced from ingress", func(t *testing.T) {
t.Run("should not receive events from secret not referenced from ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
@ -307,13 +307,16 @@ func TestStore(t *testing.T) {
storer.Run(stopCh)
secretName := "no-referenced"
secretName := "not-referenced"
_, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
}
time.Sleep(1 * time.Second)
err = framework.WaitForSecretInNamespace(clientSet, ns.Name, secretName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
}
if atomic.LoadUint64(&add) != 0 {
t.Errorf("expected 0 events of type Create but %v occurred", add)
@ -338,6 +341,118 @@ func TestStore(t *testing.T) {
if atomic.LoadUint64(&upd) != 0 {
t.Errorf("expected 0 events of type Update but %v occurred", upd)
}
if atomic.LoadUint64(&del) != 0 {
t.Errorf("expected 0 events of type Delete but %v occurred", del)
}
updateCh.Close()
close(stopCh)
})
t.Run("should receive events from secret referenced from ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := channels.NewRingChannel(1024)
var add uint64
var upd uint64
var del uint64
go func(ch *channels.RingChannel) {
for {
evt, ok := <-ch.Out()
if !ok {
return
}
e := evt.(Event)
if e.Obj == nil {
continue
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
case UpdateEvent:
atomic.AddUint64(&upd, 1)
case DeleteEvent:
atomic.AddUint64(&del, 1)
}
}
}(updateCh)
fs := newFS(t)
storer := New(true,
ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
"",
10*time.Minute,
clientSet,
fs,
updateCh)
storer.Run(stopCh)
ingressName := "ingress-with-secret"
secretName := "referenced"
_, err := ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: ingressName,
Namespace: ns.Name,
},
Spec: v1beta1.IngressSpec{
TLS: []v1beta1.IngressTLS{
{
SecretName: secretName,
},
},
Backend: &v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
err = framework.WaitForIngressInNamespace(clientSet, ns.Name, ingressName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
}
_, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
}
err = framework.WaitForSecretInNamespace(clientSet, ns.Name, secretName)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
}
// take into account secret sync
time.Sleep(3 * time.Second)
if atomic.LoadUint64(&add) != 2 {
t.Errorf("expected 2 events of type Create but %v occurred", add)
}
// secret sync triggers a dummy event
if atomic.LoadUint64(&upd) != 1 {
t.Errorf("expected 1 events of type Update but %v occurred", upd)
}
err = clientSet.CoreV1().Secrets(ns.Name).Delete(secretName, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error deleting secret: %v", err)
}
time.Sleep(1 * time.Second)
if atomic.LoadUint64(&del) != 1 {
t.Errorf("expected 1 events of type Delete but %v occurred", del)
}
@ -346,7 +461,7 @@ func TestStore(t *testing.T) {
close(stopCh)
})
t.Run("should create an ingress with a secret it doesn't exists", func(t *testing.T) {
t.Run("should create an ingress with a secret which does not exist", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
@ -434,9 +549,15 @@ func TestStore(t *testing.T) {
err = framework.WaitForIngressInNamespace(clientSet, ns.Name, name)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
t.Errorf("unexpected error waiting for ingress: %v", err)
}
// take into account delay caused by:
// * ingress annotations extraction
// * secretIngressMap update
// * secrets sync
time.Sleep(3 * time.Second)
if atomic.LoadUint64(&add) != 1 {
t.Errorf("expected 1 events of type Create but %v occurred", add)
}
@ -458,16 +579,16 @@ func TestStore(t *testing.T) {
t.Errorf("unexpected error waiting for secret: %v", err)
}
time.Sleep(30 * time.Second)
time.Sleep(5 * time.Second)
pemFile := fmt.Sprintf("%v/%v-%v.pem", file.DefaultSSLDirectory, ns.Name, name)
err = framework.WaitForFileInFS(pemFile, fs)
if err != nil {
t.Errorf("unexpected error waiting for file to exists in the filesystem: %v", err)
t.Errorf("unexpected error waiting for file to exist on the file system: %v", err)
}
secretName := fmt.Sprintf("%v/%v", ns.Name, name)
sslCert, err := storer.GetLocalSecret(secretName)
sslCert, err := storer.GetLocalSSLCert(secretName)
if err != nil {
t.Errorf("unexpected error reading local secret %v: %v", secretName, err)
}

View file

@ -42,9 +42,9 @@ type Queue struct {
sync func(interface{}) error
// workerDone is closed when the worker exits
workerDone chan bool
// fn makes a key for an API object
fn func(obj interface{}) (interface{}, error)
// lastSync is the Unix epoch time of the last execution of 'sync'
lastSync int64
}