Merge dee4056cf0
into adfbc8cc84
This commit is contained in:
commit
f6f386cc82
23 changed files with 1182 additions and 571 deletions
|
@ -34,6 +34,9 @@ jobs:
|
|||
- go get github.com/golang/lint/golint
|
||||
- make fmt lint vet
|
||||
- stage: Coverage
|
||||
before_script:
|
||||
- make e2e-image
|
||||
- test/e2e/up.sh
|
||||
script:
|
||||
- go get github.com/mattn/goveralls
|
||||
- go get github.com/modocache/gover
|
||||
|
|
|
@ -27,15 +27,12 @@ import (
|
|||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/controller"
|
||||
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
|
||||
ing_net "k8s.io/ingress-nginx/internal/net"
|
||||
)
|
||||
|
||||
const (
|
||||
defIngressClass = "nginx"
|
||||
)
|
||||
|
||||
func parseFlags() (bool, *controller.Configuration, error) {
|
||||
var (
|
||||
flags = pflag.NewFlagSet("", pflag.ExitOnError)
|
||||
|
@ -157,6 +154,8 @@ func parseFlags() (bool, *controller.Configuration, error) {
|
|||
}
|
||||
}
|
||||
|
||||
ingress.IngressClass = *ingressClass
|
||||
|
||||
// check port collisions
|
||||
if !ing_net.IsPortAvailable(*httpPort) {
|
||||
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort)
|
||||
|
@ -198,7 +197,6 @@ func parseFlags() (bool, *controller.Configuration, error) {
|
|||
EnableSSLChainCompletion: *enableSSLChainCompletion,
|
||||
ResyncPeriod: *resyncPeriod,
|
||||
DefaultService: *defaultSvc,
|
||||
IngressClass: *ingressClass,
|
||||
Namespace: *watchNamespace,
|
||||
ConfigMapName: *configMap,
|
||||
TCPConfigMapName: *tcpConfigMapName,
|
||||
|
|
|
@ -133,6 +133,7 @@ func main() {
|
|||
ngx := controller.NewNGINXController(conf)
|
||||
|
||||
if conf.EnableSSLPassthrough {
|
||||
glog.Info("setting up TLS proxy for SSL passthrough")
|
||||
setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx)
|
||||
}
|
||||
|
||||
|
|
|
@ -37,8 +37,6 @@ import (
|
|||
clientset "k8s.io/client-go/kubernetes"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
|
||||
|
@ -76,8 +74,8 @@ type Configuration struct {
|
|||
|
||||
ConfigMapName string
|
||||
DefaultService string
|
||||
IngressClass string
|
||||
Namespace string
|
||||
|
||||
Namespace string
|
||||
|
||||
ForceNamespaceIsolation bool
|
||||
|
||||
|
@ -117,26 +115,6 @@ func (n NGINXController) GetDefaultBackend() defaults.Backend {
|
|||
return n.backendDefaults
|
||||
}
|
||||
|
||||
// GetPublishService returns the configured service used to set ingress status
|
||||
func (n NGINXController) GetPublishService() *apiv1.Service {
|
||||
s, err := n.listers.Service.GetByName(n.cfg.PublishService)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// GetSecret searches for a secret in the local secrets Store
|
||||
func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) {
|
||||
return n.listers.Secret.GetByName(name)
|
||||
}
|
||||
|
||||
// GetService searches for a service in the local secrets Store
|
||||
func (n NGINXController) GetService(name string) (*apiv1.Service, error) {
|
||||
return n.listers.Service.GetByName(name)
|
||||
}
|
||||
|
||||
// GetAnnotationWithPrefix returns the prefix of ingress annotations
|
||||
func (n NGINXController) GetAnnotationWithPrefix(suffix string) string {
|
||||
return fmt.Sprintf("%v/%v", n.cfg.AnnotationsPrefix, suffix)
|
||||
|
@ -154,32 +132,20 @@ func (n *NGINXController) syncIngress(item interface{}) error {
|
|||
|
||||
if element, ok := item.(task.Element); ok {
|
||||
if name, ok := element.Key.(string); ok {
|
||||
if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists {
|
||||
ing := obj.(*extensions.Ingress)
|
||||
if ing, err := n.store.GetIngress(name); err == nil {
|
||||
n.readSecrets(ing)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort ingress rules using the ResourceVersion field
|
||||
ings := n.listers.Ingress.List()
|
||||
sort.SliceStable(ings, func(i, j int) bool {
|
||||
ir := ings[i].(*extensions.Ingress).ResourceVersion
|
||||
jr := ings[j].(*extensions.Ingress).ResourceVersion
|
||||
ingresses := n.store.ListIngresses()
|
||||
sort.SliceStable(ingresses, func(i, j int) bool {
|
||||
ir := ingresses[i].ResourceVersion
|
||||
jr := ingresses[j].ResourceVersion
|
||||
return ir < jr
|
||||
})
|
||||
|
||||
// filter ingress rules
|
||||
var ingresses []*extensions.Ingress
|
||||
for _, ingIf := range ings {
|
||||
ing := ingIf.(*extensions.Ingress)
|
||||
if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) {
|
||||
continue
|
||||
}
|
||||
|
||||
ingresses = append(ingresses, ing)
|
||||
}
|
||||
|
||||
upstreams, servers := n.getBackendServers(ingresses)
|
||||
var passUpstreams []*ingress.SSLPassthroughBackend
|
||||
|
||||
|
@ -248,7 +214,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
|
|||
return []ingress.L4Service{}
|
||||
}
|
||||
|
||||
configmap, err := n.listers.ConfigMap.GetByName(configmapName)
|
||||
configmap, err := n.store.GetConfigMap(configmapName)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
|
||||
return []ingress.L4Service{}
|
||||
|
@ -306,19 +272,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
|
|||
continue
|
||||
}
|
||||
|
||||
svcObj, svcExists, err := n.listers.Service.GetByKey(nsName)
|
||||
svc, err := n.store.GetService(nsName)
|
||||
if err != nil {
|
||||
glog.Warningf("error getting service %v: %v", nsName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v was not found", nsName)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*apiv1.Service)
|
||||
|
||||
var endps []ingress.Endpoint
|
||||
targetPort, err := strconv.Atoi(svcPort)
|
||||
if err != nil {
|
||||
|
@ -375,20 +334,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
|
|||
Name: defUpstreamName,
|
||||
}
|
||||
svcKey := n.cfg.DefaultService
|
||||
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
|
||||
svc, err := n.store.GetService(svcKey)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
|
||||
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
|
||||
return upstream
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v does not exist", svcKey)
|
||||
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
|
||||
return upstream
|
||||
}
|
||||
|
||||
svc := svcObj.(*apiv1.Service)
|
||||
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
|
||||
if len(endps) == 0 {
|
||||
glog.Warningf("service %v does not have any active endpoints", svcKey)
|
||||
|
@ -408,7 +360,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
|
|||
servers := n.createServers(ingresses, upstreams, du)
|
||||
|
||||
for _, ing := range ingresses {
|
||||
anns := n.getIngressAnnotations(ing)
|
||||
anns, err := n.store.GetIngressAnnotations(ing)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
}
|
||||
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
host := rule.Host
|
||||
|
@ -622,20 +577,19 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
|
|||
|
||||
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
|
||||
func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
|
||||
if _, exists := n.sslCertTracker.Get(name); !exists {
|
||||
if _, err := n.store.GetLocalSecret(name); err != nil {
|
||||
n.syncSecret(name)
|
||||
}
|
||||
|
||||
_, err := n.listers.Secret.GetByName(name)
|
||||
_, err := n.store.GetLocalSecret(name)
|
||||
if err != nil {
|
||||
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
bc, exists := n.sslCertTracker.Get(name)
|
||||
if !exists {
|
||||
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
|
||||
cert, err := n.store.GetLocalSecret(name)
|
||||
if err != nil {
|
||||
return &resolver.AuthSSLCert{}, err
|
||||
}
|
||||
cert := bc.(*ingress.SSLCert)
|
||||
return &resolver.AuthSSLCert{
|
||||
Secret: name,
|
||||
CAFileName: cert.CAFileName,
|
||||
|
@ -650,7 +604,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
|
|||
upstreams[defUpstreamName] = du
|
||||
|
||||
for _, ing := range data {
|
||||
anns := n.getIngressAnnotations(ing)
|
||||
anns, err := n.store.GetIngressAnnotations(ing)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
}
|
||||
|
||||
var defBackend string
|
||||
if ing.Spec.Backend != nil {
|
||||
|
@ -737,7 +694,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
|
|||
upstreams[name].Endpoints = endp
|
||||
}
|
||||
|
||||
s, err := n.listers.Service.GetByName(svcKey)
|
||||
s, err := n.store.GetService(svcKey)
|
||||
if err != nil {
|
||||
glog.Warningf("error obtaining service: %v", err)
|
||||
continue
|
||||
|
@ -752,13 +709,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
|
|||
}
|
||||
|
||||
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
|
||||
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
|
||||
|
||||
if !svcExists {
|
||||
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
|
||||
svc, err := n.store.GetService(svcKey)
|
||||
if err != nil {
|
||||
return endpoint, err
|
||||
}
|
||||
|
||||
svc := svcObj.(*apiv1.Service)
|
||||
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
|
||||
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
|
||||
}
|
||||
|
@ -790,7 +745,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
|
|||
// to a service.
|
||||
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
|
||||
hz *healthcheck.Config) ([]ingress.Endpoint, error) {
|
||||
svc, err := n.listers.Service.GetByName(svcKey)
|
||||
svc, err := n.store.GetService(svcKey)
|
||||
|
||||
var upstreams []ingress.Endpoint
|
||||
if err != nil {
|
||||
|
@ -915,7 +870,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
|
|||
|
||||
// initialize all the servers
|
||||
for _, ing := range data {
|
||||
anns := n.getIngressAnnotations(ing)
|
||||
anns, err := n.store.GetIngressAnnotations(ing)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
}
|
||||
|
||||
// default upstream server
|
||||
un := du.Name
|
||||
|
@ -966,7 +924,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
|
|||
|
||||
// configure default location, alias, and SSL
|
||||
for _, ing := range data {
|
||||
anns := n.getIngressAnnotations(ing)
|
||||
anns, err := n.store.GetIngressAnnotations(ing)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
}
|
||||
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
host := rule.Host
|
||||
|
@ -1031,13 +992,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
|
|||
}
|
||||
|
||||
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
|
||||
bc, exists := n.sslCertTracker.Get(key)
|
||||
if !exists {
|
||||
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
|
||||
cert, err := n.store.GetLocalSecret(key)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
cert := bc.(*ingress.SSLCert)
|
||||
err = cert.Certificate.VerifyHostname(host)
|
||||
if err != nil {
|
||||
glog.Warningf("ssl certificate %v does not contain a Common Name or Subject Alternative Name for host %v", key, host)
|
||||
|
@ -1107,7 +1067,7 @@ func (n *NGINXController) getEndpoints(
|
|||
}
|
||||
|
||||
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
|
||||
ep, err := n.listers.Endpoint.GetServiceEndpoints(s)
|
||||
ep, err := n.store.GetServiceEndpoints(s)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
|
||||
return upsServers
|
||||
|
@ -1187,24 +1147,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) {
|
|||
atomic.StoreInt32(&n.forceReload, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) {
|
||||
anns := n.annotations.Extract(ing)
|
||||
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
|
||||
n.listers.IngressAnnotation.Update(anns)
|
||||
}
|
||||
|
||||
// getByIngress returns the parsed annotations from an Ingress
|
||||
func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress {
|
||||
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
|
||||
item, exists, err := n.listers.IngressAnnotation.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
|
||||
return &annotations.Ingress{}
|
||||
}
|
||||
if !exists {
|
||||
glog.Errorf("ingress annotation %v was not found", key)
|
||||
return &annotations.Ingress{}
|
||||
}
|
||||
return item.(*annotations.Ingress)
|
||||
}
|
||||
|
|
|
@ -1,228 +0,0 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
cache_client "k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
|
||||
)
|
||||
|
||||
type cacheController struct {
|
||||
Ingress cache.Controller
|
||||
Endpoint cache.Controller
|
||||
Service cache.Controller
|
||||
Secret cache.Controller
|
||||
Configmap cache.Controller
|
||||
}
|
||||
|
||||
func (c *cacheController) Run(stopCh chan struct{}) {
|
||||
go c.Ingress.Run(stopCh)
|
||||
go c.Endpoint.Run(stopCh)
|
||||
go c.Service.Run(stopCh)
|
||||
go c.Secret.Run(stopCh)
|
||||
go c.Configmap.Run(stopCh)
|
||||
|
||||
// Wait for all involved caches to be synced, before processing items from the queue is started
|
||||
if !cache.WaitForCacheSync(stopCh,
|
||||
c.Ingress.HasSynced,
|
||||
c.Endpoint.HasSynced,
|
||||
c.Service.HasSynced,
|
||||
c.Secret.HasSynced,
|
||||
c.Configmap.HasSynced,
|
||||
) {
|
||||
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) {
|
||||
ingEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
if !class.IsValid(addIng, n.cfg.IngressClass, defIngressClass) {
|
||||
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, n)
|
||||
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
|
||||
return
|
||||
}
|
||||
|
||||
n.extractAnnotations(addIng)
|
||||
n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
|
||||
n.syncQueue.Enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
delIng, ok := obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
// If we reached here it means the ingress was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
delIng, ok = tombstone.Obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if !class.IsValid(delIng, n.cfg.IngressClass, defIngressClass) {
|
||||
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
|
||||
return
|
||||
}
|
||||
n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
|
||||
n.listers.IngressAnnotation.Delete(delIng)
|
||||
n.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldIng := old.(*extensions.Ingress)
|
||||
curIng := cur.(*extensions.Ingress)
|
||||
validOld := class.IsValid(oldIng, n.cfg.IngressClass, defIngressClass)
|
||||
validCur := class.IsValid(curIng, n.cfg.IngressClass, defIngressClass)
|
||||
if !validOld && validCur {
|
||||
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validOld && !validCur {
|
||||
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validCur && !reflect.DeepEqual(old, cur) {
|
||||
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
}
|
||||
|
||||
n.extractAnnotations(curIng)
|
||||
n.syncQueue.Enqueue(cur)
|
||||
},
|
||||
}
|
||||
|
||||
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
sec := cur.(*apiv1.Secret)
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
_, exists := n.sslCertTracker.Get(key)
|
||||
if exists {
|
||||
n.syncSecret(key)
|
||||
}
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
sec, ok := obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
// If we reached here it means the secret was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
sec, ok = tombstone.Obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
n.sslCertTracker.Delete(key)
|
||||
n.syncQueue.Enqueue(key)
|
||||
},
|
||||
}
|
||||
|
||||
eventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
n.syncQueue.Enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
n.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oep := old.(*apiv1.Endpoints)
|
||||
ocur := cur.(*apiv1.Endpoints)
|
||||
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
|
||||
n.syncQueue.Enqueue(cur)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
mapEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
upCmap := obj.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == n.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("adding configmap %v to backend", mapKey)
|
||||
n.SetConfig(upCmap)
|
||||
n.SetForceReload(true)
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
upCmap := cur.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == n.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
|
||||
n.SetConfig(upCmap)
|
||||
n.SetForceReload(true)
|
||||
}
|
||||
// updates to configuration configmaps can trigger an update
|
||||
if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName {
|
||||
n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
|
||||
n.syncQueue.Enqueue(cur)
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
watchNs := apiv1.NamespaceAll
|
||||
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
|
||||
watchNs = n.cfg.Namespace
|
||||
}
|
||||
|
||||
lister := &ingress.StoreLister{}
|
||||
lister.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
|
||||
|
||||
controller := &cacheController{}
|
||||
|
||||
lister.Ingress.Store, controller.Ingress = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()),
|
||||
&extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler)
|
||||
|
||||
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler)
|
||||
|
||||
lister.Secret.Store, controller.Secret = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
|
||||
&apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler)
|
||||
|
||||
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
|
||||
&apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler)
|
||||
|
||||
lister.Service.Store, controller.Service = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
|
||||
return lister, controller
|
||||
}
|
|
@ -69,10 +69,9 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
tmplPath = "/etc/nginx/template/nginx.tmpl"
|
||||
cfgPath = "/etc/nginx/nginx.conf"
|
||||
nginxBinary = "/usr/sbin/nginx"
|
||||
defIngressClass = "nginx"
|
||||
tmplPath = "/etc/nginx/template/nginx.tmpl"
|
||||
cfgPath = "/etc/nginx/nginx.conf"
|
||||
nginxBinary = "/usr/sbin/nginx"
|
||||
)
|
||||
|
||||
// NewNGINXController creates a new NGINX Ingress controller.
|
||||
|
@ -103,9 +102,9 @@ func NewNGINXController(config *Configuration) *NGINXController {
|
|||
|
||||
isIPV6Enabled: ing_net.IsIPv6Enabled(),
|
||||
|
||||
resolver: h,
|
||||
cfg: config,
|
||||
sslCertTracker: store.NewSSLCertTracker(),
|
||||
resolver: h,
|
||||
cfg: config,
|
||||
|
||||
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
|
||||
|
||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
||||
|
@ -113,19 +112,26 @@ func NewNGINXController(config *Configuration) *NGINXController {
|
|||
}),
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
updateCh: make(chan interface{}),
|
||||
|
||||
stopLock: &sync.Mutex{},
|
||||
|
||||
fileSystem: filesystem.DefaultFs{},
|
||||
}
|
||||
|
||||
n.listers, n.controllers = n.createListers(n.stopCh)
|
||||
watchNs := apiv1.NamespaceAll
|
||||
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
|
||||
watchNs = n.cfg.Namespace
|
||||
}
|
||||
|
||||
ae := annotations.NewAnnotationExtractor(n)
|
||||
|
||||
n.store = store.New(watchNs, n.cfg.ResyncPeriod, n.recorder, n.cfg.Client, ae, n.updateCh)
|
||||
|
||||
n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status)
|
||||
|
||||
n.syncQueue = task.NewTaskQueue(n.syncIngress)
|
||||
|
||||
n.annotations = annotations.NewAnnotationExtractor(n)
|
||||
|
||||
if config.UpdateStatus {
|
||||
n.syncStatus = status.NewStatusSyncer(status.Config{
|
||||
Client: config.Client,
|
||||
|
@ -174,21 +180,12 @@ Error loading new template : %v
|
|||
type NGINXController struct {
|
||||
cfg *Configuration
|
||||
|
||||
listers *ingress.StoreLister
|
||||
controllers *cacheController
|
||||
|
||||
annotations annotations.Extractor
|
||||
|
||||
recorder record.EventRecorder
|
||||
|
||||
syncQueue *task.Queue
|
||||
|
||||
syncStatus status.Sync
|
||||
|
||||
// local store of SSL certificates
|
||||
// (only certificates used in ingress)
|
||||
sslCertTracker *store.SSLCertTracker
|
||||
|
||||
syncRateLimiter flowcontrol.RateLimiter
|
||||
|
||||
// stopLock is used to enforce only a single call to Stop is active.
|
||||
|
@ -201,7 +198,7 @@ type NGINXController struct {
|
|||
// ngxErrCh channel used to detect errors with the nginx processes
|
||||
ngxErrCh chan error
|
||||
|
||||
// runningConfig contains the running configuration in the Backend
|
||||
// runningConfig contains the running configuration
|
||||
runningConfig *ingress.Configuration
|
||||
|
||||
forceReload int32
|
||||
|
@ -210,7 +207,7 @@ type NGINXController struct {
|
|||
|
||||
configmap *apiv1.ConfigMap
|
||||
|
||||
storeLister *ingress.StoreLister
|
||||
store store.Storer
|
||||
|
||||
binary string
|
||||
resolver []net.IP
|
||||
|
@ -224,8 +221,6 @@ type NGINXController struct {
|
|||
// returns true if proxy protocol es enabled
|
||||
IsProxyProtocolEnabled bool
|
||||
|
||||
isSSLPassthroughEnabled bool
|
||||
|
||||
isShuttingDown bool
|
||||
|
||||
Proxy *TCPProxy
|
||||
|
@ -239,7 +234,7 @@ type NGINXController struct {
|
|||
func (n *NGINXController) Start() {
|
||||
glog.Infof("starting Ingress controller")
|
||||
|
||||
n.controllers.Run(n.stopCh)
|
||||
n.store.Start()
|
||||
|
||||
// initial sync of secrets to avoid unnecessary reloads
|
||||
glog.Info("running initial sync of secrets")
|
||||
|
@ -265,8 +260,6 @@ func (n *NGINXController) Start() {
|
|||
go n.syncStatus.Run(n.stopCh)
|
||||
}
|
||||
|
||||
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
|
||||
|
||||
done := make(chan error, 1)
|
||||
cmd := exec.Command(n.binary, "-c", cfgPath)
|
||||
|
||||
|
@ -472,16 +465,40 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
|
|||
}
|
||||
}
|
||||
|
||||
svcIP := svc.Spec.ClusterIP
|
||||
|
||||
// in case of headless services we can use just the first address
|
||||
isHeadlessService := svc.Spec.ClusterIP == "None"
|
||||
if isHeadlessService {
|
||||
ep, err := n.listers.Endpoint.GetServiceEndpoints(svc)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(ep.Subsets) == 0 {
|
||||
glog.Warningf("invalid service headless definition (no subsets)")
|
||||
continue
|
||||
}
|
||||
|
||||
if len(ep.Subsets[0].Addresses) == 0 {
|
||||
glog.Warningf("invalid service headless definition (no addresses)")
|
||||
continue
|
||||
}
|
||||
|
||||
svcIP = ep.Subsets[0].Addresses[0].IP
|
||||
}
|
||||
|
||||
//TODO: Allow PassthroughBackends to specify they support proxy-protocol
|
||||
servers = append(servers, &TCPServer{
|
||||
Hostname: pb.Hostname,
|
||||
IP: svc.Spec.ClusterIP,
|
||||
IP: svcIP,
|
||||
Port: port,
|
||||
ProxyProtocol: false,
|
||||
})
|
||||
}
|
||||
|
||||
if n.isSSLPassthroughEnabled {
|
||||
if n.cfg.EnableSSLPassthrough {
|
||||
n.Proxy.ServerList = servers
|
||||
}
|
||||
|
||||
|
@ -627,7 +644,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
|
|||
Cfg: cfg,
|
||||
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
|
||||
RedirectServers: redirectServers,
|
||||
IsSSLPassthroughEnabled: n.isSSLPassthroughEnabled,
|
||||
IsSSLPassthroughEnabled: n.cfg.EnableSSLPassthrough,
|
||||
ListenPorts: n.cfg.ListenPorts,
|
||||
PublishService: n.GetPublishService(),
|
||||
}
|
||||
|
|
|
@ -29,6 +29,10 @@ import (
|
|||
"github.com/ncabatoff/process-exporter/proc"
|
||||
)
|
||||
|
||||
const (
|
||||
processName = "nginx"
|
||||
)
|
||||
|
||||
// IsRespawnIfRequired checks if error type is exec.ExitError or not
|
||||
func IsRespawnIfRequired(err error) bool {
|
||||
exitError, ok := err.(*exec.ExitError)
|
||||
|
@ -68,7 +72,7 @@ func WaitUntilPortIsAvailable(port int) {
|
|||
continue
|
||||
}
|
||||
|
||||
if pn == "nginx" {
|
||||
if pn == processName {
|
||||
osp, err := os.FindProcess(p.PID)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error obtaining process information: %v", err)
|
||||
|
@ -85,7 +89,7 @@ func WaitUntilPortIsAvailable(port int) {
|
|||
func IsNginxRunning() bool {
|
||||
processes, _ := ps.Processes()
|
||||
for _, p := range processes {
|
||||
if p.Executable() == "nginx" {
|
||||
if p.Executable() == processName {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ type TCPProxy struct {
|
|||
|
||||
func (p *TCPProxy) Get(host string) *TCPServer {
|
||||
if p.ServerList == nil {
|
||||
glog.Warning("there is no servers configured with SSL passthrough. Returning default backend")
|
||||
return p.Default
|
||||
}
|
||||
|
||||
|
@ -94,6 +95,7 @@ func (p *TCPProxy) Handle(conn net.Conn) {
|
|||
glog.V(4).Infof("Writing proxy protocol header - %s", proxyProtocolHeader)
|
||||
_, err = fmt.Fprintf(clientConn, proxyProtocolHeader)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error writing proxy-protocol header: %s", err)
|
||||
clientConn.Close()
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
|
||||
"k8s.io/ingress-nginx/internal/ingress/store"
|
||||
"k8s.io/ingress-nginx/internal/k8s"
|
||||
|
@ -69,9 +70,6 @@ type Config struct {
|
|||
UseNodeInternalIP bool
|
||||
|
||||
IngressLister store.IngressLister
|
||||
|
||||
DefaultIngressClass string
|
||||
IngressClass string
|
||||
}
|
||||
|
||||
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check
|
||||
|
@ -180,9 +178,9 @@ func NewStatusSyncer(config Config) Sync {
|
|||
|
||||
// we need to use the defined ingress class to allow multiple leaders
|
||||
// in order to update information about ingress status
|
||||
electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
|
||||
if config.IngressClass != "" {
|
||||
electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
|
||||
electionID := fmt.Sprintf("%v-%v", config.ElectionID, ingress.DefaultIngressClass)
|
||||
if ingress.IngressClass != "" {
|
||||
electionID = fmt.Sprintf("%v-%v", config.ElectionID, ingress.IngressClass)
|
||||
}
|
||||
|
||||
callbacks := leaderelection.LeaderCallbacks{
|
||||
|
@ -314,7 +312,7 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
|
|||
for _, cur := range ings {
|
||||
ing := cur.(*extensions.Ingress)
|
||||
|
||||
if !class.IsValid(ing, s.Config.IngressClass, s.Config.DefaultIngressClass) {
|
||||
if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -261,8 +261,6 @@ func TestStatusActions(t *testing.T) {
|
|||
Client: buildSimpleClientSet(),
|
||||
PublishService: "",
|
||||
IngressLister: buildIngressListener(),
|
||||
DefaultIngressClass: "nginx",
|
||||
IngressClass: "",
|
||||
UpdateStatusOnShutdown: true,
|
||||
}
|
||||
// create object
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
@ -25,53 +25,51 @@ import (
|
|||
"github.com/imdario/mergo"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
|
||||
"k8s.io/ingress-nginx/internal/net/ssl"
|
||||
)
|
||||
|
||||
// syncSecret keeps in sync Secrets used by Ingress rules with the files on
|
||||
// disk to allow copy of the content of the secret to disk to be used
|
||||
// by external processes.
|
||||
func (ic *NGINXController) syncSecret(key string) {
|
||||
func (s k8sStore) syncSecret(key string) {
|
||||
glog.V(3).Infof("starting syncing of secret %v", key)
|
||||
|
||||
cert, err := ic.getPemCertificate(key)
|
||||
// cert
|
||||
_, err := s.getPemCertificate(key)
|
||||
if err != nil {
|
||||
glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
|
||||
return
|
||||
}
|
||||
|
||||
// create certificates and add or update the item in the store
|
||||
cur, exists := ic.sslCertTracker.Get(key)
|
||||
if exists {
|
||||
s := cur.(*ingress.SSLCert)
|
||||
if s.Equal(cert) {
|
||||
// no need to update
|
||||
/*
|
||||
// create certificates and add or update the item in the store
|
||||
cur, exists := s.GetSecret(key)
|
||||
if exists {
|
||||
s := cur.(*ingress.SSLCert)
|
||||
if s.Equal(cert) {
|
||||
// no need to update
|
||||
return
|
||||
}
|
||||
glog.Infof("updating secret %v in the local store", key)
|
||||
ic.store.UpdateLocalSecret(key, cert)
|
||||
// this update must trigger an update
|
||||
// (like an update event from a change in Ingress)
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
return
|
||||
}
|
||||
glog.Infof("updating secret %v in the local store", key)
|
||||
ic.sslCertTracker.Update(key, cert)
|
||||
|
||||
glog.Infof("adding secret %v to the local store", key)
|
||||
ic.store.AddLocalSecret(key, cert)
|
||||
// this update must trigger an update
|
||||
// (like an update event from a change in Ingress)
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
return
|
||||
}
|
||||
|
||||
glog.Infof("adding secret %v to the local store", key)
|
||||
ic.sslCertTracker.Add(key, cert)
|
||||
// this update must trigger an update
|
||||
// (like an update event from a change in Ingress)
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})*/
|
||||
}
|
||||
|
||||
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
|
||||
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
|
||||
func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
|
||||
secret, err := ic.listers.Secret.GetByName(secretName)
|
||||
func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
|
||||
secret, err := s.listers.Secret.GetByNamespaceName(secretName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
|
||||
}
|
||||
|
@ -83,7 +81,7 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
|
|||
// namespace/secretName -> namespace-secretName
|
||||
nsSecName := strings.Replace(secretName, "/", "-", -1)
|
||||
|
||||
var s *ingress.SSLCert
|
||||
var sslCert *ingress.SSLCert
|
||||
if okcert && okkey {
|
||||
if cert == nil {
|
||||
return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName)
|
||||
|
@ -94,18 +92,18 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
|
|||
|
||||
// If 'ca.crt' is also present, it will allow this secret to be used in the
|
||||
// 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation
|
||||
s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca)
|
||||
sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
|
||||
}
|
||||
|
||||
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN)
|
||||
if ca != nil {
|
||||
glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName)
|
||||
}
|
||||
|
||||
/*
|
||||
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN)
|
||||
if ca != nil {
|
||||
glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName)
|
||||
}
|
||||
*/
|
||||
} else if ca != nil {
|
||||
s, err = ssl.AddCertAuth(nsSecName, ca)
|
||||
sslCert, err = ssl.AddCertAuth(nsSecName, ca)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
|
||||
|
@ -119,15 +117,20 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
|
|||
return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName)
|
||||
}
|
||||
|
||||
s.Name = secret.Name
|
||||
s.Namespace = secret.Namespace
|
||||
return s, nil
|
||||
sslCert.Name = secret.Name
|
||||
sslCert.Namespace = secret.Namespace
|
||||
|
||||
return sslCert, nil
|
||||
}
|
||||
|
||||
func (ic *NGINXController) checkSSLChainIssues() {
|
||||
for _, secretName := range ic.sslCertTracker.ListKeys() {
|
||||
s, _ := ic.sslCertTracker.Get(secretName)
|
||||
secret := s.(*ingress.SSLCert)
|
||||
func (s k8sStore) checkSSLChainIssues() {
|
||||
for _, item := range s.ListLocalSecrets() {
|
||||
secretName := fmt.Sprintf("%v/%v", item.Namespace, item.Name)
|
||||
|
||||
secret, err := s.GetLocalSecret(secretName)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if secret.FullChainPemFileName != "" {
|
||||
// chain already checked
|
||||
|
@ -158,42 +161,37 @@ func (ic *NGINXController) checkSSLChainIssues() {
|
|||
dst.FullChainPemFileName = fullChainPemFileName
|
||||
|
||||
glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName)
|
||||
ic.sslCertTracker.Update(secretName, dst)
|
||||
s.sslStore.Update(secretName, dst)
|
||||
// this update must trigger an update
|
||||
// (like an update event from a change in Ingress)
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
//ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
}
|
||||
}
|
||||
|
||||
// checkMissingSecrets verify if one or more ingress rules contains a reference
|
||||
// to a secret that is not present in the local secret store.
|
||||
// In this case we call syncSecret.
|
||||
func (ic *NGINXController) checkMissingSecrets() {
|
||||
for _, obj := range ic.listers.Ingress.List() {
|
||||
ing := obj.(*extensions.Ingress)
|
||||
|
||||
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
continue
|
||||
}
|
||||
|
||||
func (s k8sStore) checkMissingSecrets() {
|
||||
for _, ing := range s.ListIngresses() {
|
||||
for _, tls := range ing.Spec.TLS {
|
||||
if tls.SecretName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
|
||||
if _, ok := ic.sslCertTracker.Get(key); !ok {
|
||||
ic.syncSecret(key)
|
||||
if _, ok := s.sslStore.Get(key); !ok {
|
||||
s.syncSecret(key)
|
||||
}
|
||||
}
|
||||
|
||||
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, ic)
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
/*
|
||||
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing, resolver)
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := ic.sslCertTracker.Get(key); !ok {
|
||||
ic.syncSecret(key)
|
||||
}
|
||||
if _, ok := ic.sslCertTracker.Get(key); !ok {
|
||||
ic.syncSecret(key)
|
||||
}*/
|
||||
}
|
||||
}
|
|
@ -14,24 +14,20 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
package store
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
testclient "k8s.io/client-go/kubernetes/fake"
|
||||
cache_client "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/store"
|
||||
"k8s.io/ingress-nginx/internal/task"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -66,8 +62,8 @@ func buildSimpleClientSetForBackendSSL() *testclient.Clientset {
|
|||
return testclient.NewSimpleClientset()
|
||||
}
|
||||
|
||||
func buildIngListenerForBackendSSL() store.IngressLister {
|
||||
ingLister := store.IngressLister{}
|
||||
func buildIngListenerForBackendSSL() IngressLister {
|
||||
ingLister := IngressLister{}
|
||||
ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
|
||||
return ingLister
|
||||
}
|
||||
|
@ -81,20 +77,21 @@ func buildSecretForBackendSSL() *apiv1.Secret {
|
|||
}
|
||||
}
|
||||
|
||||
func buildSecrListerForBackendSSL() store.SecretLister {
|
||||
secrLister := store.SecretLister{}
|
||||
func buildSecrListerForBackendSSL() SecretLister {
|
||||
secrLister := SecretLister{}
|
||||
secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
|
||||
|
||||
return secrLister
|
||||
}
|
||||
|
||||
/*
|
||||
func buildListers() *ingress.StoreLister {
|
||||
sl := &ingress.StoreLister{}
|
||||
sl.Ingress.Store = buildIngListenerForBackendSSL()
|
||||
sl.Secret.Store = buildSecrListerForBackendSSL()
|
||||
return sl
|
||||
}
|
||||
|
||||
*/
|
||||
func buildControllerForBackendSSL() cache_client.Controller {
|
||||
cfg := &cache_client.Config{
|
||||
Queue: &MockQueue{Synced: true},
|
||||
|
@ -103,6 +100,7 @@ func buildControllerForBackendSSL() cache_client.Controller {
|
|||
return cache_client.New(cfg)
|
||||
}
|
||||
|
||||
/*
|
||||
func buildGenericControllerForBackendSSL() *NGINXController {
|
||||
gc := &NGINXController{
|
||||
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
|
||||
|
@ -110,13 +108,13 @@ func buildGenericControllerForBackendSSL() *NGINXController {
|
|||
Client: buildSimpleClientSetForBackendSSL(),
|
||||
},
|
||||
listers: buildListers(),
|
||||
sslCertTracker: store.NewSSLCertTracker(),
|
||||
sslCertTracker: NewSSLCertTracker(),
|
||||
}
|
||||
|
||||
gc.syncQueue = task.NewTaskQueue(gc.syncIngress)
|
||||
return gc
|
||||
}
|
||||
|
||||
*/
|
||||
func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
|
||||
// prepare
|
||||
td, err := ioutil.TempDir("", "ssl")
|
||||
|
@ -140,6 +138,7 @@ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
|
|||
return dCrt, dKey, dCa, nil
|
||||
}
|
||||
|
||||
/*
|
||||
func TestSyncSecret(t *testing.T) {
|
||||
// prepare for test
|
||||
dCrt, dKey, dCa, err := buildCrtKeyAndCA()
|
||||
|
@ -232,3 +231,4 @@ func TestGetPemCertificate(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
*/
|
41
internal/ingress/store/configmap.go
Normal file
41
internal/ingress/store/configmap.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// ConfigMapLister makes a Store that lists Configmaps.
|
||||
type ConfigMapLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByNamespaceName searches for a configmap in the local configmaps Store
|
||||
func (cml *ConfigMapLister) GetByNamespaceName(key string) (*apiv1.ConfigMap, error) {
|
||||
s, exists, err := cml.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("configmap %v was not found", key)
|
||||
}
|
||||
return s.(*apiv1.ConfigMap), nil
|
||||
}
|
40
internal/ingress/store/endpoint.go
Normal file
40
internal/ingress/store/endpoint.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// EndpointLister makes a Store that lists Endpoints.
|
||||
type EndpointLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
|
||||
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
|
||||
for _, m := range s.Store.List() {
|
||||
ep := m.(*apiv1.Endpoints)
|
||||
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
|
||||
return ep, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name)
|
||||
}
|
41
internal/ingress/store/ingress.go
Normal file
41
internal/ingress/store/ingress.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// IngressLister makes a Store that lists Ingress.
|
||||
type IngressLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByNamespaceName searches for an ingress in the local ingress Store
|
||||
func (il IngressLister) GetByNamespaceName(key string) (*extensions.Ingress, error) {
|
||||
i, exists, err := il.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("ingress %v was not found", key)
|
||||
}
|
||||
return i.(*extensions.Ingress), nil
|
||||
}
|
26
internal/ingress/store/ingress_annotation.go
Normal file
26
internal/ingress/store/ingress_annotation.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
|
||||
type IngressAnnotationsLister struct {
|
||||
cache.Store
|
||||
}
|
30
internal/ingress/store/local_secrets.go
Normal file
30
internal/ingress/store/local_secrets.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
)
|
||||
|
||||
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
|
||||
type SSLCertTracker struct {
|
||||
cache.ThreadSafeStore
|
||||
}
|
||||
|
||||
// NewSSLCertTracker creates a new SSLCertTracker store
|
||||
func NewSSLCertTracker() *SSLCertTracker {
|
||||
return &SSLCertTracker{
|
||||
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
|
||||
}
|
||||
}
|
||||
|
||||
// GetByNamespaceName searches for an ingress in the local ingress Store
|
||||
func (s SSLCertTracker) GetByNamespaceName(key string) (*ingress.SSLCert, error) {
|
||||
cert, exists := s.Get(key)
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("local SSL certificate %v was not found", key)
|
||||
}
|
||||
return cert.(*ingress.SSLCert), nil
|
||||
}
|
|
@ -1,113 +0,0 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// IngressLister makes a Store that lists Ingress.
|
||||
type IngressLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
|
||||
type IngressAnnotationsLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// SecretLister makes a Store that lists Secrets.
|
||||
type SecretLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a secret in the local secrets Store
|
||||
func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) {
|
||||
s, exists, err := sl.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("secret %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Secret), nil
|
||||
}
|
||||
|
||||
// ConfigMapLister makes a Store that lists Configmaps.
|
||||
type ConfigMapLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a configmap in the local configmaps Store
|
||||
func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) {
|
||||
s, exists, err := cml.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("configmap %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.ConfigMap), nil
|
||||
}
|
||||
|
||||
// ServiceLister makes a Store that lists Services.
|
||||
type ServiceLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a service in the local secrets Store
|
||||
func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) {
|
||||
s, exists, err := sl.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("service %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Service), nil
|
||||
}
|
||||
|
||||
// EndpointLister makes a Store that lists Endpoints.
|
||||
type EndpointLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
|
||||
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
|
||||
for _, m := range s.Store.List() {
|
||||
ep := m.(*apiv1.Endpoints)
|
||||
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
|
||||
return ep, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name)
|
||||
}
|
||||
|
||||
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
|
||||
type SSLCertTracker struct {
|
||||
cache.ThreadSafeStore
|
||||
}
|
||||
|
||||
// NewSSLCertTracker creates a new SSLCertTracker store
|
||||
func NewSSLCertTracker() *SSLCertTracker {
|
||||
return &SSLCertTracker{
|
||||
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
|
||||
}
|
||||
}
|
41
internal/ingress/store/secret.go
Normal file
41
internal/ingress/store/secret.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// SecretLister makes a Store that lists Secrets.
|
||||
type SecretLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByNamespaceName searches for a secret in the local secrets Store
|
||||
func (sl *SecretLister) GetByNamespaceName(key string) (*apiv1.Secret, error) {
|
||||
s, exists, err := sl.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("secret %v was not found", key)
|
||||
}
|
||||
return s.(*apiv1.Secret), nil
|
||||
}
|
41
internal/ingress/store/service.go
Normal file
41
internal/ingress/store/service.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
// ServiceLister makes a Store that lists Services.
|
||||
type ServiceLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByNamespaceName searches for a service in the local secrets Store
|
||||
func (sl *ServiceLister) GetByNamespaceName(key string) (*apiv1.Service, error) {
|
||||
s, exists, err := sl.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("service %v was not found", key)
|
||||
}
|
||||
return s.(*apiv1.Service), nil
|
||||
}
|
412
internal/ingress/store/store.go
Normal file
412
internal/ingress/store/store.go
Normal file
|
@ -0,0 +1,412 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
cache_client "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
|
||||
"k8s.io/ingress-nginx/internal/ingress/resolver"
|
||||
)
|
||||
|
||||
// Storer is the interface that wraps the required methods to gather information
|
||||
// about ingresses, services, secrets and ingress annotations.
|
||||
type Storer interface {
|
||||
GetConfigMap(key string) (*apiv1.ConfigMap, error)
|
||||
|
||||
// GetSecret returns a Secret using the namespace and name as key
|
||||
GetSecret(key string) (*apiv1.Secret, error)
|
||||
|
||||
// GetService returns a Service using the namespace and name as key
|
||||
GetService(key string) (*apiv1.Service, error)
|
||||
|
||||
GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error)
|
||||
|
||||
// GetSecret returns an Ingress using the namespace and name as key
|
||||
GetIngress(key string) (*extensions.Ingress, error)
|
||||
|
||||
// ListIngresses returns the list of Ingresses
|
||||
ListIngresses() []*extensions.Ingress
|
||||
|
||||
// GetIngressAnnotations returns the annotations associated to an Ingress
|
||||
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
|
||||
|
||||
// GetLocalSecret returns the local copy of a Secret
|
||||
GetLocalSecret(name string) (*ingress.SSLCert, error)
|
||||
|
||||
// ListLocalSecrets returns the list of local Secrets
|
||||
ListLocalSecrets() []*ingress.SSLCert
|
||||
|
||||
// StartSync initiates the synchronization of the controllers
|
||||
StartSync(stopCh chan struct{})
|
||||
}
|
||||
|
||||
// lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
|
||||
type lister struct {
|
||||
Ingress IngressLister
|
||||
Service ServiceLister
|
||||
Endpoint EndpointLister
|
||||
Secret SecretLister
|
||||
ConfigMap ConfigMapLister
|
||||
IngressAnnotation IngressAnnotationsLister
|
||||
}
|
||||
|
||||
// controller defines the required controllers that interact agains the api server
|
||||
type controller struct {
|
||||
Ingress cache.Controller
|
||||
Endpoint cache.Controller
|
||||
Service cache.Controller
|
||||
Secret cache.Controller
|
||||
Configmap cache.Controller
|
||||
}
|
||||
|
||||
// Run initiates the synchronization of the controllers against the api server
|
||||
func (c *controller) Run(stopCh chan struct{}) {
|
||||
go c.Ingress.Run(stopCh)
|
||||
go c.Endpoint.Run(stopCh)
|
||||
go c.Service.Run(stopCh)
|
||||
go c.Secret.Run(stopCh)
|
||||
go c.Configmap.Run(stopCh)
|
||||
|
||||
// wait for all involved caches to be synced, before processing items
|
||||
// from the queue is started
|
||||
if !cache.WaitForCacheSync(stopCh,
|
||||
c.Ingress.HasSynced,
|
||||
c.Endpoint.HasSynced,
|
||||
c.Service.HasSynced,
|
||||
c.Secret.HasSynced,
|
||||
c.Configmap.HasSynced,
|
||||
) {
|
||||
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
||||
}
|
||||
}
|
||||
|
||||
type k8sStore struct {
|
||||
cache *controller
|
||||
// listers
|
||||
listers *lister
|
||||
|
||||
// sslStore local store of SSL certificates (certificates used in ingress)
|
||||
sslStore *SSLCertTracker
|
||||
|
||||
// annotations parser
|
||||
annotations annotations.Extractor
|
||||
}
|
||||
|
||||
// New creates a new object store to be used in the ingress controller
|
||||
func New(namespace, configmap, tcp, udp string,
|
||||
resyncPeriod time.Duration,
|
||||
recorder record.EventRecorder,
|
||||
client clientset.Interface,
|
||||
annotations annotations.Extractor,
|
||||
r resolver.Resolver,
|
||||
updateCh chan ingress.Event) Storer {
|
||||
|
||||
store := &k8sStore{
|
||||
cache: &controller{},
|
||||
listers: &lister{},
|
||||
sslStore: NewSSLCertTracker(),
|
||||
annotations: annotations,
|
||||
}
|
||||
|
||||
ingEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
if !class.IsValid(addIng, ingress.IngressClass, ingress.DefaultIngressClass) {
|
||||
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng, r)
|
||||
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
|
||||
return
|
||||
}
|
||||
|
||||
store.extractAnnotations(addIng)
|
||||
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.CreateEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
delIng, ok := obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
// If we reached here it means the ingress was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
delIng, ok = tombstone.Obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if !class.IsValid(delIng, ingress.IngressClass, ingress.DefaultIngressClass) {
|
||||
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
|
||||
return
|
||||
}
|
||||
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
|
||||
store.listers.IngressAnnotation.Delete(delIng)
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.DeleteEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldIng := old.(*extensions.Ingress)
|
||||
curIng := cur.(*extensions.Ingress)
|
||||
validOld := class.IsValid(oldIng, ingress.IngressClass, ingress.DefaultIngressClass)
|
||||
validCur := class.IsValid(curIng, ingress.IngressClass, ingress.DefaultIngressClass)
|
||||
if !validOld && validCur {
|
||||
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validOld && !validCur {
|
||||
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validCur && !reflect.DeepEqual(old, cur) {
|
||||
recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
}
|
||||
|
||||
store.extractAnnotations(curIng)
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.UpdateEvent,
|
||||
Obj: cur,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
sec := cur.(*apiv1.Secret)
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
_, exists := store.sslStore.Get(key)
|
||||
if exists {
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.UpdateEvent,
|
||||
Obj: cur,
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
sec, ok := obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
// If we reached here it means the secret was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
sec, ok = tombstone.Obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
store.sslStore.Delete(key)
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.DeleteEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
eventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.CreateEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.DeleteEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oep := old.(*apiv1.Endpoints)
|
||||
ocur := cur.(*apiv1.Endpoints)
|
||||
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.UpdateEvent,
|
||||
Obj: cur,
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
mapEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
upCmap := obj.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == configmap {
|
||||
glog.V(2).Infof("adding configmap %v to backend", mapKey)
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.CreateEvent,
|
||||
Obj: obj,
|
||||
}
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
upCmap := cur.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == configmap {
|
||||
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.UpdateEvent,
|
||||
Obj: cur,
|
||||
}
|
||||
}
|
||||
// updates to configuration configmaps can trigger an update
|
||||
if mapKey == configmap || mapKey == tcp || mapKey == udp {
|
||||
recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
|
||||
updateCh <- ingress.Event{
|
||||
Type: ingress.UpdateEvent,
|
||||
Obj: cur,
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
|
||||
|
||||
store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
|
||||
&extensions.Ingress{}, resyncPeriod, ingEventHandler)
|
||||
|
||||
store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
|
||||
&apiv1.Endpoints{}, resyncPeriod, eventHandler)
|
||||
|
||||
store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
|
||||
&apiv1.Secret{}, resyncPeriod, secrEventHandler)
|
||||
|
||||
store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
|
||||
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)
|
||||
|
||||
store.listers.Service.Store, store.cache.Service = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
|
||||
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
|
||||
return store
|
||||
}
|
||||
|
||||
func (s k8sStore) extractAnnotations(ing *extensions.Ingress) {
|
||||
anns := s.annotations.Extract(ing)
|
||||
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
|
||||
s.listers.IngressAnnotation.Update(anns)
|
||||
}
|
||||
|
||||
// GetSecret returns a Secret using the namespace and name as key
|
||||
func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) {
|
||||
return s.listers.Secret.GetByNamespaceName(key)
|
||||
}
|
||||
|
||||
// ListLocalSecrets returns the list of local Secrets
|
||||
func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
|
||||
var certs []*ingress.SSLCert
|
||||
for _, item := range s.sslStore.List() {
|
||||
if s, ok := item.(*ingress.SSLCert); ok {
|
||||
certs = append(certs, s)
|
||||
}
|
||||
}
|
||||
|
||||
return certs
|
||||
}
|
||||
|
||||
// GetService returns a Service using the namespace and name as key
|
||||
func (s k8sStore) GetService(key string) (*apiv1.Service, error) {
|
||||
return s.listers.Service.GetByNamespaceName(key)
|
||||
}
|
||||
|
||||
// GetSecret returns an Ingress using the namespace and name as key
|
||||
func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) {
|
||||
return s.listers.Ingress.GetByNamespaceName(key)
|
||||
}
|
||||
|
||||
// ListIngresses returns the list of Ingresses
|
||||
func (s k8sStore) ListIngresses() []*extensions.Ingress {
|
||||
// filter ingress rules
|
||||
var ingresses []*extensions.Ingress
|
||||
for _, item := range s.listers.Ingress.List() {
|
||||
ing := item.(*extensions.Ingress)
|
||||
if !class.IsValid(ing, ingress.IngressClass, ingress.DefaultIngressClass) {
|
||||
continue
|
||||
}
|
||||
|
||||
ingresses = append(ingresses, ing)
|
||||
}
|
||||
|
||||
return ingresses
|
||||
}
|
||||
|
||||
// GetIngressAnnotations returns the annotations associated to an Ingress
|
||||
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
|
||||
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
|
||||
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("ingress annotation %v was not found", key)
|
||||
}
|
||||
return item.(*annotations.Ingress), nil
|
||||
}
|
||||
|
||||
// GetLocalSecret returns the local copy of a Secret
|
||||
func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) {
|
||||
return s.sslStore.GetByNamespaceName(key)
|
||||
}
|
||||
|
||||
func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) {
|
||||
return s.listers.ConfigMap.GetByNamespaceName(key)
|
||||
}
|
||||
|
||||
func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
|
||||
return s.listers.Endpoint.GetServiceEndpoints(svc)
|
||||
}
|
||||
|
||||
// StartSync initiates the synchronization of the controllers
|
||||
func (s k8sStore) StartSync(stopCh chan struct{}) {
|
||||
// start controllers
|
||||
s.cache.Run(stopCh)
|
||||
// start goroutine to check for missing local secrets
|
||||
go wait.Until(s.checkMissingSecrets, 30*time.Second, stopCh)
|
||||
}
|
314
internal/ingress/store/store_test.go
Normal file
314
internal/ingress/store/store_test.go
Normal file
|
@ -0,0 +1,314 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/ingress-nginx/internal/ingress"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations"
|
||||
"k8s.io/ingress-nginx/internal/ingress/resolver"
|
||||
"k8s.io/ingress-nginx/test/e2e/framework"
|
||||
)
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
// TODO: find a way to avoid the need to use a real api server
|
||||
home := os.Getenv("HOME")
|
||||
kubeConfigFile := fmt.Sprintf("%v/.kube/config", home)
|
||||
kubeContext := ""
|
||||
|
||||
kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error loading kubeconfig file: %v", err)
|
||||
}
|
||||
|
||||
clientSet, err := kubernetes.NewForConfig(kubeConfig)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress client: %v", err)
|
||||
}
|
||||
|
||||
t.Run("should return an error searching for non existing objects", func(t *testing.T) {
|
||||
ns := createNamespace(clientSet, t)
|
||||
defer deleteNamespace(ns, clientSet, t)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
updateCh := make(chan ingress.Event)
|
||||
defer close(updateCh)
|
||||
|
||||
go func(ch chan ingress.Event) {
|
||||
for {
|
||||
<-ch
|
||||
}
|
||||
}(updateCh)
|
||||
|
||||
storer := New(ns.Name,
|
||||
fmt.Sprintf("%v/config", ns.Name),
|
||||
fmt.Sprintf("%v/tcp", ns.Name),
|
||||
fmt.Sprintf("%v/udp", ns.Name),
|
||||
10*time.Minute,
|
||||
&record.FakeRecorder{},
|
||||
clientSet,
|
||||
annotations.Extractor{},
|
||||
resolver.Mock{},
|
||||
updateCh)
|
||||
|
||||
storer.StartSync(stopCh)
|
||||
|
||||
key := fmt.Sprintf("%v/anything", ns.Name)
|
||||
ing, err := storer.GetIngress(key)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error but none returned")
|
||||
}
|
||||
if ing != nil {
|
||||
t.Errorf("expected an Ingres but none returned")
|
||||
}
|
||||
|
||||
ls, err := storer.GetLocalSecret(key)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error but none returned")
|
||||
}
|
||||
if ls != nil {
|
||||
t.Errorf("expected an Ingres but none returned")
|
||||
}
|
||||
|
||||
s, err := storer.GetSecret(key)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error but none returned")
|
||||
}
|
||||
if s != nil {
|
||||
t.Errorf("expected an Ingres but none returned")
|
||||
}
|
||||
|
||||
svc, err := storer.GetService(key)
|
||||
if err == nil {
|
||||
t.Errorf("expected an error but none returned")
|
||||
}
|
||||
if svc != nil {
|
||||
t.Errorf("expected an Ingres but none returned")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("should return ingress one event for add, update and delete", func(t *testing.T) {
|
||||
ns := createNamespace(clientSet, t)
|
||||
defer deleteNamespace(ns, clientSet, t)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
updateCh := make(chan ingress.Event)
|
||||
defer close(updateCh)
|
||||
|
||||
var add uint64
|
||||
var upd uint64
|
||||
var del uint64
|
||||
|
||||
go func(ch chan ingress.Event) {
|
||||
for {
|
||||
e := <-ch
|
||||
if e.Obj == nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := e.Obj.(*extensions.Ingress); !ok {
|
||||
t.Errorf("expected an Ingress type but %T returned", e.Obj)
|
||||
}
|
||||
switch e.Type {
|
||||
case ingress.CreateEvent:
|
||||
atomic.AddUint64(&add, 1)
|
||||
break
|
||||
case ingress.UpdateEvent:
|
||||
atomic.AddUint64(&upd, 1)
|
||||
break
|
||||
case ingress.DeleteEvent:
|
||||
atomic.AddUint64(&del, 1)
|
||||
break
|
||||
}
|
||||
}
|
||||
}(updateCh)
|
||||
|
||||
storer := New(ns.Name,
|
||||
fmt.Sprintf("%v/config", ns.Name),
|
||||
fmt.Sprintf("%v/tcp", ns.Name),
|
||||
fmt.Sprintf("%v/udp", ns.Name),
|
||||
10*time.Minute,
|
||||
&record.FakeRecorder{},
|
||||
clientSet,
|
||||
annotations.Extractor{},
|
||||
resolver.Mock{},
|
||||
updateCh)
|
||||
|
||||
storer.StartSync(stopCh)
|
||||
|
||||
ing, err := ensureIngress(&v1beta1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "dummy",
|
||||
Namespace: ns.Name,
|
||||
},
|
||||
Spec: v1beta1.IngressSpec{
|
||||
Rules: []v1beta1.IngressRule{
|
||||
{
|
||||
Host: "dummy",
|
||||
IngressRuleValue: v1beta1.IngressRuleValue{
|
||||
HTTP: &v1beta1.HTTPIngressRuleValue{
|
||||
Paths: []v1beta1.HTTPIngressPath{
|
||||
{
|
||||
Path: "/",
|
||||
Backend: v1beta1.IngressBackend{
|
||||
ServiceName: "http-svc",
|
||||
ServicePort: intstr.FromInt(80),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, clientSet)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress: %v", err)
|
||||
}
|
||||
|
||||
// create an invalid ingress (different class)
|
||||
_, err = ensureIngress(&v1beta1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "custom-class",
|
||||
Namespace: ns.Name,
|
||||
Annotations: map[string]string{
|
||||
"kubernetes.io/ingress.class": "something",
|
||||
},
|
||||
},
|
||||
Spec: v1beta1.IngressSpec{
|
||||
Rules: []v1beta1.IngressRule{
|
||||
{
|
||||
Host: "dummy",
|
||||
IngressRuleValue: v1beta1.IngressRuleValue{
|
||||
HTTP: &v1beta1.HTTPIngressRuleValue{
|
||||
Paths: []v1beta1.HTTPIngressPath{
|
||||
{
|
||||
Path: "/",
|
||||
Backend: v1beta1.IngressBackend{
|
||||
ServiceName: "http-svc",
|
||||
ServicePort: intstr.FromInt(80),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, clientSet)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress: %v", err)
|
||||
}
|
||||
|
||||
ni := ing.DeepCopy()
|
||||
ni.Spec.Rules[0].Host = "update-dummy"
|
||||
_, err = ensureIngress(ni, clientSet)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress: %v", err)
|
||||
}
|
||||
|
||||
err = clientSet.ExtensionsV1beta1().
|
||||
Ingresses(ni.Namespace).
|
||||
Delete(ni.Name, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress: %v", err)
|
||||
}
|
||||
|
||||
waitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name)
|
||||
|
||||
if atomic.LoadUint64(&add) != 1 {
|
||||
t.Errorf("expected 1 event of type Create but %v ocurred", add)
|
||||
}
|
||||
if atomic.LoadUint64(&upd) != 1 {
|
||||
t.Errorf("expected 1 event of type Update but %v ocurred", upd)
|
||||
}
|
||||
if atomic.LoadUint64(&del) != 1 {
|
||||
t.Errorf("expected 1 event of type Delete but %v ocurred", del)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) *apiv1.Namespace {
|
||||
t.Log("creating temporal namespace")
|
||||
ns, err := framework.CreateKubeNamespace("store-test", clientSet)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress client: %v", err)
|
||||
}
|
||||
t.Logf("temporal namespace %v created", ns.Name)
|
||||
|
||||
return ns
|
||||
}
|
||||
|
||||
func deleteNamespace(ns *apiv1.Namespace, clientSet *kubernetes.Clientset, t *testing.T) {
|
||||
t.Logf("deleting temporal namespace %v created", ns.Name)
|
||||
err := framework.DeleteKubeNamespace(clientSet, ns.Name)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating ingress client: %v", err)
|
||||
}
|
||||
t.Logf("temporal namespace %v deleted", ns.Name)
|
||||
}
|
||||
|
||||
func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) {
|
||||
s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
|
||||
if err != nil {
|
||||
if k8sErrors.IsNotFound(err) {
|
||||
return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func waitForNoIngressInNamespace(c kubernetes.Interface, namespace, name string) error {
|
||||
return wait.PollImmediate(1*time.Second, time.Minute*2, noIngressInNamespace(c, namespace, name))
|
||||
}
|
||||
|
||||
func noIngressInNamespace(c kubernetes.Interface, namespace, name string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
ing, err := c.ExtensionsV1beta1().Ingresses(namespace).Get(name, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(err) {
|
||||
return true, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if ing == nil {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
}
|
|
@ -33,7 +33,6 @@ import (
|
|||
"k8s.io/ingress-nginx/internal/ingress/annotations/redirect"
|
||||
"k8s.io/ingress-nginx/internal/ingress/annotations/rewrite"
|
||||
"k8s.io/ingress-nginx/internal/ingress/resolver"
|
||||
"k8s.io/ingress-nginx/internal/ingress/store"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -42,17 +41,26 @@ var (
|
|||
// The name of each file is <namespace>-<secret name>.pem. The content is the concatenated
|
||||
// certificate and key.
|
||||
DefaultSSLDirectory = "/ingress-controller/ssl"
|
||||
|
||||
// DefaultIngressClass defines the default ingress class of the ingress controller
|
||||
DefaultIngressClass = "nginx"
|
||||
|
||||
// IngressClass contains the configured ingress class.
|
||||
// By default is empty
|
||||
IngressClass = ""
|
||||
)
|
||||
|
||||
// StoreLister returns the configured stores for ingresses, services,
|
||||
// endpoints, secrets and configmaps.
|
||||
type StoreLister struct {
|
||||
Ingress store.IngressLister
|
||||
Service store.ServiceLister
|
||||
Endpoint store.EndpointLister
|
||||
Secret store.SecretLister
|
||||
ConfigMap store.ConfigMapLister
|
||||
IngressAnnotation store.IngressAnnotationsLister
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
CreateEvent EventType = "CREATE"
|
||||
UpdateEvent EventType = "UPDATE"
|
||||
DeleteEvent EventType = "DELETE"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
Type EventType
|
||||
Obj interface{}
|
||||
}
|
||||
|
||||
// Configuration holds the definition of all the parts required to describe all
|
||||
|
|
Loading…
Reference in a new issue