Refactor lister and controllers

This commit is contained in:
Manuel de Brito Fontes 2017-11-17 21:15:43 -03:00
parent de37e8ea89
commit dee4056cf0
23 changed files with 1182 additions and 571 deletions

View file

@ -34,6 +34,9 @@ jobs:
- go get github.com/golang/lint/golint - go get github.com/golang/lint/golint
- make fmt lint vet - make fmt lint vet
- stage: Coverage - stage: Coverage
before_script:
- make e2e-image
- test/e2e/up.sh
script: script:
- go get github.com/mattn/goveralls - go get github.com/mattn/goveralls
- go get github.com/modocache/gover - go get github.com/modocache/gover

View file

@ -27,15 +27,12 @@ import (
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/controller" "k8s.io/ingress-nginx/internal/ingress/controller"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/internal/net" ing_net "k8s.io/ingress-nginx/internal/net"
) )
const (
defIngressClass = "nginx"
)
func parseFlags() (bool, *controller.Configuration, error) { func parseFlags() (bool, *controller.Configuration, error) {
var ( var (
flags = pflag.NewFlagSet("", pflag.ExitOnError) flags = pflag.NewFlagSet("", pflag.ExitOnError)
@ -157,6 +154,8 @@ func parseFlags() (bool, *controller.Configuration, error) {
} }
} }
ingress.IngressClass = *ingressClass
// check port collisions // check port collisions
if !ing_net.IsPortAvailable(*httpPort) { if !ing_net.IsPortAvailable(*httpPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort) return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort)
@ -198,7 +197,6 @@ func parseFlags() (bool, *controller.Configuration, error) {
EnableSSLChainCompletion: *enableSSLChainCompletion, EnableSSLChainCompletion: *enableSSLChainCompletion,
ResyncPeriod: *resyncPeriod, ResyncPeriod: *resyncPeriod,
DefaultService: *defaultSvc, DefaultService: *defaultSvc,
IngressClass: *ingressClass,
Namespace: *watchNamespace, Namespace: *watchNamespace,
ConfigMapName: *configMap, ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName, TCPConfigMapName: *tcpConfigMapName,

View file

@ -127,6 +127,7 @@ func main() {
ngx := controller.NewNGINXController(conf) ngx := controller.NewNGINXController(conf)
if conf.EnableSSLPassthrough { if conf.EnableSSLPassthrough {
glog.Info("setting up TLS proxy for SSL passthrough")
setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx) setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx)
} }

View file

@ -37,8 +37,6 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck" "k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser" "k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy" "k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
@ -76,8 +74,8 @@ type Configuration struct {
ConfigMapName string ConfigMapName string
DefaultService string DefaultService string
IngressClass string
Namespace string Namespace string
ForceNamespaceIsolation bool ForceNamespaceIsolation bool
@ -117,26 +115,6 @@ func (n NGINXController) GetDefaultBackend() defaults.Backend {
return n.backendDefaults return n.backendDefaults
} }
// GetPublishService returns the configured service used to set ingress status
func (n NGINXController) GetPublishService() *apiv1.Service {
s, err := n.listers.Service.GetByName(n.cfg.PublishService)
if err != nil {
return nil
}
return s
}
// GetSecret searches for a secret in the local secrets Store
func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) {
return n.listers.Secret.GetByName(name)
}
// GetService searches for a service in the local secrets Store
func (n NGINXController) GetService(name string) (*apiv1.Service, error) {
return n.listers.Service.GetByName(name)
}
// GetAnnotationWithPrefix returns the prefix of ingress annotations // GetAnnotationWithPrefix returns the prefix of ingress annotations
func (n NGINXController) GetAnnotationWithPrefix(suffix string) string { func (n NGINXController) GetAnnotationWithPrefix(suffix string) string {
return fmt.Sprintf("%v/%v", n.cfg.AnnotationsPrefix, suffix) return fmt.Sprintf("%v/%v", n.cfg.AnnotationsPrefix, suffix)
@ -154,32 +132,20 @@ func (n *NGINXController) syncIngress(item interface{}) error {
if element, ok := item.(task.Element); ok { if element, ok := item.(task.Element); ok {
if name, ok := element.Key.(string); ok { if name, ok := element.Key.(string); ok {
if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists { if ing, err := n.store.GetIngress(name); err == nil {
ing := obj.(*extensions.Ingress)
n.readSecrets(ing) n.readSecrets(ing)
} }
} }
} }
// Sort ingress rules using the ResourceVersion field // Sort ingress rules using the ResourceVersion field
ings := n.listers.Ingress.List() ingresses := n.store.ListIngresses()
sort.SliceStable(ings, func(i, j int) bool { sort.SliceStable(ingresses, func(i, j int) bool {
ir := ings[i].(*extensions.Ingress).ResourceVersion ir := ingresses[i].ResourceVersion
jr := ings[j].(*extensions.Ingress).ResourceVersion jr := ingresses[j].ResourceVersion
return ir < jr return ir < jr
}) })
// filter ingress rules
var ingresses []*extensions.Ingress
for _, ingIf := range ings {
ing := ingIf.(*extensions.Ingress)
if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) {
continue
}
ingresses = append(ingresses, ing)
}
upstreams, servers := n.getBackendServers(ingresses) upstreams, servers := n.getBackendServers(ingresses)
var passUpstreams []*ingress.SSLPassthroughBackend var passUpstreams []*ingress.SSLPassthroughBackend
@ -248,7 +214,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
return []ingress.L4Service{} return []ingress.L4Service{}
} }
configmap, err := n.listers.ConfigMap.GetByName(configmapName) configmap, err := n.store.GetConfigMap(configmapName)
if err != nil { if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err) glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{} return []ingress.L4Service{}
@ -306,19 +272,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
continue continue
} }
svcObj, svcExists, err := n.listers.Service.GetByKey(nsName) svc, err := n.store.GetService(nsName)
if err != nil { if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err) glog.Warningf("error getting service %v: %v", nsName, err)
continue continue
} }
if !svcExists {
glog.Warningf("service %v was not found", nsName)
continue
}
svc := svcObj.(*apiv1.Service)
var endps []ingress.Endpoint var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort) targetPort, err := strconv.Atoi(svcPort)
if err != nil { if err != nil {
@ -375,20 +334,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
Name: defUpstreamName, Name: defUpstreamName,
} }
svcKey := n.cfg.DefaultService svcKey := n.cfg.DefaultService
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) svc, err := n.store.GetService(svcKey)
if err != nil { if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err) glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream return upstream
} }
if !svcExists {
glog.Warningf("service %v does not exist", svcKey)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
svc := svcObj.(*apiv1.Service)
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{}) endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
if len(endps) == 0 { if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey) glog.Warningf("service %v does not have any active endpoints", svcKey)
@ -408,7 +360,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
servers := n.createServers(ingresses, upstreams, du) servers := n.createServers(ingresses, upstreams, du)
for _, ing := range ingresses { for _, ing := range ingresses {
anns := n.getIngressAnnotations(ing) anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Warningf("%v", err)
}
for _, rule := range ing.Spec.Rules { for _, rule := range ing.Spec.Rules {
host := rule.Host host := rule.Host
@ -622,20 +577,19 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret // GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) { func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := n.sslCertTracker.Get(name); !exists { if _, err := n.store.GetLocalSecret(name); err != nil {
n.syncSecret(name) n.syncSecret(name)
} }
_, err := n.listers.Secret.GetByName(name) _, err := n.store.GetLocalSecret(name)
if err != nil { if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err) return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
} }
bc, exists := n.sslCertTracker.Get(name) cert, err := n.store.GetLocalSecret(name)
if !exists { if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name) return &resolver.AuthSSLCert{}, err
} }
cert := bc.(*ingress.SSLCert)
return &resolver.AuthSSLCert{ return &resolver.AuthSSLCert{
Secret: name, Secret: name,
CAFileName: cert.CAFileName, CAFileName: cert.CAFileName,
@ -650,7 +604,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[defUpstreamName] = du upstreams[defUpstreamName] = du
for _, ing := range data { for _, ing := range data {
anns := n.getIngressAnnotations(ing) anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Warningf("%v", err)
}
var defBackend string var defBackend string
if ing.Spec.Backend != nil { if ing.Spec.Backend != nil {
@ -737,7 +694,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[name].Endpoints = endp upstreams[name].Endpoints = endp
} }
s, err := n.listers.Service.GetByName(svcKey) s, err := n.store.GetService(svcKey)
if err != nil { if err != nil {
glog.Warningf("error obtaining service: %v", err) glog.Warningf("error obtaining service: %v", err)
continue continue
@ -752,13 +709,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
} }
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) { func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey) svc, err := n.store.GetService(svcKey)
if err != nil {
if !svcExists { return endpoint, err
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
} }
svc := svcObj.(*apiv1.Service)
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey) return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
} }
@ -790,7 +745,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
// to a service. // to a service.
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string, func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Config) ([]ingress.Endpoint, error) { hz *healthcheck.Config) ([]ingress.Endpoint, error) {
svc, err := n.listers.Service.GetByName(svcKey) svc, err := n.store.GetService(svcKey)
var upstreams []ingress.Endpoint var upstreams []ingress.Endpoint
if err != nil { if err != nil {
@ -915,7 +870,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// initialize all the servers // initialize all the servers
for _, ing := range data { for _, ing := range data {
anns := n.getIngressAnnotations(ing) anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Warningf("%v", err)
}
// default upstream server // default upstream server
un := du.Name un := du.Name
@ -966,7 +924,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// configure default location, alias, and SSL // configure default location, alias, and SSL
for _, ing := range data { for _, ing := range data {
anns := n.getIngressAnnotations(ing) anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Warningf("%v", err)
}
for _, rule := range ing.Spec.Rules { for _, rule := range ing.Spec.Rules {
host := rule.Host host := rule.Host
@ -1031,13 +992,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
} }
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName) key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
bc, exists := n.sslCertTracker.Get(key) cert, err := n.store.GetLocalSecret(key)
if !exists { if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key) glog.Warningf("%v", err)
continue continue
} }
cert := bc.(*ingress.SSLCert)
err = cert.Certificate.VerifyHostname(host) err = cert.Certificate.VerifyHostname(host)
if err != nil { if err != nil {
glog.Warningf("ssl certificate %v does not contain a Common Name or Subject Alternative Name for host %v", key, host) glog.Warningf("ssl certificate %v does not contain a Common Name or Subject Alternative Name for host %v", key, host)
@ -1107,7 +1067,7 @@ func (n *NGINXController) getEndpoints(
} }
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := n.listers.Endpoint.GetServiceEndpoints(s) ep, err := n.store.GetServiceEndpoints(s)
if err != nil { if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err) glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers return upsServers
@ -1187,24 +1147,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) {
atomic.StoreInt32(&n.forceReload, 0) atomic.StoreInt32(&n.forceReload, 0)
} }
} }
func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) {
anns := n.annotations.Extract(ing)
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
n.listers.IngressAnnotation.Update(anns)
}
// getByIngress returns the parsed annotations from an Ingress
func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := n.listers.IngressAnnotation.GetByKey(key)
if err != nil {
glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
return &annotations.Ingress{}
}
if !exists {
glog.Errorf("ingress annotation %v was not found", key)
return &annotations.Ingress{}
}
return item.(*annotations.Ingress)
}

View file

@ -1,228 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"reflect"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
)
type cacheController struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
func (c *cacheController) Run(stopCh chan struct{}) {
go c.Ingress.Run(stopCh)
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) {
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng, n.cfg.IngressClass, defIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, n)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
n.extractAnnotations(addIng)
n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng, n.cfg.IngressClass, defIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
n.listers.IngressAnnotation.Delete(delIng)
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, n.cfg.IngressClass, defIngressClass)
validCur := class.IsValid(curIng, n.cfg.IngressClass, defIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
n.extractAnnotations(curIng)
n.syncQueue.Enqueue(cur)
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
_, exists := n.sslCertTracker.Get(key)
if exists {
n.syncSecret(key)
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
n.sslCertTracker.Delete(key)
n.syncQueue.Enqueue(key)
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
n.syncQueue.Enqueue(cur)
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
// updates to configuration configmaps can trigger an update
if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName {
n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
n.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := apiv1.NamespaceAll
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
watchNs = n.cfg.Namespace
}
lister := &ingress.StoreLister{}
lister.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
controller := &cacheController{}
lister.Ingress.Store, controller.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler)
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler)
lister.Secret.Store, controller.Secret = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler)
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler)
lister.Service.Store, controller.Service = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
return lister, controller
}

View file

@ -69,10 +69,9 @@ const (
) )
var ( var (
tmplPath = "/etc/nginx/template/nginx.tmpl" tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf" cfgPath = "/etc/nginx/nginx.conf"
nginxBinary = "/usr/sbin/nginx" nginxBinary = "/usr/sbin/nginx"
defIngressClass = "nginx"
) )
// NewNGINXController creates a new NGINX Ingress controller. // NewNGINXController creates a new NGINX Ingress controller.
@ -103,9 +102,9 @@ func NewNGINXController(config *Configuration) *NGINXController {
isIPV6Enabled: ing_net.IsIPv6Enabled(), isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h, resolver: h,
cfg: config, cfg: config,
sslCertTracker: store.NewSSLCertTracker(),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
@ -113,19 +112,26 @@ func NewNGINXController(config *Configuration) *NGINXController {
}), }),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
updateCh: make(chan interface{}),
stopLock: &sync.Mutex{}, stopLock: &sync.Mutex{},
fileSystem: filesystem.DefaultFs{}, fileSystem: filesystem.DefaultFs{},
} }
n.listers, n.controllers = n.createListers(n.stopCh) watchNs := apiv1.NamespaceAll
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
watchNs = n.cfg.Namespace
}
ae := annotations.NewAnnotationExtractor(n)
n.store = store.New(watchNs, n.cfg.ResyncPeriod, n.recorder, n.cfg.Client, ae, n.updateCh)
n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status) n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress) n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.annotations = annotations.NewAnnotationExtractor(n)
if config.UpdateStatus { if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{ n.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client, Client: config.Client,
@ -174,21 +180,12 @@ Error loading new template : %v
type NGINXController struct { type NGINXController struct {
cfg *Configuration cfg *Configuration
listers *ingress.StoreLister
controllers *cacheController
annotations annotations.Extractor
recorder record.EventRecorder recorder record.EventRecorder
syncQueue *task.Queue syncQueue *task.Queue
syncStatus status.Sync syncStatus status.Sync
// local store of SSL certificates
// (only certificates used in ingress)
sslCertTracker *store.SSLCertTracker
syncRateLimiter flowcontrol.RateLimiter syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active. // stopLock is used to enforce only a single call to Stop is active.
@ -201,7 +198,7 @@ type NGINXController struct {
// ngxErrCh channel used to detect errors with the nginx processes // ngxErrCh channel used to detect errors with the nginx processes
ngxErrCh chan error ngxErrCh chan error
// runningConfig contains the running configuration in the Backend // runningConfig contains the running configuration
runningConfig *ingress.Configuration runningConfig *ingress.Configuration
forceReload int32 forceReload int32
@ -210,7 +207,7 @@ type NGINXController struct {
configmap *apiv1.ConfigMap configmap *apiv1.ConfigMap
storeLister *ingress.StoreLister store store.Storer
binary string binary string
resolver []net.IP resolver []net.IP
@ -224,8 +221,6 @@ type NGINXController struct {
// returns true if proxy protocol es enabled // returns true if proxy protocol es enabled
IsProxyProtocolEnabled bool IsProxyProtocolEnabled bool
isSSLPassthroughEnabled bool
isShuttingDown bool isShuttingDown bool
Proxy *TCPProxy Proxy *TCPProxy
@ -239,7 +234,7 @@ type NGINXController struct {
func (n *NGINXController) Start() { func (n *NGINXController) Start() {
glog.Infof("starting Ingress controller") glog.Infof("starting Ingress controller")
n.controllers.Run(n.stopCh) n.store.Start()
// initial sync of secrets to avoid unnecessary reloads // initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets") glog.Info("running initial sync of secrets")
@ -265,8 +260,6 @@ func (n *NGINXController) Start() {
go n.syncStatus.Run(n.stopCh) go n.syncStatus.Run(n.stopCh)
} }
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
done := make(chan error, 1) done := make(chan error, 1)
cmd := exec.Command(n.binary, "-c", cfgPath) cmd := exec.Command(n.binary, "-c", cfgPath)
@ -472,16 +465,40 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
} }
} }
svcIP := svc.Spec.ClusterIP
// in case of headless services we can use just the first address
isHeadlessService := svc.Spec.ClusterIP == "None"
if isHeadlessService {
ep, err := n.listers.Endpoint.GetServiceEndpoints(svc)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
continue
}
if len(ep.Subsets) == 0 {
glog.Warningf("invalid service headless definition (no subsets)")
continue
}
if len(ep.Subsets[0].Addresses) == 0 {
glog.Warningf("invalid service headless definition (no addresses)")
continue
}
svcIP = ep.Subsets[0].Addresses[0].IP
}
//TODO: Allow PassthroughBackends to specify they support proxy-protocol //TODO: Allow PassthroughBackends to specify they support proxy-protocol
servers = append(servers, &TCPServer{ servers = append(servers, &TCPServer{
Hostname: pb.Hostname, Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP, IP: svcIP,
Port: port, Port: port,
ProxyProtocol: false, ProxyProtocol: false,
}) })
} }
if n.isSSLPassthroughEnabled { if n.cfg.EnableSSLPassthrough {
n.Proxy.ServerList = servers n.Proxy.ServerList = servers
} }
@ -627,7 +644,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
Cfg: cfg, Cfg: cfg,
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6, IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
RedirectServers: redirectServers, RedirectServers: redirectServers,
IsSSLPassthroughEnabled: n.isSSLPassthroughEnabled, IsSSLPassthroughEnabled: n.cfg.EnableSSLPassthrough,
ListenPorts: n.cfg.ListenPorts, ListenPorts: n.cfg.ListenPorts,
PublishService: n.GetPublishService(), PublishService: n.GetPublishService(),
} }

View file

@ -29,6 +29,10 @@ import (
"github.com/ncabatoff/process-exporter/proc" "github.com/ncabatoff/process-exporter/proc"
) )
const (
processName = "nginx"
)
// IsRespawnIfRequired checks if error type is exec.ExitError or not // IsRespawnIfRequired checks if error type is exec.ExitError or not
func IsRespawnIfRequired(err error) bool { func IsRespawnIfRequired(err error) bool {
exitError, ok := err.(*exec.ExitError) exitError, ok := err.(*exec.ExitError)
@ -68,7 +72,7 @@ func WaitUntilPortIsAvailable(port int) {
continue continue
} }
if pn == "nginx" { if pn == processName {
osp, err := os.FindProcess(p.PID) osp, err := os.FindProcess(p.PID)
if err != nil { if err != nil {
glog.Errorf("unexpected error obtaining process information: %v", err) glog.Errorf("unexpected error obtaining process information: %v", err)
@ -85,7 +89,7 @@ func WaitUntilPortIsAvailable(port int) {
func IsNginxRunning() bool { func IsNginxRunning() bool {
processes, _ := ps.Processes() processes, _ := ps.Processes()
for _, p := range processes { for _, p := range processes {
if p.Executable() == "nginx" { if p.Executable() == processName {
return true return true
} }
} }

View file

@ -40,6 +40,7 @@ type TCPProxy struct {
func (p *TCPProxy) Get(host string) *TCPServer { func (p *TCPProxy) Get(host string) *TCPServer {
if p.ServerList == nil { if p.ServerList == nil {
glog.Warning("there is no servers configured with SSL passthrough. Returning default backend")
return p.Default return p.Default
} }
@ -94,6 +95,7 @@ func (p *TCPProxy) Handle(conn net.Conn) {
glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader) glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader)
_, err = fmt.Fprintf(clientConn, proxyProtocolHeader) _, err = fmt.Fprintf(clientConn, proxyProtocolHeader)
} }
if err != nil { if err != nil {
glog.Errorf("unexpected error writing proxy-protocol header: %s", err) glog.Errorf("unexpected error writing proxy-protocol header: %s", err)
clientConn.Close() clientConn.Close()

View file

@ -40,6 +40,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/store" "k8s.io/ingress-nginx/internal/ingress/store"
"k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/k8s"
@ -69,9 +70,6 @@ type Config struct {
UseNodeInternalIP bool UseNodeInternalIP bool
IngressLister store.IngressLister IngressLister store.IngressLister
DefaultIngressClass string
IngressClass string
} }
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check // statusSync keeps the status IP in each Ingress rule updated executing a periodic check
@ -180,9 +178,9 @@ func NewStatusSyncer(config Config) Sync {
// we need to use the defined ingress class to allow multiple leaders // we need to use the defined ingress class to allow multiple leaders
// in order to update information about ingress status // in order to update information about ingress status
electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass) electionID := fmt.Sprintf("%v-%v", config.ElectionID, ingress.DefaultIngressClass)
if config.IngressClass != "" { if ingress.IngressClass != "" {
electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass) electionID = fmt.Sprintf("%v-%v", config.ElectionID, ingress.IngressClass)
} }
callbacks := leaderelection.LeaderCallbacks{ callbacks := leaderelection.LeaderCallbacks{
@ -314,7 +312,7 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
for _, cur := range ings { for _, cur := range ings {
ing := cur.(*extensions.Ingress) ing := cur.(*extensions.Ingress)
if !class.IsValid(ing, s.Config.IngressClass, s.Config.DefaultIngressClass) { if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) {
continue continue
} }

View file

@ -261,8 +261,6 @@ func TestStatusActions(t *testing.T) {
Client: buildSimpleClientSet(), Client: buildSimpleClientSet(),
PublishService: "", PublishService: "",
IngressLister: buildIngressListener(), IngressLister: buildIngressListener(),
DefaultIngressClass: "nginx",
IngressClass: "",
UpdateStatusOnShutdown: true, UpdateStatusOnShutdown: true,
} }
// create object // create object

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package controller package store
import ( import (
"fmt" "fmt"
@ -25,53 +25,51 @@ import (
"github.com/imdario/mergo" "github.com/imdario/mergo"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/net/ssl" "k8s.io/ingress-nginx/internal/net/ssl"
) )
// syncSecret keeps in sync Secrets used by Ingress rules with the files on // syncSecret keeps in sync Secrets used by Ingress rules with the files on
// disk to allow copy of the content of the secret to disk to be used // disk to allow copy of the content of the secret to disk to be used
// by external processes. // by external processes.
func (ic *NGINXController) syncSecret(key string) { func (s k8sStore) syncSecret(key string) {
glog.V(3).Infof("starting syncing of secret %v", key) glog.V(3).Infof("starting syncing of secret %v", key)
cert, err := ic.getPemCertificate(key) // cert
_, err := s.getPemCertificate(key)
if err != nil { if err != nil {
glog.Warningf("error obtaining PEM from secret %v: %v", key, err) glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
return return
} }
/*
// create certificates and add or update the item in the store // create certificates and add or update the item in the store
cur, exists := ic.sslCertTracker.Get(key) cur, exists := s.GetSecret(key)
if exists { if exists {
s := cur.(*ingress.SSLCert) s := cur.(*ingress.SSLCert)
if s.Equal(cert) { if s.Equal(cert) {
// no need to update // no need to update
return
}
glog.Infof("updating secret %v in the local store", key)
ic.store.UpdateLocalSecret(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
return return
} }
glog.Infof("updating secret %v in the local store", key)
ic.sslCertTracker.Update(key, cert) glog.Infof("adding secret %v to the local store", key)
ic.store.AddLocalSecret(key, cert)
// this update must trigger an update // this update must trigger an update
// (like an update event from a change in Ingress) // (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{}) ic.syncQueue.Enqueue(&extensions.Ingress{})*/
return
}
glog.Infof("adding secret %v to the local store", key)
ic.sslCertTracker.Add(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
} }
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return. // getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only. // It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) { func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secret, err := ic.listers.Secret.GetByName(secretName) secret, err := s.listers.Secret.GetByNamespaceName(secretName)
if err != nil { if err != nil {
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err) return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
} }
@ -83,7 +81,7 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
// namespace/secretName -> namespace-secretName // namespace/secretName -> namespace-secretName
nsSecName := strings.Replace(secretName, "/", "-", -1) nsSecName := strings.Replace(secretName, "/", "-", -1)
var s *ingress.SSLCert var sslCert *ingress.SSLCert
if okcert && okkey { if okcert && okkey {
if cert == nil { if cert == nil {
return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName) return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName)
@ -94,18 +92,18 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
// If 'ca.crt' is also present, it will allow this secret to be used in the // If 'ca.crt' is also present, it will allow this secret to be used in the
// 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation // 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation
s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca) sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca)
if err != nil { if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err) return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
} }
/*
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN) glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN)
if ca != nil { if ca != nil {
glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName) glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName)
} }
*/
} else if ca != nil { } else if ca != nil {
s, err = ssl.AddCertAuth(nsSecName, ca) sslCert, err = ssl.AddCertAuth(nsSecName, ca)
if err != nil { if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err) return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
@ -119,15 +117,20 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName) return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName)
} }
s.Name = secret.Name sslCert.Name = secret.Name
s.Namespace = secret.Namespace sslCert.Namespace = secret.Namespace
return s, nil
return sslCert, nil
} }
func (ic *NGINXController) checkSSLChainIssues() { func (s k8sStore) checkSSLChainIssues() {
for _, secretName := range ic.sslCertTracker.ListKeys() { for _, item := range s.ListLocalSecrets() {
s, _ := ic.sslCertTracker.Get(secretName) secretName := fmt.Sprintf("%v/%v", item.Namespace, item.Name)
secret := s.(*ingress.SSLCert)
secret, err := s.GetLocalSecret(secretName)
if err != nil {
continue
}
if secret.FullChainPemFileName != "" { if secret.FullChainPemFileName != "" {
// chain already checked // chain already checked
@ -158,42 +161,37 @@ func (ic *NGINXController) checkSSLChainIssues() {
dst.FullChainPemFileName = fullChainPemFileName dst.FullChainPemFileName = fullChainPemFileName
glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName) glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName)
ic.sslCertTracker.Update(secretName, dst) s.sslStore.Update(secretName, dst)
// this update must trigger an update // this update must trigger an update
// (like an update event from a change in Ingress) // (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{}) //ic.syncQueue.Enqueue(&extensions.Ingress{})
} }
} }
// checkMissingSecrets verify if one or more ingress rules contains a reference // checkMissingSecrets verify if one or more ingress rules contains a reference
// to a secret that is not present in the local secret store. // to a secret that is not present in the local secret store.
// In this case we call syncSecret. // In this case we call syncSecret.
func (ic *NGINXController) checkMissingSecrets() { func (s k8sStore) checkMissingSecrets() {
for _, obj := range ic.listers.Ingress.List() { for _, ing := range s.ListIngresses() {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
continue
}
for _, tls := range ing.Spec.TLS { for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" { if tls.SecretName == "" {
continue continue
} }
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName) key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
if _, ok := ic.sslCertTracker.Get(key); !ok { if _, ok := s.sslStore.Get(key); !ok {
ic.syncSecret(key) s.syncSecret(key)
} }
} }
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, ic) /*
if key == "" { key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, resolver)
continue if key == "" {
} continue
}
if _, ok := ic.sslCertTracker.Get(key); !ok { if _, ok := ic.sslCertTracker.Get(key); !ok {
ic.syncSecret(key) ic.syncSecret(key)
} }*/
} }
} }

View file

@ -14,24 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package controller package store
import ( import (
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"testing"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testclient "k8s.io/client-go/kubernetes/fake" testclient "k8s.io/client-go/kubernetes/fake"
cache_client "k8s.io/client-go/tools/cache" cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/api"
"k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/store"
"k8s.io/ingress-nginx/internal/task"
"k8s.io/kubernetes/pkg/api"
) )
const ( const (
@ -66,8 +62,8 @@ func buildSimpleClientSetForBackendSSL() *testclient.Clientset {
return testclient.NewSimpleClientset() return testclient.NewSimpleClientset()
} }
func buildIngListenerForBackendSSL() store.IngressLister { func buildIngListenerForBackendSSL() IngressLister {
ingLister := store.IngressLister{} ingLister := IngressLister{}
ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
return ingLister return ingLister
} }
@ -81,20 +77,21 @@ func buildSecretForBackendSSL() *apiv1.Secret {
} }
} }
func buildSecrListerForBackendSSL() store.SecretLister { func buildSecrListerForBackendSSL() SecretLister {
secrLister := store.SecretLister{} secrLister := SecretLister{}
secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
return secrLister return secrLister
} }
/*
func buildListers() *ingress.StoreLister { func buildListers() *ingress.StoreLister {
sl := &ingress.StoreLister{} sl := &ingress.StoreLister{}
sl.Ingress.Store = buildIngListenerForBackendSSL() sl.Ingress.Store = buildIngListenerForBackendSSL()
sl.Secret.Store = buildSecrListerForBackendSSL() sl.Secret.Store = buildSecrListerForBackendSSL()
return sl return sl
} }
*/
func buildControllerForBackendSSL() cache_client.Controller { func buildControllerForBackendSSL() cache_client.Controller {
cfg := &cache_client.Config{ cfg := &cache_client.Config{
Queue: &MockQueue{Synced: true}, Queue: &MockQueue{Synced: true},
@ -103,6 +100,7 @@ func buildControllerForBackendSSL() cache_client.Controller {
return cache_client.New(cfg) return cache_client.New(cfg)
} }
/*
func buildGenericControllerForBackendSSL() *NGINXController { func buildGenericControllerForBackendSSL() *NGINXController {
gc := &NGINXController{ gc := &NGINXController{
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1), syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
@ -110,13 +108,13 @@ func buildGenericControllerForBackendSSL() *NGINXController {
Client: buildSimpleClientSetForBackendSSL(), Client: buildSimpleClientSetForBackendSSL(),
}, },
listers: buildListers(), listers: buildListers(),
sslCertTracker: store.NewSSLCertTracker(), sslCertTracker: NewSSLCertTracker(),
} }
gc.syncQueue = task.NewTaskQueue(gc.syncIngress) gc.syncQueue = task.NewTaskQueue(gc.syncIngress)
return gc return gc
} }
*/
func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) { func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
// prepare // prepare
td, err := ioutil.TempDir("", "ssl") td, err := ioutil.TempDir("", "ssl")
@ -140,6 +138,7 @@ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
return dCrt, dKey, dCa, nil return dCrt, dKey, dCa, nil
} }
/*
func TestSyncSecret(t *testing.T) { func TestSyncSecret(t *testing.T) {
// prepare for test // prepare for test
dCrt, dKey, dCa, err := buildCrtKeyAndCA() dCrt, dKey, dCa, err := buildCrtKeyAndCA()
@ -232,3 +231,4 @@ func TestGetPemCertificate(t *testing.T) {
}) })
} }
} }
*/

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// ConfigMapLister makes a Store that lists Configmaps.
type ConfigMapLister struct {
cache.Store
}
// GetByNamespaceName searches for a configmap in the local configmaps Store
func (cml *ConfigMapLister) GetByNamespaceName(key string) (*apiv1.ConfigMap, error) {
s, exists, err := cml.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", key)
}
return s.(*apiv1.ConfigMap), nil
}

View file

@ -0,0 +1,40 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// EndpointLister makes a Store that lists Endpoints.
type EndpointLister struct {
cache.Store
}
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
for _, m := range s.Store.List() {
ep := m.(*apiv1.Endpoints)
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
return ep, nil
}
}
return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name)
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
)
// IngressLister makes a Store that lists Ingress.
type IngressLister struct {
cache.Store
}
// GetByNamespaceName searches for an ingress in the local ingress Store
func (il IngressLister) GetByNamespaceName(key string) (*extensions.Ingress, error) {
i, exists, err := il.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("ingress %v was not found", key)
}
return i.(*extensions.Ingress), nil
}

View file

@ -0,0 +1,26 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"k8s.io/client-go/tools/cache"
)
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
type IngressAnnotationsLister struct {
cache.Store
}

View file

@ -0,0 +1,30 @@
package store
import (
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress"
)
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
type SSLCertTracker struct {
cache.ThreadSafeStore
}
// NewSSLCertTracker creates a new SSLCertTracker store
func NewSSLCertTracker() *SSLCertTracker {
return &SSLCertTracker{
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
}
}
// GetByNamespaceName searches for an ingress in the local ingress Store
func (s SSLCertTracker) GetByNamespaceName(key string) (*ingress.SSLCert, error) {
cert, exists := s.Get(key)
if !exists {
return nil, fmt.Errorf("local SSL certificate %v was not found", key)
}
return cert.(*ingress.SSLCert), nil
}

View file

@ -1,113 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// IngressLister makes a Store that lists Ingress.
type IngressLister struct {
cache.Store
}
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
type IngressAnnotationsLister struct {
cache.Store
}
// SecretLister makes a Store that lists Secrets.
type SecretLister struct {
cache.Store
}
// GetByName searches for a secret in the local secrets Store
func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) {
s, exists, err := sl.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", name)
}
return s.(*apiv1.Secret), nil
}
// ConfigMapLister makes a Store that lists Configmaps.
type ConfigMapLister struct {
cache.Store
}
// GetByName searches for a configmap in the local configmaps Store
func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) {
s, exists, err := cml.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", name)
}
return s.(*apiv1.ConfigMap), nil
}
// ServiceLister makes a Store that lists Services.
type ServiceLister struct {
cache.Store
}
// GetByName searches for a service in the local secrets Store
func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) {
s, exists, err := sl.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", name)
}
return s.(*apiv1.Service), nil
}
// EndpointLister makes a Store that lists Endpoints.
type EndpointLister struct {
cache.Store
}
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
for _, m := range s.Store.List() {
ep := m.(*apiv1.Endpoints)
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
return ep, nil
}
}
return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name)
}
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
type SSLCertTracker struct {
cache.ThreadSafeStore
}
// NewSSLCertTracker creates a new SSLCertTracker store
func NewSSLCertTracker() *SSLCertTracker {
return &SSLCertTracker{
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
}
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// SecretLister makes a Store that lists Secrets.
type SecretLister struct {
cache.Store
}
// GetByNamespaceName searches for a secret in the local secrets Store
func (sl *SecretLister) GetByNamespaceName(key string) (*apiv1.Secret, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", key)
}
return s.(*apiv1.Secret), nil
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// ServiceLister makes a Store that lists Services.
type ServiceLister struct {
cache.Store
}
// GetByNamespaceName searches for a service in the local secrets Store
func (sl *ServiceLister) GetByNamespaceName(key string) (*apiv1.Service, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", key)
}
return s.(*apiv1.Service), nil
}

View file

@ -0,0 +1,412 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
// Storer is the interface that wraps the required methods to gather information
// about ingresses, services, secrets and ingress annotations.
type Storer interface {
GetConfigMap(key string) (*apiv1.ConfigMap, error)
// GetSecret returns a Secret using the namespace and name as key
GetSecret(key string) (*apiv1.Secret, error)
// GetService returns a Service using the namespace and name as key
GetService(key string) (*apiv1.Service, error)
GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error)
// GetSecret returns an Ingress using the namespace and name as key
GetIngress(key string) (*extensions.Ingress, error)
// ListIngresses returns the list of Ingresses
ListIngresses() []*extensions.Ingress
// GetIngressAnnotations returns the annotations associated to an Ingress
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
// GetLocalSecret returns the local copy of a Secret
GetLocalSecret(name string) (*ingress.SSLCert, error)
// ListLocalSecrets returns the list of local Secrets
ListLocalSecrets() []*ingress.SSLCert
// StartSync initiates the synchronization of the controllers
StartSync(stopCh chan struct{})
}
// lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
type lister struct {
Ingress IngressLister
Service ServiceLister
Endpoint EndpointLister
Secret SecretLister
ConfigMap ConfigMapLister
IngressAnnotation IngressAnnotationsLister
}
// controller defines the required controllers that interact agains the api server
type controller struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
// Run initiates the synchronization of the controllers against the api server
func (c *controller) Run(stopCh chan struct{}) {
go c.Ingress.Run(stopCh)
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)
// wait for all involved caches to be synced, before processing items
// from the queue is started
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
type k8sStore struct {
cache *controller
// listers
listers *lister
// sslStore local store of SSL certificates (certificates used in ingress)
sslStore *SSLCertTracker
// annotations parser
annotations annotations.Extractor
}
// New creates a new object store to be used in the ingress controller
func New(namespace, configmap, tcp, udp string,
resyncPeriod time.Duration,
recorder record.EventRecorder,
client clientset.Interface,
annotations annotations.Extractor,
r resolver.Resolver,
updateCh chan ingress.Event) Storer {
store := &k8sStore{
cache: &controller{},
listers: &lister{},
sslStore: NewSSLCertTracker(),
annotations: annotations,
}
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng, ingress.IngressClass, ingress.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, r)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh <- ingress.Event{
Type: ingress.CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng, ingress.IngressClass, ingress.DefaultIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
updateCh <- ingress.Event{
Type: ingress.DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, ingress.IngressClass, ingress.DefaultIngressClass)
validCur := class.IsValid(curIng, ingress.IngressClass, ingress.DefaultIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
updateCh <- ingress.Event{
Type: ingress.UpdateEvent,
Obj: cur,
}
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
_, exists := store.sslStore.Get(key)
if exists {
updateCh <- ingress.Event{
Type: ingress.UpdateEvent,
Obj: cur,
}
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
store.sslStore.Delete(key)
updateCh <- ingress.Event{
Type: ingress.DeleteEvent,
Obj: obj,
}
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh <- ingress.Event{
Type: ingress.CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh <- ingress.Event{
Type: ingress.DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
updateCh <- ingress.Event{
Type: ingress.UpdateEvent,
Obj: cur,
}
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
updateCh <- ingress.Event{
Type: ingress.CreateEvent,
Obj: obj,
}
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == configmap {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
updateCh <- ingress.Event{
Type: ingress.UpdateEvent,
Obj: cur,
}
}
// updates to configuration configmaps can trigger an update
if mapKey == configmap || mapKey == tcp || mapKey == udp {
recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
updateCh <- ingress.Event{
Type: ingress.UpdateEvent,
Obj: cur,
}
}
}
},
}
store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, ingEventHandler)
store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
&apiv1.Endpoints{}, resyncPeriod, eventHandler)
store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
&apiv1.Secret{}, resyncPeriod, secrEventHandler)
store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)
store.listers.Service.Store, store.cache.Service = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
return store
}
func (s k8sStore) extractAnnotations(ing *extensions.Ingress) {
anns := s.annotations.Extract(ing)
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
s.listers.IngressAnnotation.Update(anns)
}
// GetSecret returns a Secret using the namespace and name as key
func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) {
return s.listers.Secret.GetByNamespaceName(key)
}
// ListLocalSecrets returns the list of local Secrets
func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
var certs []*ingress.SSLCert
for _, item := range s.sslStore.List() {
if s, ok := item.(*ingress.SSLCert); ok {
certs = append(certs, s)
}
}
return certs
}
// GetService returns a Service using the namespace and name as key
func (s k8sStore) GetService(key string) (*apiv1.Service, error) {
return s.listers.Service.GetByNamespaceName(key)
}
// GetSecret returns an Ingress using the namespace and name as key
func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) {
return s.listers.Ingress.GetByNamespaceName(key)
}
// ListIngresses returns the list of Ingresses
func (s k8sStore) ListIngresses() []*extensions.Ingress {
// filter ingress rules
var ingresses []*extensions.Ingress
for _, item := range s.listers.Ingress.List() {
ing := item.(*extensions.Ingress)
if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) {
continue
}
ingresses = append(ingresses, ing)
}
return ingresses
}
// GetIngressAnnotations returns the annotations associated to an Ingress
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
if err != nil {
return nil, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
}
if !exists {
return nil, fmt.Errorf("ingress annotation %v was not found", key)
}
return item.(*annotations.Ingress), nil
}
// GetLocalSecret returns the local copy of a Secret
func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) {
return s.sslStore.GetByNamespaceName(key)
}
func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) {
return s.listers.ConfigMap.GetByNamespaceName(key)
}
func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
return s.listers.Endpoint.GetServiceEndpoints(svc)
}
// StartSync initiates the synchronization of the controllers
func (s k8sStore) StartSync(stopCh chan struct{}) {
// start controllers
s.cache.Run(stopCh)
// start goroutine to check for missing local secrets
go wait.Until(s.checkMissingSecrets, 30*time.Second, stopCh)
}

View file

@ -0,0 +1,314 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"os"
"sync/atomic"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
extensions "k8s.io/api/extensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/test/e2e/framework"
)
func TestStore(t *testing.T) {
// TODO: find a way to avoid the need to use a real api server
home := os.Getenv("HOME")
kubeConfigFile := fmt.Sprintf("%v/.kube/config", home)
kubeContext := ""
kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext)
if err != nil {
t.Errorf("unexpected error loading kubeconfig file: %v", err)
}
clientSet, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Run("should return an error searching for non existing objects", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
defer close(stopCh)
updateCh := make(chan ingress.Event)
defer close(updateCh)
go func(ch chan ingress.Event) {
for {
<-ch
}
}(updateCh)
storer := New(ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
&record.FakeRecorder{},
clientSet,
annotations.Extractor{},
resolver.Mock{},
updateCh)
storer.StartSync(stopCh)
key := fmt.Sprintf("%v/anything", ns.Name)
ing, err := storer.GetIngress(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if ing != nil {
t.Errorf("expected an Ingres but none returned")
}
ls, err := storer.GetLocalSecret(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if ls != nil {
t.Errorf("expected an Ingres but none returned")
}
s, err := storer.GetSecret(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if s != nil {
t.Errorf("expected an Ingres but none returned")
}
svc, err := storer.GetService(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if svc != nil {
t.Errorf("expected an Ingres but none returned")
}
})
t.Run("should return ingress one event for add, update and delete", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
defer close(stopCh)
updateCh := make(chan ingress.Event)
defer close(updateCh)
var add uint64
var upd uint64
var del uint64
go func(ch chan ingress.Event) {
for {
e := <-ch
if e.Obj == nil {
continue
}
if _, ok := e.Obj.(*extensions.Ingress); !ok {
t.Errorf("expected an Ingress type but %T returned", e.Obj)
}
switch e.Type {
case ingress.CreateEvent:
atomic.AddUint64(&add, 1)
break
case ingress.UpdateEvent:
atomic.AddUint64(&upd, 1)
break
case ingress.DeleteEvent:
atomic.AddUint64(&del, 1)
break
}
}
}(updateCh)
storer := New(ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
&record.FakeRecorder{},
clientSet,
annotations.Extractor{},
resolver.Mock{},
updateCh)
storer.StartSync(stopCh)
ing, err := ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
Namespace: ns.Name,
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
// create an invalid ingress (different class)
_, err = ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "custom-class",
Namespace: ns.Name,
Annotations: map[string]string{
"kubernetes.io/ingress.class": "something",
},
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
ni := ing.DeepCopy()
ni.Spec.Rules[0].Host = "update-dummy"
_, err = ensureIngress(ni, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
err = clientSet.ExtensionsV1beta1().
Ingresses(ni.Namespace).
Delete(ni.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
waitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name)
if atomic.LoadUint64(&add) != 1 {
t.Errorf("expected 1 event of type Create but %v ocurred", add)
}
if atomic.LoadUint64(&upd) != 1 {
t.Errorf("expected 1 event of type Update but %v ocurred", upd)
}
if atomic.LoadUint64(&del) != 1 {
t.Errorf("expected 1 event of type Delete but %v ocurred", del)
}
})
}
func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) *apiv1.Namespace {
t.Log("creating temporal namespace")
ns, err := framework.CreateKubeNamespace("store-test", clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Logf("temporal namespace %v created", ns.Name)
return ns
}
func deleteNamespace(ns *apiv1.Namespace, clientSet *kubernetes.Clientset, t *testing.T) {
t.Logf("deleting temporal namespace %v created", ns.Name)
err := framework.DeleteKubeNamespace(clientSet, ns.Name)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Logf("temporal namespace %v deleted", ns.Name)
}
func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) {
s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
if k8sErrors.IsNotFound(err) {
return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress)
}
return nil, err
}
return s, nil
}
func waitForNoIngressInNamespace(c kubernetes.Interface, namespace, name string) error {
return wait.PollImmediate(1*time.Second, time.Minute*2, noIngressInNamespace(c, namespace, name))
}
func noIngressInNamespace(c kubernetes.Interface, namespace, name string) wait.ConditionFunc {
return func() (bool, error) {
ing, err := c.ExtensionsV1beta1().Ingresses(namespace).Get(name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return true, nil
}
if err != nil {
return false, err
}
if ing == nil {
return true, nil
}
return false, nil
}
}

View file

@ -33,7 +33,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/redirect" "k8s.io/ingress-nginx/internal/ingress/annotations/redirect"
"k8s.io/ingress-nginx/internal/ingress/annotations/rewrite" "k8s.io/ingress-nginx/internal/ingress/annotations/rewrite"
"k8s.io/ingress-nginx/internal/ingress/resolver" "k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/ingress/store"
) )
var ( var (
@ -42,17 +41,26 @@ var (
// The name of each file is <namespace>-<secret name>.pem. The content is the concatenated // The name of each file is <namespace>-<secret name>.pem. The content is the concatenated
// certificate and key. // certificate and key.
DefaultSSLDirectory = "/ingress-controller/ssl" DefaultSSLDirectory = "/ingress-controller/ssl"
// DefaultIngressClass defines the default ingress class of the ingress controller
DefaultIngressClass = "nginx"
// IngressClass contains the configured ingress class.
// By default is empty
IngressClass = ""
) )
// StoreLister returns the configured stores for ingresses, services, type EventType string
// endpoints, secrets and configmaps.
type StoreLister struct { const (
Ingress store.IngressLister CreateEvent EventType = "CREATE"
Service store.ServiceLister UpdateEvent EventType = "UPDATE"
Endpoint store.EndpointLister DeleteEvent EventType = "DELETE"
Secret store.SecretLister )
ConfigMap store.ConfigMapLister
IngressAnnotation store.IngressAnnotationsLister type Event struct {
Type EventType
Obj interface{}
} }
// Configuration holds the definition of all the parts required to describe all // Configuration holds the definition of all the parts required to describe all