Merge pull request #1929 from aledbf/stores

Refactoring of kubernetes informers and local caches
This commit is contained in:
Manuel Alejandro de Brito Fontes 2018-01-19 17:44:06 -02:00 committed by GitHub
commit c49c17eb91
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 1768 additions and 872 deletions

View file

@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
@ -29,7 +28,6 @@ import (
"syscall"
"time"
proxyproto "github.com/armon/go-proxyproto"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -121,7 +119,7 @@ func main() {
// create the default SSL certificate (dummy)
defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}, fs)
if err != nil {
glog.Fatalf("Error generating self signed certificate: %v", err)
}
@ -133,10 +131,6 @@ func main() {
ngx := controller.NewNGINXController(conf, fs)
if conf.EnableSSLPassthrough {
setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx)
}
go handleSigterm(ngx, func(code int) {
os.Exit(code)
})
@ -168,49 +162,6 @@ func handleSigterm(ngx *controller.NGINXController, exit exiter) {
exit(exitCode)
}
func setupSSLProxy(sslPort, proxyPort int, n *controller.NGINXController) {
glog.Info("starting TLS proxy for SSL passthrough")
n.Proxy = &controller.TCPProxy{
Default: &controller.TCPServer{
Hostname: "localhost",
IP: "127.0.0.1",
Port: proxyPort,
ProxyProtocol: true,
},
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort))
if err != nil {
glog.Fatalf("%v", err)
}
proxyList := &proxyproto.Listener{Listener: listener}
// start goroutine that accepts tcp connections in port 443
go func() {
for {
var conn net.Conn
var err error
if n.IsProxyProtocolEnabled {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}
// createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty
// the function assumes that it is running inside a Kubernetes cluster and attempts to
// discover the Apiserver. Otherwise, it connects to the Apiserver specified.
@ -328,7 +279,9 @@ func registerHandlers(enableProfiling bool, port int, ic *controller.NGINXContro
Addr: fmt.Sprintf(":%v", port),
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 30 * time.Second,
ReadHeaderTimeout: 10 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 120 * time.Second,
}
glog.Fatal(server.ListenAndServe())
}

View file

@ -36,14 +36,9 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/task"
)
@ -101,14 +96,9 @@ type Configuration struct {
SyncRateLimit float32
}
// GetDefaultBackend returns the default backend
func (n NGINXController) GetDefaultBackend() defaults.Backend {
return n.backendDefaults
}
// GetPublishService returns the configured service used to set ingress status
func (n NGINXController) GetPublishService() *apiv1.Service {
s, err := n.listers.Service.GetByName(n.cfg.PublishService)
s, err := n.store.GetService(n.cfg.PublishService)
if err != nil {
return nil
}
@ -116,16 +106,6 @@ func (n NGINXController) GetPublishService() *apiv1.Service {
return s
}
// GetSecret searches for a secret in the local secrets Store
func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) {
return n.listers.Secret.GetByName(name)
}
// GetService searches for a service in the local secrets Store
func (n NGINXController) GetService(name string) (*apiv1.Service, error) {
return n.listers.Service.GetByName(name)
}
// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
@ -138,33 +118,21 @@ func (n *NGINXController) syncIngress(item interface{}) error {
if element, ok := item.(task.Element); ok {
if name, ok := element.Key.(string); ok {
if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists {
ing := obj.(*extensions.Ingress)
n.readSecrets(ing)
if ing, err := n.store.GetIngress(name); err == nil {
n.store.ReadSecrets(ing)
}
}
}
// Sort ingress rules using the ResourceVersion field
ings := n.listers.Ingress.List()
ings := n.store.ListIngresses()
sort.SliceStable(ings, func(i, j int) bool {
ir := ings[i].(*extensions.Ingress).ResourceVersion
jr := ings[j].(*extensions.Ingress).ResourceVersion
ir := ings[i].ResourceVersion
jr := ings[j].ResourceVersion
return ir < jr
})
// filter ingress rules
var ingresses []*extensions.Ingress
for _, ingIf := range ings {
ing := ingIf.(*extensions.Ingress)
if !class.IsValid(ing) {
continue
}
ingresses = append(ingresses, ing)
}
upstreams, servers := n.getBackendServers(ingresses)
upstreams, servers := n.getBackendServers(ings)
var passUpstreams []*ingress.SSLPassthroughBackend
for _, server := range servers {
@ -232,7 +200,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
return []ingress.L4Service{}
}
configmap, err := n.listers.ConfigMap.GetByName(configmapName)
configmap, err := n.store.GetConfigMap(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{}
@ -242,12 +210,6 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
var svcProxyProtocol ingress.ProxyProtocol
// k -> port to expose
// v -> <namespace>/<service name>:<port from service to be used>
for k, v := range configmap.Data {
externalPort, err := strconv.Atoi(k)
if err != nil {
glog.Warningf("%v is not valid as a TCP/UDP port", k)
continue
}
rp := []int{
n.cfg.ListenPorts.HTTP,
@ -257,8 +219,16 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
}
reserverdPorts := sets.NewInt(rp...)
if intInSlice(externalPort, rp) {
for k, v := range configmap.Data {
externalPort, err := strconv.Atoi(k)
if err != nil {
glog.Warningf("%v is not valid as a TCP/UDP port", k)
continue
}
if reserverdPorts.Has(externalPort) {
glog.Warningf("port %v cannot be used for TCP or UDP services. It is reserved for the Ingress controller", k)
continue
}
@ -290,19 +260,12 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
continue
}
svcObj, svcExists, err := n.listers.Service.GetByKey(nsName)
svc, err := n.store.GetService(nsName)
if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err)
continue
}
if !svcExists {
glog.Warningf("service %v was not found", nsName)
continue
}
svc := svcObj.(*apiv1.Service)
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
@ -359,20 +322,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
Name: defUpstreamName,
}
svcKey := n.cfg.DefaultService
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
svc, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
if !svcExists {
glog.Warningf("service %v does not exist", svcKey)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
svc := svcObj.(*apiv1.Service)
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
@ -392,7 +348,10 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
servers := n.createServers(ingresses, upstreams, du)
for _, ing := range ingresses {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -603,29 +562,6 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
return aUpstreams, aServers
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := n.sslCertTracker.Get(name); !exists {
n.syncSecret(name)
}
_, err := n.listers.Secret.GetByName(name)
if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
}
bc, exists := n.sslCertTracker.Get(name)
if !exists {
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
}
cert := bc.(*ingress.SSLCert)
return &resolver.AuthSSLCert{
Secret: name,
CAFileName: cert.CAFileName,
PemSHA: cert.PemSHA,
}, nil
}
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
@ -633,7 +569,10 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[defUpstreamName] = du
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
var defBackend string
if ing.Spec.Backend != nil {
@ -730,7 +669,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
upstreams[name].Endpoints = endp
}
s, err := n.listers.Service.GetByName(svcKey)
s, err := n.store.GetService(svcKey)
if err != nil {
glog.Warningf("error obtaining service: %v", err)
continue
@ -745,13 +684,11 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
if !svcExists {
svc, err := n.store.GetService(svcKey)
if err != nil {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
}
svc := svcObj.(*apiv1.Service)
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return endpoint, fmt.Errorf("No ClusterIP found for service %s", svcKey)
}
@ -783,7 +720,7 @@ func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *exte
// to a service.
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Config) ([]ingress.Endpoint, error) {
svc, err := n.listers.Service.GetByName(svcKey)
svc, err := n.store.GetService(svcKey)
var upstreams []ingress.Endpoint
if err != nil {
@ -865,7 +802,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// remove the alias to avoid conflicts.
aliases := make(map[string]string, len(data))
bdef := n.GetDefaultBackend()
bdef := n.store.GetDefaultBackend()
ngxProxy := proxy.Config{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
@ -885,7 +822,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// Tries to fetch the default Certificate from nginx configuration.
// If it does not exists, use the ones generated on Start()
defaultCertificate, err := n.getPemCertificate(n.cfg.DefaultSSLCertificate)
defaultCertificate, err := n.store.GetLocalSecret(n.cfg.DefaultSSLCertificate)
if err == nil {
defaultPemFileName = defaultCertificate.PemFileName
defaultPemSHA = defaultCertificate.PemSHA
@ -908,7 +845,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// initialize all the servers
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
// default upstream server
un := du.Name
@ -976,7 +916,10 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
// configure default location, alias, and SSL
for _, ing := range data {
anns := n.getIngressAnnotations(ing)
anns, err := n.store.GetIngressAnnotations(ing)
if err != nil {
glog.Errorf("unexpected error reading ingress annotations: %v", err)
}
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -1041,13 +984,12 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
bc, exists := n.sslCertTracker.Get(key)
if !exists {
cert, err := n.store.GetLocalSecret(key)
if err != nil {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
continue
}
cert := bc.(*ingress.SSLCert)
err = cert.Certificate.VerifyHostname(host)
if err != nil {
glog.Warningf("unexpected error validating SSL certificate %v for host %v. Reason: %v", key, host, err)
@ -1124,7 +1066,7 @@ func (n *NGINXController) getEndpoints(
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := n.listers.Endpoint.GetServiceEndpoints(s)
ep, err := n.store.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
@ -1173,24 +1115,6 @@ func (n *NGINXController) getEndpoints(
return upsServers
}
// readSecrets extracts information about secrets from an Ingress rule
func (n *NGINXController) readSecrets(ing *extensions.Ingress) {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
n.syncSecret(key)
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
return
}
n.syncSecret(key)
}
func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}
@ -1204,27 +1128,3 @@ func (n *NGINXController) SetForceReload(shouldReload bool) {
atomic.StoreInt32(&n.forceReload, 0)
}
}
func (n *NGINXController) extractAnnotations(ing *extensions.Ingress) {
glog.V(3).Infof("updating annotations information for ingress %v/%v", ing.Namespace, ing.Name)
anns := n.annotations.Extract(ing)
err := n.listers.IngressAnnotation.Update(anns)
if err != nil {
glog.Errorf("unexpected error updating annotations information for ingress %v/%v: %v", anns.Namespace, anns.Name, err)
}
}
// getByIngress returns the parsed annotations from an Ingress
func (n *NGINXController) getIngressAnnotations(ing *extensions.Ingress) *annotations.Ingress {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := n.listers.IngressAnnotation.GetByKey(key)
if err != nil {
glog.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
return &annotations.Ingress{}
}
if !exists {
glog.Errorf("ingress annotation %v was not found", key)
return &annotations.Ingress{}
}
return item.(*annotations.Ingress)
}

View file

@ -1,244 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
)
type cacheController struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
func (c *cacheController) Run(stopCh chan struct{}) {
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// We need to wait before start syncing the ingress rules
// because the rules requires content from other listers
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) {
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng) {
a := addIng.GetAnnotations()[class.IngressKey]
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
n.extractAnnotations(addIng)
n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
n.listers.IngressAnnotation.Delete(delIng)
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng)
validCur := class.IsValid(curIng)
c := curIng.GetAnnotations()[class.IngressKey]
if !validOld && validCur {
glog.Infof("creating ingress %v/%v based on annotation %v with value '%v'", curIng.Namespace, curIng.Name, class.IngressKey, c)
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v/%v based on annotation %v with value '%v'", curIng.Namespace, curIng.Name, class.IngressKey, c)
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
n.extractAnnotations(curIng)
n.syncQueue.Enqueue(cur)
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
_, exists := n.sslCertTracker.Get(key)
if exists {
n.syncSecret(key)
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
n.sslCertTracker.Delete(key)
n.syncQueue.Enqueue(obj)
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
n.syncQueue.Enqueue(cur)
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
// updates to configuration configmaps can trigger an update
if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName {
n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
n.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := apiv1.NamespaceAll
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
watchNs = n.cfg.Namespace
}
lister := &ingress.StoreLister{}
lister.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
controller := &cacheController{}
lister.Ingress.Store, controller.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler)
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler)
lister.Secret.Store, controller.Secret = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler)
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler)
lister.Service.Store, controller.Service = cache.NewInformer(
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
return lister, controller
}

View file

@ -18,14 +18,12 @@ package controller
import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
@ -34,9 +32,9 @@ import (
"github.com/golang/glog"
proxyproto "github.com/armon/go-proxyproto"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
@ -49,10 +47,9 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/controller/process"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/status"
"k8s.io/ingress-nginx/internal/ingress/store"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/internal/net/ssl"
@ -96,16 +93,12 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
}
n := &NGINXController{
backendDefaults: ngx_config.NewDefault().Backend,
binary: ngx,
configmap: &apiv1.ConfigMap{},
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
cfg: config,
sslCertTracker: store.NewSSLCertTracker(),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
@ -113,6 +106,8 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
}),
stopCh: make(chan struct{}),
updateCh: make(chan store.Event, 1024),
stopLock: &sync.Mutex{},
fileSystem: fs,
@ -121,19 +116,27 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
runningConfig: &ingress.Configuration{},
}
n.listers, n.controllers = n.createListers(n.stopCh)
n.store = store.New(true,
config.Namespace,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.ResyncPeriod,
config.Client,
fs,
n.updateCh)
n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.annotations = annotations.NewAnnotationExtractor(n)
n.annotations = annotations.NewAnnotationExtractor(n.store)
if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: config.PublishService,
IngressLister: n.listers.Ingress,
IngressLister: n.store,
ElectionID: config.ElectionID,
IngressClass: class.IngressClass,
DefaultIngressClass: class.DefaultClass,
@ -186,9 +189,6 @@ Error loading new template : %v
type NGINXController struct {
cfg *Configuration
listers *ingress.StoreLister
controllers *cacheController
annotations annotations.Extractor
recorder record.EventRecorder
@ -197,10 +197,6 @@ type NGINXController struct {
syncStatus status.Sync
// local store of SSL certificates
// (only certificates used in ingress)
sslCertTracker *store.SSLCertTracker
syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active.
@ -209,6 +205,7 @@ type NGINXController struct {
stopLock *sync.Mutex
stopCh chan struct{}
updateCh chan store.Event
// ngxErrCh channel used to detect errors with the nginx processes
ngxErrCh chan error
@ -220,8 +217,6 @@ type NGINXController struct {
t *ngx_template.Template
configmap *apiv1.ConfigMap
binary string
resolver []net.IP
@ -231,14 +226,11 @@ type NGINXController struct {
// returns true if IPV6 is enabled in the pod
isIPV6Enabled bool
// returns true if proxy protocol es enabled
IsProxyProtocolEnabled bool
isShuttingDown bool
Proxy *TCPProxy
backendDefaults defaults.Backend
store store.Storer
fileSystem filesystem.Filesystem
}
@ -247,32 +239,12 @@ type NGINXController struct {
func (n *NGINXController) Start() {
glog.Infof("starting Ingress controller")
n.controllers.Run(n.stopCh)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range n.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
a := ing.GetAnnotations()[class.IngressKey]
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
n.readSecrets(ing)
}
if n.cfg.EnableSSLChainCompletion {
go wait.Until(n.checkSSLChainIssues, 60*time.Second, n.stopCh)
}
n.store.Run(n.stopCh)
if n.syncStatus != nil {
go n.syncStatus.Run()
}
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
done := make(chan error, 1)
cmd := exec.Command(n.binary, "-c", cfgPath)
@ -283,6 +255,10 @@ func (n *NGINXController) Start() {
Pgid: 0,
}
if n.cfg.EnableSSLPassthrough {
n.setupSSLProxy()
}
glog.Info("starting NGINX process...")
n.start(cmd)
@ -310,6 +286,16 @@ func (n *NGINXController) Start() {
// start a new nginx master process if the controller is not being stopped
n.start(cmd)
}
case evt := <-n.updateCh:
if n.isShuttingDown {
break
}
glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
n.SetForceReload(true)
}
n.syncQueue.Enqueue(evt.Obj)
case <-n.stopCh:
break
}
@ -412,37 +398,6 @@ Error: %v
return nil
}
// SetConfig sets the configured configmap
func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
n.configmap = cmap
n.IsProxyProtocolEnabled = false
m := map[string]string{}
if cmap != nil {
m = cmap.Data
}
val, ok := m["use-proxy-protocol"]
if ok {
b, err := strconv.ParseBool(val)
if err == nil {
n.IsProxyProtocolEnabled = b
}
}
c := ngx_template.ReadConfig(m)
if c.SSLSessionTicketKey != "" {
d, err := base64.StdEncoding.DecodeString(c.SSLSessionTicketKey)
if err != nil {
glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err)
c.SSLSessionTicketKey = ""
}
ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644)
}
n.backendDefaults = c.Backend
}
// OnUpdate is called periodically by syncQueue to keep the configuration in sync.
//
// 1. converts configmap configuration to custom configuration object
@ -452,7 +407,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
// returning nill implies the backend will be reloaded.
// if an error is returned means requeue the update
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := ngx_template.ReadConfig(n.configmap.Data)
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
servers := []*TCPServer{}
@ -488,10 +443,6 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
})
}
if n.cfg.EnableSSLPassthrough {
n.Proxy.ServerList = servers
}
// we need to check if the status module configuration changed
if cfg.EnableVtsStatus {
n.setupMonitor(vtsStatusModule)
@ -562,43 +513,38 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
setHeaders := map[string]string{}
if cfg.ProxySetHeaders != "" {
cmap, exists, err := n.listers.ConfigMap.GetByKey(cfg.ProxySetHeaders)
cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.ProxySetHeaders, err)
}
if exists {
setHeaders = cmap.(*apiv1.ConfigMap).Data
}
setHeaders = cmap.Data
}
addHeaders := map[string]string{}
if cfg.AddHeaders != "" {
cmap, exists, err := n.listers.ConfigMap.GetByKey(cfg.AddHeaders)
cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
if err != nil {
glog.Warningf("unexpected error reading configmap %v: %v", cfg.AddHeaders, err)
}
if exists {
addHeaders = cmap.(*apiv1.ConfigMap).Data
}
addHeaders = cmap.Data
}
sslDHParam := ""
if cfg.SSLDHParam != "" {
secretName := cfg.SSLDHParam
s, exists, err := n.listers.Secret.GetByKey(secretName)
secret, err := n.store.GetSecret(secretName)
if err != nil {
glog.Warningf("unexpected error reading secret %v: %v", secretName, err)
}
if exists {
secret := s.(*apiv1.Secret)
nsSecName := strings.Replace(secretName, "/", "-", -1)
dh, ok := secret.Data["dhparam.pem"]
if ok {
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh)
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh, n.fileSystem)
if err != nil {
glog.Warningf("unexpected error adding or updating dhparam %v file: %v", nsSecName, err)
} else {
@ -606,19 +552,9 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
}
}
}
cfg.SSLDHParam = sslDHParam
// disable features are not available in some platforms
switch runtime.GOARCH {
case "arm", "arm64", "ppc64le":
cfg.EnableModsecurity = false
case "s390x":
cfg.EnableModsecurity = false
cfg.EnableBrotli = false
}
tc := ngx_config.TemplateConfig{
ProxySetHeaders: setHeaders,
AddHeaders: addHeaders,
@ -713,3 +649,49 @@ func nextPowerOf2(v int) int {
return v
}
func (n *NGINXController) setupSSLProxy() {
sslPort := n.cfg.ListenPorts.HTTPS
proxyPort := n.cfg.ListenPorts.SSLProxy
glog.Info("starting TLS proxy for SSL passthrough")
n.Proxy = &TCPProxy{
Default: &TCPServer{
Hostname: "localhost",
IP: "127.0.0.1",
Port: proxyPort,
ProxyProtocol: true,
},
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort))
if err != nil {
glog.Fatalf("%v", err)
}
proxyList := &proxyproto.Listener{Listener: listener}
// start goroutine that accepts tcp connections in port 443
go func() {
for {
var conn net.Conn
var err error
if n.store.GetBackendConfiguration().UseProxyProtocol {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}

View file

@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
package store
import (
"fmt"
"io/ioutil"
"strings"
"github.com/golang/glog"
@ -26,52 +25,57 @@ import (
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
)
// syncSecret keeps in sync Secrets used by Ingress rules with the files on
// disk to allow copy of the content of the secret to disk to be used
// by external processes.
func (ic *NGINXController) syncSecret(key string) {
func (s k8sStore) syncSecret(key string) {
s.mu.Lock()
defer s.mu.Unlock()
glog.V(3).Infof("starting syncing of secret %v", key)
cert, err := ic.getPemCertificate(key)
// TODO: getPemCertificate should not write to disk to avoid unnecessary overhead
cert, err := s.getPemCertificate(key)
if err != nil {
glog.Warningf("error obtaining PEM from secret %v: %v", key, err)
return
}
// create certificates and add or update the item in the store
cur, exists := ic.sslCertTracker.Get(key)
if exists {
s := cur.(*ingress.SSLCert)
if s.Equal(cert) {
cur, err := s.GetLocalSecret(key)
if err == nil {
if cur.Equal(cert) {
// no need to update
return
}
glog.Infof("updating secret %v in the local store", key)
ic.sslCertTracker.Update(key, cert)
s.sslStore.Update(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
s.sendDummyEvent()
return
}
glog.Infof("adding secret %v to the local store", key)
ic.sslCertTracker.Add(key, cert)
s.sslStore.Add(key, cert)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
s.sendDummyEvent()
}
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secret, err := ic.listers.Secret.GetByName(secretName)
func (s k8sStore) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secret, err := s.listers.Secret.ByKey(secretName)
if err != nil {
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
}
@ -83,7 +87,7 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
// namespace/secretName -> namespace-secretName
nsSecName := strings.Replace(secretName, "/", "-", -1)
var s *ingress.SSLCert
var sslCert *ingress.SSLCert
if okcert && okkey {
if cert == nil {
return nil, fmt.Errorf("secret %v has no 'tls.crt'", secretName)
@ -94,18 +98,17 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
// If 'ca.crt' is also present, it will allow this secret to be used in the
// 'nginx.ingress.kubernetes.io/auth-tls-secret' annotation
s, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca)
sslCert, err = ssl.AddOrUpdateCertAndKey(nsSecName, cert, key, ca, s.filesystem)
if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
}
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, s.CN)
glog.V(3).Infof("found 'tls.crt' and 'tls.key', configuring %v as a TLS Secret (CN: %v)", secretName, sslCert.CN)
if ca != nil {
glog.V(3).Infof("found 'ca.crt', secret %v can also be used for Certificate Authentication", secretName)
}
} else if ca != nil {
s, err = ssl.AddCertAuth(nsSecName, ca)
sslCert, err = ssl.AddCertAuth(nsSecName, ca, s.filesystem)
if err != nil {
return nil, fmt.Errorf("unexpected error creating pem file: %v", err)
@ -119,29 +122,40 @@ func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCer
return nil, fmt.Errorf("no keypair or CA cert could be found in %v", secretName)
}
s.Name = secret.Name
s.Namespace = secret.Namespace
return s, nil
sslCert.Name = secret.Name
sslCert.Namespace = secret.Namespace
return sslCert, nil
}
func (ic *NGINXController) checkSSLChainIssues() {
for _, secretName := range ic.sslCertTracker.ListKeys() {
s, _ := ic.sslCertTracker.Get(secretName)
secret := s.(*ingress.SSLCert)
func (s k8sStore) checkSSLChainIssues() {
for _, item := range s.ListLocalSecrets() {
secretName := k8s.MetaNamespaceKey(item)
secret, err := s.GetLocalSecret(secretName)
if err != nil {
continue
}
if secret.FullChainPemFileName != "" {
// chain already checked
continue
}
data, err := ssl.FullChainCert(secret.PemFileName)
data, err := ssl.FullChainCert(secret.PemFileName, s.filesystem)
if err != nil {
glog.Errorf("unexpected error generating SSL certificate with full intermediate chain CA certs: %v", err)
continue
}
fullChainPemFileName := fmt.Sprintf("%v/%v-%v-full-chain.pem", ingress.DefaultSSLDirectory, secret.Namespace, secret.Name)
err = ioutil.WriteFile(fullChainPemFileName, data, 0655)
fullChainPemFileName := fmt.Sprintf("%v/%v-%v-full-chain.pem", file.DefaultSSLDirectory, secret.Namespace, secret.Name)
file, err := s.filesystem.Create(fullChainPemFileName)
if err != nil {
glog.Errorf("unexpected error creating SSL certificate file %v: %v", fullChainPemFileName, err)
continue
}
_, err = file.Write(data)
if err != nil {
glog.Errorf("unexpected error creating SSL certificate: %v", err)
continue
@ -158,42 +172,67 @@ func (ic *NGINXController) checkSSLChainIssues() {
dst.FullChainPemFileName = fullChainPemFileName
glog.Infof("updating local copy of ssl certificate %v with missing intermediate CA certs", secretName)
ic.sslCertTracker.Update(secretName, dst)
s.sslStore.Update(secretName, dst)
// this update must trigger an update
// (like an update event from a change in Ingress)
ic.syncQueue.Enqueue(&extensions.Ingress{})
s.sendDummyEvent()
}
}
// checkMissingSecrets verify if one or more ingress rules contains a reference
// to a secret that is not present in the local secret store.
// In this case we call syncSecret.
func (ic *NGINXController) checkMissingSecrets() {
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
continue
}
// checkMissingSecrets verifies if one or more ingress rules contains
// a reference to a secret that is not present in the local secret store.
func (s k8sStore) checkMissingSecrets() {
for _, ing := range s.ListIngresses() {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
if _, ok := ic.sslCertTracker.Get(key); !ok {
ic.syncSecret(key)
if _, ok := s.sslStore.Get(key); !ok {
s.syncSecret(key)
}
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
return
}
if _, ok := s.sslStore.Get(key); !ok {
s.syncSecret(key)
}
}
}
// ReadSecrets extracts information about secrets from an Ingress rule
func (s k8sStore) ReadSecrets(ing *extensions.Ingress) {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
if _, ok := ic.sslCertTracker.Get(key); !ok {
ic.syncSecret(key)
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
s.syncSecret(key)
}
key, _ := parser.GetStringAnnotation("auth-tls-secret", ing)
if key == "" {
return
}
s.syncSecret(key)
}
// sendDummyEvent sends a dummy event to trigger an update
// This is used in when a secret change
func (s *k8sStore) sendDummyEvent() {
s.updateCh <- Event{
Type: UpdateEvent,
Obj: &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
Namespace: "dummy",
},
},
}
}

View file

@ -14,23 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
package store
import (
"encoding/base64"
"fmt"
"io/ioutil"
"testing"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testclient "k8s.io/client-go/kubernetes/fake"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/store"
"k8s.io/ingress-nginx/internal/task"
)
const (
@ -65,8 +57,8 @@ func buildSimpleClientSetForBackendSSL() *testclient.Clientset {
return testclient.NewSimpleClientset()
}
func buildIngListenerForBackendSSL() store.IngressLister {
ingLister := store.IngressLister{}
func buildIngListenerForBackendSSL() IngressLister {
ingLister := IngressLister{}
ingLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
return ingLister
}
@ -80,20 +72,21 @@ func buildSecretForBackendSSL() *apiv1.Secret {
}
}
func buildSecrListerForBackendSSL() store.SecretLister {
secrLister := store.SecretLister{}
func buildSecrListerForBackendSSL() SecretLister {
secrLister := SecretLister{}
secrLister.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
return secrLister
}
/*
func buildListers() *ingress.StoreLister {
sl := &ingress.StoreLister{}
sl.Ingress.Store = buildIngListenerForBackendSSL()
sl.Secret.Store = buildSecrListerForBackendSSL()
return sl
}
*/
func buildControllerForBackendSSL() cache_client.Controller {
cfg := &cache_client.Config{
Queue: &MockQueue{Synced: true},
@ -102,6 +95,7 @@ func buildControllerForBackendSSL() cache_client.Controller {
return cache_client.New(cfg)
}
/*
func buildGenericControllerForBackendSSL() *NGINXController {
gc := &NGINXController{
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
@ -109,21 +103,15 @@ func buildGenericControllerForBackendSSL() *NGINXController {
Client: buildSimpleClientSetForBackendSSL(),
},
listers: buildListers(),
sslCertTracker: store.NewSSLCertTracker(),
sslCertTracker: NewSSLCertTracker(),
}
gc.syncQueue = task.NewTaskQueue(gc.syncIngress)
return gc
}
*/
func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
// prepare
td, err := ioutil.TempDir("", "ssl")
if err != nil {
return nil, nil, nil, fmt.Errorf("error occurs while creating temp directory: %v", err)
}
ingress.DefaultSSLDirectory = td
dCrt, err := base64.StdEncoding.DecodeString(tlsCrt)
if err != nil {
return nil, nil, nil, err
@ -139,6 +127,7 @@ func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
return dCrt, dKey, dCa, nil
}
/*
func TestSyncSecret(t *testing.T) {
// prepare for test
dCrt, dKey, dCa, err := buildCrtKeyAndCA()
@ -152,8 +141,8 @@ func TestSyncSecret(t *testing.T) {
Data map[string][]byte
expectSuccess bool
}{
{"getPemCertificate_error", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey}, false},
{"normal_test", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, true},
{"getPemCertificate_error", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey}, false},
{"normal_test", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, true},
}
for _, foo := range foos {
@ -199,12 +188,12 @@ func TestGetPemCertificate(t *testing.T) {
}{
{"sceret_not_exist", "default/foo_secret_not_exist", nil, true},
{"data_not_complete_all_not_exist", "default/foo_secret", map[string][]byte{}, true},
{"data_not_complete_TLSCertKey_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false},
{"data_not_complete_TLSCertKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSPrivateKeyKey: dKey}, true},
{"data_not_complete_TLSPrivateKeyKey_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, tlscaName: dCa}, false},
{"data_not_complete_TLSPrivateKeyKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt}, true},
{"data_not_complete_CA_not_exist", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey}, false},
{"normal_test", "default/foo_secret", map[string][]byte{apiv1.TLSCertKey: dCrt, apiv1.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false},
{"data_not_complete_TLSCertKey_not_exist", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false},
{"data_not_complete_TLSCertKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{api.TLSPrivateKeyKey: dKey}, true},
{"data_not_complete_TLSPrivateKeyKey_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, tlscaName: dCa}, false},
{"data_not_complete_TLSPrivateKeyKeyAndCA_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt}, true},
{"data_not_complete_CA_not_exist", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey}, false},
{"normal_test", "default/foo_secret", map[string][]byte{api.TLSCertKey: dCrt, api.TLSPrivateKeyKey: dKey, tlscaName: dCa}, false},
}
for _, foo := range foos {
@ -231,3 +220,4 @@ func TestGetPemCertificate(t *testing.T) {
})
}
}
*/

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// ConfigMapLister makes a Store that lists Configmaps.
type ConfigMapLister struct {
cache.Store
}
// ByKey searches for a configmap in the local configmaps Store
func (cml *ConfigMapLister) ByKey(key string) (*apiv1.ConfigMap, error) {
s, exists, err := cml.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", key)
}
return s.(*apiv1.ConfigMap), nil
}

View file

@ -0,0 +1,42 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// EndpointLister makes a Store that lists Endpoints.
type EndpointLister struct {
cache.Store
}
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
key := fmt.Sprintf("%v/%v", svc.Namespace, svc.Name)
eps, exists, err := s.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("could not find endpoints for service %v", key)
}
return eps.(*apiv1.Endpoints), nil
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/tools/cache"
)
// IngressLister makes a Store that lists Ingress.
type IngressLister struct {
cache.Store
}
// ByKey searches for an ingress in the local ingress Store
func (il IngressLister) ByKey(key string) (*extensions.Ingress, error) {
i, exists, err := il.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("ingress %v was not found", key)
}
return i.(*extensions.Ingress), nil
}

View file

@ -0,0 +1,26 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"k8s.io/client-go/tools/cache"
)
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
type IngressAnnotationsLister struct {
cache.Store
}

View file

@ -0,0 +1,46 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress"
)
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
type SSLCertTracker struct {
cache.ThreadSafeStore
}
// NewSSLCertTracker creates a new SSLCertTracker store
func NewSSLCertTracker() *SSLCertTracker {
return &SSLCertTracker{
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
}
}
// ByKey searches for an ingress in the local ingress Store
func (s SSLCertTracker) ByKey(key string) (*ingress.SSLCert, error) {
cert, exists := s.Get(key)
if !exists {
return nil, fmt.Errorf("local SSL certificate %v was not found", key)
}
return cert.(*ingress.SSLCert), nil
}

View file

@ -0,0 +1,39 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import "testing"
func TestSSLCertTracker(t *testing.T) {
tracker := NewSSLCertTracker()
items := len(tracker.List())
if items != 0 {
t.Errorf("expected 0 items in the store but %v returned", items)
}
tracker.Add("key", "value")
items = len(tracker.List())
if items != 1 {
t.Errorf("expected 1 item in the store but %v returned", items)
}
item, exists := tracker.Get("key")
if !exists || item == nil {
t.Errorf("expected an item from the store but none returned")
}
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// SecretLister makes a Store that lists Secrets.
type SecretLister struct {
cache.Store
}
// ByKey searches for a secret in the local secrets Store
func (sl *SecretLister) ByKey(key string) (*apiv1.Secret, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", key)
}
return s.(*apiv1.Secret), nil
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// ServiceLister makes a Store that lists Services.
type ServiceLister struct {
cache.Store
}
// ByKey searches for a service in the local secrets Store
func (sl *ServiceLister) ByKey(key string) (*apiv1.Service, error) {
s, exists, err := sl.GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", key)
}
return s.(*apiv1.Service), nil
}

View file

@ -0,0 +1,554 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"encoding/base64"
"fmt"
"io/ioutil"
"reflect"
"sync"
"time"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
)
// Storer is the interface that wraps the required methods to gather information
// about ingresses, services, secrets and ingress annotations.
type Storer interface {
// GetBackendConfiguration returns the nginx configuration stored in a configmap
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns a ConfigmMap using the namespace and name as key
GetConfigMap(key string) (*apiv1.ConfigMap, error)
// GetSecret returns a Secret using the namespace and name as key
GetSecret(key string) (*apiv1.Secret, error)
// GetService returns a Service using the namespace and name as key
GetService(key string) (*apiv1.Service, error)
GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error)
// GetSecret returns an Ingress using the namespace and name as key
GetIngress(key string) (*extensions.Ingress, error)
// ListIngresses returns the list of Ingresses
ListIngresses() []*extensions.Ingress
// GetIngressAnnotations returns the annotations associated to an Ingress
GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error)
// GetLocalSecret returns the local copy of a Secret
GetLocalSecret(name string) (*ingress.SSLCert, error)
// ListLocalSecrets returns the list of local Secrets
ListLocalSecrets() []*ingress.SSLCert
// GetAuthCertificate resolves a given secret name into an SSL certificate.
// The secret must contain 3 keys named:
// ca.crt: contains the certificate chain used for authentication
GetAuthCertificate(string) (*resolver.AuthSSLCert, error)
// GetDefaultBackend returns the default backend configuration
GetDefaultBackend() defaults.Backend
// Run initiates the synchronization of the controllers
Run(stopCh chan struct{})
// ReadSecrets extracts information about secrets from an Ingress rule
ReadSecrets(*extensions.Ingress)
}
// EventType type of event associated with an informer
type EventType string
const (
// CreateEvent event associated with new objects in an informer
CreateEvent EventType = "CREATE"
// UpdateEvent event associated with an object update in an informer
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from an informer
DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION"
)
// Event holds the context of an event
type Event struct {
Type EventType
Obj interface{}
}
// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
type Lister struct {
Ingress IngressLister
Service ServiceLister
Endpoint EndpointLister
Secret SecretLister
ConfigMap ConfigMapLister
IngressAnnotation IngressAnnotationsLister
}
// Controller defines the required controllers that interact agains the api server
type Controller struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
// Run initiates the synchronization of the controllers against the api server
func (c *Controller) Run(stopCh chan struct{}) {
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// We need to wait before start syncing the ingress rules
// because the rules requires content from other listers
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
// k8sStore internal Storer implementation using informers and thread safe stores
type k8sStore struct {
isOCSPCheckEnabled bool
// backendConfig contains the running configuration from the configmap
// this is required because this rarely changes but is a very expensive
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration
// cache contains the cache Controllers
cache *Controller
// listers contains the cache.Store used in the ingress controller
listers *Lister
// sslStore local store of SSL certificates (certificates used in ingress)
// this is required because the certificates must be present in the
// container filesystem
sslStore *SSLCertTracker
annotations annotations.Extractor
filesystem file.Filesystem
// updateCh
updateCh chan Event
// mu mutex used to avoid simultaneous incovations to syncSecret
mu *sync.Mutex
}
// New creates a new object store to be used in the ingress controller
func New(checkOCSP bool,
namespace, configmap, tcp, udp string,
resyncPeriod time.Duration,
client clientset.Interface,
fs file.Filesystem,
updateCh chan Event) Storer {
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
cache: &Controller{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
filesystem: fs,
updateCh: updateCh,
backendConfig: ngx_config.NewDefault(),
mu: &sync.Mutex{},
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: client.CoreV1().Events(namespace),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
})
// k8sStore fulfils resolver.Resolver interface
store.annotations = annotations.NewAnnotationExtractor(store)
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
store.extractAnnotations(addIng)
recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
updateCh <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
delIng, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(delIng) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
store.listers.IngressAnnotation.Delete(delIng)
updateCh <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng)
validCur := class.IsValid(curIng)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
updateCh <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
secrEventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
_, exists := store.sslStore.Get(k8s.MetaNamespaceKey(sec))
if exists {
updateCh <- Event{
Type: UpdateEvent,
Obj: cur,
}
}
}
},
DeleteFunc: func(obj interface{}) {
sec, ok := obj.(*apiv1.Secret)
if !ok {
// If we reached here it means the secret was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
sec, ok = tombstone.Obj.(*apiv1.Secret)
if !ok {
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
return
}
}
store.sslStore.Delete(k8s.MetaNamespaceKey(sec))
updateCh <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh <- Event{
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
updateCh <- Event{
Type: UpdateEvent,
Obj: cur,
}
}
},
}
mapEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
store.setConfig(m)
updateCh <- Event{
Type: CreateEvent,
Obj: obj,
}
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
m := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", m.Namespace, m.Name)
if mapKey == configmap {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
store.setConfig(m)
updateCh <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
}
// updates to configuration configmaps can trigger an update
if mapKey == configmap || mapKey == tcp || mapKey == udp {
recorder.Eventf(m, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
updateCh <- Event{
Type: ConfigurationEvent,
Obj: cur,
}
}
}
},
}
store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, ingEventHandler)
store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
&apiv1.Endpoints{}, resyncPeriod, eventHandler)
store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
&apiv1.Secret{}, resyncPeriod, secrEventHandler)
store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)
store.listers.Service.Store, store.cache.Service = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
return store
}
func (s k8sStore) extractAnnotations(ing *extensions.Ingress) {
anns := s.annotations.Extract(ing)
glog.V(3).Infof("updating annotations information for ingres %v/%v", anns.Namespace, anns.Name)
err := s.listers.IngressAnnotation.Update(anns)
if err != nil {
glog.Error(err)
}
}
// GetSecret returns a Secret using the namespace and name as key
func (s k8sStore) GetSecret(key string) (*apiv1.Secret, error) {
return s.listers.Secret.ByKey(key)
}
// ListLocalSecrets returns the list of local Secrets
func (s k8sStore) ListLocalSecrets() []*ingress.SSLCert {
var certs []*ingress.SSLCert
for _, item := range s.sslStore.List() {
if s, ok := item.(*ingress.SSLCert); ok {
certs = append(certs, s)
}
}
return certs
}
// GetService returns a Service using the namespace and name as key
func (s k8sStore) GetService(key string) (*apiv1.Service, error) {
return s.listers.Service.ByKey(key)
}
// GetSecret returns an Ingress using the namespace and name as key
func (s k8sStore) GetIngress(key string) (*extensions.Ingress, error) {
return s.listers.Ingress.ByKey(key)
}
// ListIngresses returns the list of Ingresses
func (s k8sStore) ListIngresses() []*extensions.Ingress {
// filter ingress rules
var ingresses []*extensions.Ingress
for _, item := range s.listers.Ingress.List() {
ing := item.(*extensions.Ingress)
if !class.IsValid(ing) {
continue
}
ingresses = append(ingresses, ing)
}
return ingresses
}
// GetIngressAnnotations returns the annotations associated to an Ingress
func (s k8sStore) GetIngressAnnotations(ing *extensions.Ingress) (*annotations.Ingress, error) {
key := fmt.Sprintf("%v/%v", ing.Namespace, ing.Name)
item, exists, err := s.listers.IngressAnnotation.GetByKey(key)
if err != nil {
return &annotations.Ingress{}, fmt.Errorf("unexpected error getting ingress annotation %v: %v", key, err)
}
if !exists {
return &annotations.Ingress{}, fmt.Errorf("ingress annotations %v was not found", key)
}
return item.(*annotations.Ingress), nil
}
// GetLocalSecret returns the local copy of a Secret
func (s k8sStore) GetLocalSecret(key string) (*ingress.SSLCert, error) {
return s.sslStore.ByKey(key)
}
func (s k8sStore) GetConfigMap(key string) (*apiv1.ConfigMap, error) {
return s.listers.ConfigMap.ByKey(key)
}
func (s k8sStore) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
return s.listers.Endpoint.GetServiceEndpoints(svc)
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (s k8sStore) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, err := s.GetLocalSecret(name); err != nil {
s.syncSecret(name)
}
cert, err := s.GetLocalSecret(name)
if err != nil {
return nil, err
}
return &resolver.AuthSSLCert{
Secret: name,
CAFileName: cert.CAFileName,
PemSHA: cert.PemSHA,
}, nil
}
// GetDefaultBackend returns the default backend
func (s k8sStore) GetDefaultBackend() defaults.Backend {
return s.backendConfig.Backend
}
func (s k8sStore) GetBackendConfiguration() ngx_config.Configuration {
return s.backendConfig
}
func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
s.backendConfig = ngx_template.ReadConfig(cmap.Data)
// TODO: this should not be done here
if s.backendConfig.SSLSessionTicketKey != "" {
d, err := base64.StdEncoding.DecodeString(s.backendConfig.SSLSessionTicketKey)
if err != nil {
glog.Warningf("unexpected error decoding key ssl-session-ticket-key: %v", err)
s.backendConfig.SSLSessionTicketKey = ""
}
ioutil.WriteFile("/etc/nginx/tickets.key", d, 0644)
}
}
// Run initiates the synchronization of the controllers
// and the initial synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start controllers
s.cache.Run(stopCh)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, ing := range s.ListIngresses() {
s.ReadSecrets(ing)
}
// start goroutine to check for missing local secrets
go wait.Until(s.checkMissingSecrets, 10*time.Second, stopCh)
if s.isOCSPCheckEnabled {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}
}

View file

@ -0,0 +1,526 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
"os"
"sync/atomic"
"testing"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
extensions "k8s.io/api/extensions/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/test/e2e/framework"
)
func TestStore(t *testing.T) {
// TODO: find a way to avoid the need to use a real api server
home := os.Getenv("HOME")
kubeConfigFile := fmt.Sprintf("%v/.kube/config", home)
kubeContext := ""
kubeConfig, err := framework.LoadConfig(kubeConfigFile, kubeContext)
if err != nil {
t.Errorf("unexpected error loading kubeconfig file: %v", err)
}
clientSet, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Run("should return an error searching for non existing objects", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
go func(ch chan Event) {
for {
<-ch
}
}(updateCh)
fs := newFS(t)
storer := New(true,
ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
clientSet,
fs,
updateCh)
storer.Run(stopCh)
key := fmt.Sprintf("%v/anything", ns.Name)
ing, err := storer.GetIngress(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if ing != nil {
t.Errorf("expected an Ingres but none returned")
}
ls, err := storer.GetLocalSecret(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if ls != nil {
t.Errorf("expected an Ingres but none returned")
}
s, err := storer.GetSecret(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if s != nil {
t.Errorf("expected an Ingres but none returned")
}
svc, err := storer.GetService(key)
if err == nil {
t.Errorf("expected an error but none returned")
}
if svc != nil {
t.Errorf("expected an Ingres but none returned")
}
close(updateCh)
close(stopCh)
})
t.Run("should return ingress one event for add, update and delete", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
var add uint64
var upd uint64
var del uint64
go func(ch chan Event) {
for {
e, ok := <-ch
if !ok {
return
}
if e.Obj == nil {
continue
}
if _, ok := e.Obj.(*extensions.Ingress); !ok {
t.Errorf("expected an Ingress type but %T returned", e.Obj)
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
case UpdateEvent:
atomic.AddUint64(&upd, 1)
case DeleteEvent:
atomic.AddUint64(&del, 1)
}
}
}(updateCh)
fs := newFS(t)
storer := New(true,
ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
clientSet,
fs,
updateCh)
storer.Run(stopCh)
ing, err := ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "dummy",
Namespace: ns.Name,
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
// create an invalid ingress (different class)
_, err = ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "custom-class",
Namespace: ns.Name,
Annotations: map[string]string{
"kubernetes.io/ingress.class": "something",
},
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "dummy",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
ni := ing.DeepCopy()
ni.Spec.Rules[0].Host = "update-dummy"
_, err = ensureIngress(ni, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
err = clientSet.ExtensionsV1beta1().
Ingresses(ni.Namespace).
Delete(ni.Name, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
framework.WaitForNoIngressInNamespace(clientSet, ni.Namespace, ni.Name)
if atomic.LoadUint64(&add) != 1 {
t.Errorf("expected 1 event of type Create but %v ocurred", add)
}
if atomic.LoadUint64(&upd) != 1 {
t.Errorf("expected 1 event of type Update but %v ocurred", upd)
}
if atomic.LoadUint64(&del) != 1 {
t.Errorf("expected 1 event of type Delete but %v ocurred", del)
}
close(updateCh)
close(stopCh)
})
t.Run("should not receive events from new secret no referenced from ingress", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
var add uint64
var upd uint64
var del uint64
go func(ch chan Event) {
for {
e, ok := <-ch
if !ok {
return
}
if e.Obj == nil {
continue
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
case UpdateEvent:
atomic.AddUint64(&upd, 1)
case DeleteEvent:
atomic.AddUint64(&del, 1)
}
}
}(updateCh)
fs := newFS(t)
storer := New(true,
ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
clientSet,
fs,
updateCh)
storer.Run(stopCh)
secretName := "no-referenced"
_, _, _, err = framework.CreateIngressTLSSecret(clientSet, []string{"foo"}, secretName, ns.Name)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
}
time.Sleep(1 * time.Second)
if atomic.LoadUint64(&add) != 0 {
t.Errorf("expected 0 events of type Create but %v ocurred", add)
}
if atomic.LoadUint64(&upd) != 0 {
t.Errorf("expected 0 events of type Update but %v ocurred", upd)
}
if atomic.LoadUint64(&del) != 0 {
t.Errorf("expected 0 events of type Delete but %v ocurred", del)
}
err = clientSet.CoreV1().Secrets(ns.Name).Delete(secretName, &metav1.DeleteOptions{})
if err != nil {
t.Errorf("unexpected error deleting secret: %v", err)
}
time.Sleep(1 * time.Second)
if atomic.LoadUint64(&add) != 0 {
t.Errorf("expected 0 events of type Create but %v ocurred", add)
}
if atomic.LoadUint64(&upd) != 0 {
t.Errorf("expected 0 events of type Update but %v ocurred", upd)
}
if atomic.LoadUint64(&del) != 1 {
t.Errorf("expected 1 events of type Delete but %v ocurred", del)
}
close(updateCh)
close(stopCh)
})
t.Run("should create an ingress with a secret it doesn't exists", func(t *testing.T) {
ns := createNamespace(clientSet, t)
defer deleteNamespace(ns, clientSet, t)
stopCh := make(chan struct{})
updateCh := make(chan Event, 1024)
var add uint64
var upd uint64
var del uint64
go func(ch <-chan Event) {
for {
e, ok := <-ch
if !ok {
return
}
if e.Obj == nil {
continue
}
switch e.Type {
case CreateEvent:
atomic.AddUint64(&add, 1)
case UpdateEvent:
atomic.AddUint64(&upd, 1)
case DeleteEvent:
atomic.AddUint64(&del, 1)
}
}
}(updateCh)
fs := newFS(t)
storer := New(true,
ns.Name,
fmt.Sprintf("%v/config", ns.Name),
fmt.Sprintf("%v/tcp", ns.Name),
fmt.Sprintf("%v/udp", ns.Name),
10*time.Minute,
clientSet,
fs,
updateCh)
storer.Run(stopCh)
name := "ingress-with-secret"
secretHosts := []string{name}
_, err := ensureIngress(&v1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns.Name,
},
Spec: v1beta1.IngressSpec{
TLS: []v1beta1.IngressTLS{
{
Hosts: secretHosts,
SecretName: name,
},
},
Rules: []v1beta1.IngressRule{
{
Host: name,
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/",
Backend: v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}, clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress: %v", err)
}
err = framework.WaitForIngressInNamespace(clientSet, ns.Name, name)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
}
if atomic.LoadUint64(&add) != 1 {
t.Errorf("expected 1 events of type Create but %v ocurred", add)
}
if atomic.LoadUint64(&upd) != 0 {
t.Errorf("expected 0 events of type Update but %v ocurred", upd)
}
if atomic.LoadUint64(&del) != 0 {
t.Errorf("expected 0 events of type Delete but %v ocurred", del)
}
_, _, _, err = framework.CreateIngressTLSSecret(clientSet, secretHosts, name, ns.Name)
if err != nil {
t.Errorf("unexpected error creating secret: %v", err)
}
t.Run("should exists a secret in the local store and filesystem", func(t *testing.T) {
err := framework.WaitForSecretInNamespace(clientSet, ns.Name, name)
if err != nil {
t.Errorf("unexpected error waiting for secret: %v", err)
}
time.Sleep(30 * time.Second)
pemFile := fmt.Sprintf("%v/%v-%v.pem", file.DefaultSSLDirectory, ns.Name, name)
err = framework.WaitForFileInFS(pemFile, fs)
if err != nil {
t.Errorf("unexpected error waiting for file to exists in the filesystem: %v", err)
}
secretName := fmt.Sprintf("%v/%v", ns.Name, name)
sslCert, err := storer.GetLocalSecret(secretName)
if err != nil {
t.Errorf("unexpected error reading local secret %v: %v", secretName, err)
}
if sslCert == nil {
t.Errorf("expected a secret but none returned")
}
pemSHA := file.SHA1(pemFile)
if sslCert.PemSHA != pemSHA {
t.Errorf("SHA of secret on disk differs from local secret store (%v != %v)", pemSHA, sslCert.PemSHA)
}
})
close(updateCh)
close(stopCh)
})
// test add ingress with secret it doesn't exists and then add secret
// check secret is generated on fs
// check ocsp
// check invalid secret (missing crt)
// check invalid secret (missing key)
// check invalid secret (missing ca)
}
func createNamespace(clientSet *kubernetes.Clientset, t *testing.T) *apiv1.Namespace {
t.Log("creating temporal namespace")
ns, err := framework.CreateKubeNamespace("store-test", clientSet)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Logf("temporal namespace %v created", ns.Name)
return ns
}
func deleteNamespace(ns *apiv1.Namespace, clientSet *kubernetes.Clientset, t *testing.T) {
t.Logf("deleting temporal namespace %v created", ns.Name)
err := framework.DeleteKubeNamespace(clientSet, ns.Name)
if err != nil {
t.Errorf("unexpected error creating ingress client: %v", err)
}
t.Logf("temporal namespace %v deleted", ns.Name)
}
func ensureIngress(ingress *extensions.Ingress, clientSet *kubernetes.Clientset) (*extensions.Ingress, error) {
s, err := clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
if k8sErrors.IsNotFound(err) {
return clientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress)
}
return nil, err
}
return s, nil
}
func newFS(t *testing.T) file.Filesystem {
fs, err := file.NewFakeFS()
if err != nil {
t.Fatalf("unexpected error creating filesystem: %v", err)
}
return fs
}

View file

@ -26,6 +26,7 @@ import (
"github.com/mitchellh/mapstructure"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-nginx/internal/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/internal/net"
)
@ -42,7 +43,7 @@ const (
)
var (
validRedirectCodes = []int{301, 302, 307, 308}
validRedirectCodes = sets.NewInt([]int{301, 302, 307, 308}...)
)
// ReadConfig obtains the configuration defined by the user merged with the defaults.
@ -114,7 +115,7 @@ func ReadConfig(src map[string]string) config.Configuration {
if err != nil {
glog.Warningf("%v is not a valid HTTP code: %v", val, err)
} else {
if intInSlice(j, validRedirectCodes) {
if validRedirectCodes.Has(j) {
redirectCode = j
} else {
glog.Warningf("The code %v is not a valid as HTTP redirect code. Using the default.", val)
@ -175,12 +176,3 @@ func filterErrors(codes []int) []int {
return fa
}
func intInSlice(i int, list []int) bool {
for _, v := range list {
if v == i {
return true
}
}
return false
}

View file

@ -66,12 +66,3 @@ func sysctlFSFileMax() int {
}
return int(rLimit.Max)
}
func intInSlice(i int, list []int) bool {
for _, v := range list {
if v == i {
return true
}
}
return false
}

View file

@ -26,26 +26,6 @@ func (fe *fakeError) Error() string {
return "fakeError"
}
func TestIntInSlice(t *testing.T) {
fooTests := []struct {
i int
list []int
er bool
}{
{1, []int{1, 2}, true},
{3, []int{1, 2}, false},
{1, nil, false},
{0, nil, false},
}
for _, fooTest := range fooTests {
r := intInSlice(fooTest.i, fooTest.list)
if r != fooTest.er {
t.Errorf("returned %t but expected %t for s=%v & list=%v", r, fooTest.er, fooTest.i, fooTest.list)
}
}
}
func TestSysctlFSFileMax(t *testing.T) {
i := sysctlFSFileMax()
if i < 1 {

View file

@ -40,8 +40,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/store"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/task"
)
@ -56,6 +54,11 @@ type Sync interface {
Shutdown()
}
type ingressLister interface {
// ListIngresses returns the list of Ingresses
ListIngresses() []*extensions.Ingress
}
// Config ...
type Config struct {
Client clientset.Interface
@ -68,7 +71,7 @@ type Config struct {
UseNodeInternalIP bool
IngressLister store.IngressLister
IngressLister ingressLister
DefaultIngressClass string
IngressClass string
@ -297,20 +300,14 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress {
// updateStatus changes the status information of Ingress rules
func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
ings := s.IngressLister.List()
ings := s.IngressLister.ListIngresses()
p := pool.NewLimited(10)
defer p.Close()
batch := p.Batch()
for _, cur := range ings {
ing := cur.(*extensions.Ingress)
if !class.IsValid(ing) {
continue
}
for _, ing := range ings {
batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
}

View file

@ -25,10 +25,8 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/store"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/task"
)
@ -212,14 +210,18 @@ func buildExtensionsIngresses() []extensions.Ingress {
}
}
func buildIngressListener() store.IngressLister {
s := cache.NewStore(cache.MetaNamespaceKeyFunc)
s.Add(&extensions.Ingress{
type testIngressLister struct {
}
func (til *testIngressLister) ListIngresses() []*extensions.Ingress {
var ingresses []*extensions.Ingress
ingresses = append(ingresses, &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "foo_ingress_non_01",
Namespace: apiv1.NamespaceDefault,
}})
s.Add(&extensions.Ingress{
ingresses = append(ingresses, &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: "foo_ingress_1",
Namespace: apiv1.NamespaceDefault,
@ -231,7 +233,11 @@ func buildIngressListener() store.IngressLister {
},
})
return store.IngressLister{Store: s}
return ingresses
}
func buildIngressLister() ingressLister {
return &testIngressLister{}
}
func buildStatusSync() statusSync {
@ -247,7 +253,7 @@ func buildStatusSync() statusSync {
Config: Config{
Client: buildSimpleClientSet(),
PublishService: apiv1.NamespaceDefault + "/" + "foo",
IngressLister: buildIngressListener(),
IngressLister: buildIngressLister(),
},
}
}
@ -259,7 +265,7 @@ func TestStatusActions(t *testing.T) {
c := Config{
Client: buildSimpleClientSet(),
PublishService: "",
IngressLister: buildIngressListener(),
IngressLister: buildIngressLister(),
DefaultIngressClass: "nginx",
IngressClass: "",
UpdateStatusOnShutdown: true,

View file

@ -1,113 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// IngressLister makes a Store that lists Ingress.
type IngressLister struct {
cache.Store
}
// IngressAnnotationsLister makes a Store that lists annotations in Ingress rules.
type IngressAnnotationsLister struct {
cache.Store
}
// SecretLister makes a Store that lists Secrets.
type SecretLister struct {
cache.Store
}
// GetByName searches for a secret in the local secrets Store
func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) {
s, exists, err := sl.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("secret %v was not found", name)
}
return s.(*apiv1.Secret), nil
}
// ConfigMapLister makes a Store that lists Configmaps.
type ConfigMapLister struct {
cache.Store
}
// GetByName searches for a configmap in the local configmaps Store
func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) {
s, exists, err := cml.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("configmap %v was not found", name)
}
return s.(*apiv1.ConfigMap), nil
}
// ServiceLister makes a Store that lists Services.
type ServiceLister struct {
cache.Store
}
// GetByName searches for a service in the local secrets Store
func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) {
s, exists, err := sl.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("service %v was not found", name)
}
return s.(*apiv1.Service), nil
}
// EndpointLister makes a Store that lists Endpoints.
type EndpointLister struct {
cache.Store
}
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (*apiv1.Endpoints, error) {
for _, m := range s.Store.List() {
ep := m.(*apiv1.Endpoints)
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
return ep, nil
}
}
return nil, fmt.Errorf("could not find endpoints for service: %v", svc.Name)
}
// SSLCertTracker holds a store of referenced Secrets in Ingress rules
type SSLCertTracker struct {
cache.ThreadSafeStore
}
// NewSSLCertTracker creates a new SSLCertTracker store
func NewSSLCertTracker() *SSLCertTracker {
return &SSLCertTracker{
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
}
}

View file

@ -33,7 +33,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/redirect"
"k8s.io/ingress-nginx/internal/ingress/annotations/rewrite"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/ingress/store"
)
var (
@ -44,17 +43,6 @@ var (
DefaultSSLDirectory = "/ingress-controller/ssl"
)
// StoreLister returns the configured stores for ingresses, services,
// endpoints, secrets and configmaps.
type StoreLister struct {
Ingress store.IngressLister
Service store.ServiceLister
Endpoint store.EndpointLister
Secret store.SecretLister
ConfigMap store.ConfigMapLister
IngressAnnotation store.IngressAnnotationsLister
}
// Configuration holds the definition of all the parts required to describe all
// ingresses reachable by the ingress controller (using a filter by namespace)
type Configuration struct {

View file

@ -21,9 +21,11 @@ import (
"os"
"strings"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// ParseNameNS parses a string searching a namespace and name
@ -96,3 +98,13 @@ func GetPodDetails(kubeClient clientset.Interface) (*PodInfo, error) {
Labels: pod.GetLabels(),
}, nil
}
// MetaNamespaceKey knows how to make keys for API objects which implement meta.Interface.
func MetaNamespaceKey(obj interface{}) string {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
glog.Warning(err)
}
return key
}

View file

@ -26,10 +26,8 @@ import (
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
"net"
"os"
"strconv"
"time"
@ -47,10 +45,12 @@ var (
)
// AddOrUpdateCertAndKey creates a .pem file wth the cert and the key with the specified name
func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert, error) {
func AddOrUpdateCertAndKey(name string, cert, key, ca []byte,
fs file.Filesystem) (*ingress.SSLCert, error) {
pemName := fmt.Sprintf("%v.pem", name)
pemFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, pemName)
tempPemFile, err := ioutil.TempFile(ingress.DefaultSSLDirectory, pemName)
pemFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, pemName)
tempPemFile, err := fs.TempFile(file.DefaultSSLDirectory, pemName)
if err != nil {
return nil, fmt.Errorf("could not create temp pem file %v: %v", pemFileName, err)
@ -74,34 +74,30 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert,
if err != nil {
return nil, fmt.Errorf("could not close temp pem file %v: %v", tempPemFile.Name(), err)
}
defer fs.RemoveAll(tempPemFile.Name())
pemCerts, err := ioutil.ReadFile(tempPemFile.Name())
pemCerts, err := fs.ReadFile(tempPemFile.Name())
if err != nil {
_ = os.Remove(tempPemFile.Name())
return nil, err
}
pemBlock, _ := pem.Decode(pemCerts)
if pemBlock == nil {
_ = os.Remove(tempPemFile.Name())
return nil, fmt.Errorf("no valid PEM formatted block found")
}
// If the file does not start with 'BEGIN CERTIFICATE' it's invalid and must not be used.
if pemBlock.Type != "CERTIFICATE" {
_ = os.Remove(tempPemFile.Name())
return nil, fmt.Errorf("certificate %v contains invalid data, and must be created with 'kubectl create secret tls'", name)
}
pemCert, err := x509.ParseCertificate(pemBlock.Bytes)
if err != nil {
_ = os.Remove(tempPemFile.Name())
return nil, err
}
//Ensure that certificate and private key have a matching public key
if _, err := tls.X509KeyPair(cert, key); err != nil {
_ = os.Remove(tempPemFile.Name())
return nil, err
}
@ -129,7 +125,7 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert,
}
}
err = os.Rename(tempPemFile.Name(), pemFileName)
err = fs.Rename(tempPemFile.Name(), pemFileName)
if err != nil {
return nil, fmt.Errorf("could not move temp pem file %v to destination %v: %v", tempPemFile.Name(), pemFileName, err)
}
@ -147,18 +143,24 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte) (*ingress.SSLCert,
return nil, errors.New(oe)
}
caFile, err := os.OpenFile(pemFileName, os.O_RDWR|os.O_APPEND, 0600)
caData, err := fs.ReadFile(pemFileName)
if err != nil {
return nil, fmt.Errorf("could not open file %v for writing additional CA chains: %v", pemFileName, err)
}
defer caFile.Close()
caFile, err := fs.Create(pemFileName)
_, err = caFile.Write(caData)
if err != nil {
return nil, fmt.Errorf("could not append CA to cert file %v: %v", pemFileName, err)
}
_, err = caFile.Write([]byte("\n"))
if err != nil {
return nil, fmt.Errorf("could not append CA to cert file %v: %v", pemFileName, err)
}
caFile.Write(ca)
caFile.Write([]byte("\n"))
defer caFile.Close()
return &ingress.SSLCert{
Certificate: pemCert,
@ -249,10 +251,10 @@ func parseSANExtension(value []byte) (dnsNames, emailAddresses []string, ipAddre
// AddCertAuth creates a .pem file with the specified CAs to be used in Cert Authentication
// If it's already exists, it's clobbered.
func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) {
func AddCertAuth(name string, ca []byte, fs file.Filesystem) (*ingress.SSLCert, error) {
caName := fmt.Sprintf("ca-%v.pem", name)
caFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, caName)
caFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, caName)
pemCABlock, _ := pem.Decode(ca)
if pemCABlock == nil {
@ -268,7 +270,13 @@ func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) {
return nil, err
}
err = ioutil.WriteFile(caFileName, ca, 0644)
caFile, err := fs.Create(caFileName)
if err != nil {
return nil, fmt.Errorf("could not write CA file %v: %v", caFileName, err)
}
defer caFile.Close()
_, err = caFile.Write(ca)
if err != nil {
return nil, fmt.Errorf("could not write CA file %v: %v", caFileName, err)
}
@ -282,11 +290,11 @@ func AddCertAuth(name string, ca []byte) (*ingress.SSLCert, error) {
}
// AddOrUpdateDHParam creates a dh parameters file with the specified name
func AddOrUpdateDHParam(name string, dh []byte) (string, error) {
func AddOrUpdateDHParam(name string, dh []byte, fs file.Filesystem) (string, error) {
pemName := fmt.Sprintf("%v.pem", name)
pemFileName := fmt.Sprintf("%v/%v", ingress.DefaultSSLDirectory, pemName)
pemFileName := fmt.Sprintf("%v/%v", file.DefaultSSLDirectory, pemName)
tempPemFile, err := ioutil.TempFile(ingress.DefaultSSLDirectory, pemName)
tempPemFile, err := fs.TempFile(file.DefaultSSLDirectory, pemName)
glog.V(3).Infof("Creating temp file %v for DH param: %v", tempPemFile.Name(), pemName)
if err != nil {
@ -303,25 +311,24 @@ func AddOrUpdateDHParam(name string, dh []byte) (string, error) {
return "", fmt.Errorf("could not close temp pem file %v: %v", tempPemFile.Name(), err)
}
pemCerts, err := ioutil.ReadFile(tempPemFile.Name())
defer fs.RemoveAll(tempPemFile.Name())
pemCerts, err := fs.ReadFile(tempPemFile.Name())
if err != nil {
_ = os.Remove(tempPemFile.Name())
return "", err
}
pemBlock, _ := pem.Decode(pemCerts)
if pemBlock == nil {
_ = os.Remove(tempPemFile.Name())
return "", fmt.Errorf("no valid PEM formatted block found")
}
// If the file does not start with 'BEGIN DH PARAMETERS' it's invalid and must not be used.
if pemBlock.Type != "DH PARAMETERS" {
_ = os.Remove(tempPemFile.Name())
return "", fmt.Errorf("certificate %v contains invalid data", name)
}
err = os.Rename(tempPemFile.Name(), pemFileName)
err = fs.Rename(tempPemFile.Name(), pemFileName)
if err != nil {
return "", fmt.Errorf("could not move temp pem file %v to destination %v: %v", tempPemFile.Name(), pemFileName, err)
}
@ -382,13 +389,8 @@ func GetFakeSSLCert() ([]byte, []byte) {
// FullChainCert checks if a certificate file contains issues in the intermediate CA chain
// Returns a new certificate with the intermediate certificates.
// If the certificate does not contains issues with the chain it return an empty byte array
func FullChainCert(in string) ([]byte, error) {
inputFile, err := os.Open(in)
if err != nil {
return nil, err
}
data, err := ioutil.ReadAll(inputFile)
func FullChainCert(in string, fs file.Filesystem) ([]byte, error) {
data, err := fs.ReadFile(in)
if err != nil {
return nil, err
}

View file

@ -19,14 +19,13 @@ package ssl
import (
"crypto/x509"
"fmt"
"io/ioutil"
"testing"
"time"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/cert/triple"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/file"
)
// generateRSACerts generates a self signed certificate using a self generated ca
@ -57,11 +56,7 @@ func generateRSACerts(host string) (*triple.KeyPair, *triple.KeyPair, error) {
}
func TestAddOrUpdateCertAndKey(t *testing.T) {
td, err := ioutil.TempDir("", "ssl")
if err != nil {
t.Fatalf("Unexpected error creating temporal directory: %v", err)
}
ingress.DefaultSSLDirectory = td
fs := newFS(t)
cert, _, err := generateRSACerts("echoheaders")
if err != nil {
@ -73,7 +68,7 @@ func TestAddOrUpdateCertAndKey(t *testing.T) {
c := certutil.EncodeCertPEM(cert.Cert)
k := certutil.EncodePrivateKeyPEM(cert.Key)
ngxCert, err := AddOrUpdateCertAndKey(name, c, k, []byte{})
ngxCert, err := AddOrUpdateCertAndKey(name, c, k, []byte{}, fs)
if err != nil {
t.Fatalf("unexpected error checking SSL certificate: %v", err)
}
@ -92,11 +87,7 @@ func TestAddOrUpdateCertAndKey(t *testing.T) {
}
func TestCACert(t *testing.T) {
td, err := ioutil.TempDir("", "ssl")
if err != nil {
t.Fatalf("Unexpected error creating temporal directory: %v", err)
}
ingress.DefaultSSLDirectory = td
fs := newFS(t)
cert, CA, err := generateRSACerts("echoheaders")
if err != nil {
@ -109,7 +100,7 @@ func TestCACert(t *testing.T) {
k := certutil.EncodePrivateKeyPEM(cert.Key)
ca := certutil.EncodeCertPEM(CA.Cert)
ngxCert, err := AddOrUpdateCertAndKey(name, c, k, ca)
ngxCert, err := AddOrUpdateCertAndKey(name, c, k, ca, fs)
if err != nil {
t.Fatalf("unexpected error checking SSL certificate: %v", err)
}
@ -129,11 +120,10 @@ func TestGetFakeSSLCert(t *testing.T) {
}
func TestAddCertAuth(t *testing.T) {
td, err := ioutil.TempDir("", "ssl")
fs, err := file.NewFakeFS()
if err != nil {
t.Fatalf("Unexpected error creating temporal directory: %v", err)
t.Fatalf("unexpected error creating filesystem: %v", err)
}
ingress.DefaultSSLDirectory = td
cn := "demo-ca"
_, ca, err := generateRSACerts(cn)
@ -141,7 +131,7 @@ func TestAddCertAuth(t *testing.T) {
t.Fatalf("unexpected error creating SSL certificate: %v", err)
}
c := certutil.EncodeCertPEM(ca.Cert)
ic, err := AddCertAuth(cn, c)
ic, err := AddCertAuth(cn, c, fs)
if err != nil {
t.Fatalf("unexpected error creating SSL certificate: %v", err)
}
@ -149,3 +139,11 @@ func TestAddCertAuth(t *testing.T) {
t.Fatalf("expected a valid CA file name")
}
}
func newFS(t *testing.T) file.Filesystem {
fs, err := file.NewFakeFS()
if err != nil {
t.Fatalf("unexpected error creating filesystem: %v", err)
}
return fs
}

View file

@ -32,6 +32,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/ingress-nginx/internal/file"
)
const (
@ -97,9 +98,10 @@ var RunID = uuid.NewUUID()
// CreateKubeNamespace creates a new namespace in the cluster
func CreateKubeNamespace(baseName string, c kubernetes.Interface) (*v1.Namespace, error) {
ts := time.Now().UnixNano()
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("e2e-tests-%v-", baseName),
GenerateName: fmt.Sprintf("e2e-tests-%v-%v-", baseName, ts),
},
}
// Be robust about making the namespace creation call.
@ -207,6 +209,30 @@ func secretInNamespace(c kubernetes.Interface, namespace, name string) wait.Cond
}
}
// WaitForFileInFS waits a default amount of time for the specified file is present in the filesystem
func WaitForFileInFS(file string, fs file.Filesystem) error {
return wait.PollImmediate(1*time.Second, time.Minute*2, fileInFS(file, fs))
}
func fileInFS(file string, fs file.Filesystem) wait.ConditionFunc {
return func() (bool, error) {
stat, err := fs.Stat(file)
if err != nil {
return false, err
}
if stat == nil {
return false, fmt.Errorf("file %v does not exists", file)
}
if stat.Size() > 0 {
return true, nil
}
return false, fmt.Errorf("the file %v exists but it is empty", file)
}
}
// WaitForNoIngressInNamespace waits until there is no ingress object in a particular namespace
func WaitForNoIngressInNamespace(c kubernetes.Interface, namespace, name string) error {
return wait.PollImmediate(1*time.Second, time.Minute*2, noIngressInNamespace(c, namespace, name))