Merge pull request #1387 from aledbf/profile
Improve resource usage in nginx controller
This commit is contained in:
commit
8584b25432
17 changed files with 388 additions and 794 deletions
|
@ -124,7 +124,7 @@ type NGINXController struct {
|
|||
|
||||
configmap *apiv1.ConfigMap
|
||||
|
||||
storeLister ingress.StoreLister
|
||||
storeLister *ingress.StoreLister
|
||||
|
||||
binary string
|
||||
resolver []net.IP
|
||||
|
@ -463,7 +463,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
|
|||
}
|
||||
|
||||
// SetListers sets the configured store listers in the generic ingress controller
|
||||
func (n *NGINXController) SetListers(lister ingress.StoreLister) {
|
||||
func (n *NGINXController) SetListers(lister *ingress.StoreLister) {
|
||||
n.storeLister = lister
|
||||
}
|
||||
|
||||
|
|
|
@ -49,11 +49,9 @@ const (
|
|||
|
||||
// Template ...
|
||||
type Template struct {
|
||||
tmpl *text_template.Template
|
||||
fw watch.FileWatcher
|
||||
s int
|
||||
tmplBuf *bytes.Buffer
|
||||
outCmdBuf *bytes.Buffer
|
||||
tmpl *text_template.Template
|
||||
fw watch.FileWatcher
|
||||
s int
|
||||
}
|
||||
|
||||
//NewTemplate returns a new Template instance or an
|
||||
|
@ -69,11 +67,9 @@ func NewTemplate(file string, onChange func()) (*Template, error) {
|
|||
}
|
||||
|
||||
return &Template{
|
||||
tmpl: tmpl,
|
||||
fw: fw,
|
||||
s: defBufferSize,
|
||||
tmplBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)),
|
||||
outCmdBuf: bytes.NewBuffer(make([]byte, 0, defBufferSize)),
|
||||
tmpl: tmpl,
|
||||
fw: fw,
|
||||
s: defBufferSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -85,15 +81,13 @@ func (t *Template) Close() {
|
|||
// Write populates a buffer using a template with NGINX configuration
|
||||
// and the servers and upstreams created by Ingress rules
|
||||
func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) {
|
||||
defer t.tmplBuf.Reset()
|
||||
defer t.outCmdBuf.Reset()
|
||||
tmplBuf := bytes.NewBuffer(make([]byte, 0, t.s))
|
||||
outCmdBuf := bytes.NewBuffer(make([]byte, 0, t.s))
|
||||
|
||||
defer func() {
|
||||
if t.s < t.tmplBuf.Cap() {
|
||||
glog.V(2).Infof("adjusting template buffer size from %v to %v", t.s, t.tmplBuf.Cap())
|
||||
t.s = t.tmplBuf.Cap()
|
||||
t.tmplBuf = bytes.NewBuffer(make([]byte, 0, t.tmplBuf.Cap()))
|
||||
t.outCmdBuf = bytes.NewBuffer(make([]byte, 0, t.outCmdBuf.Cap()))
|
||||
if t.s < tmplBuf.Cap() {
|
||||
glog.V(2).Infof("adjusting template buffer size from %v to %v", t.s, tmplBuf.Cap())
|
||||
t.s = tmplBuf.Cap()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -105,7 +99,7 @@ func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) {
|
|||
glog.Infof("NGINX configuration: %v", string(b))
|
||||
}
|
||||
|
||||
err := t.tmpl.Execute(t.tmplBuf, conf)
|
||||
err := t.tmpl.Execute(tmplBuf, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -113,14 +107,14 @@ func (t *Template) Write(conf config.TemplateConfig) ([]byte, error) {
|
|||
// squeezes multiple adjacent empty lines to be single
|
||||
// spaced this is to avoid the use of regular expressions
|
||||
cmd := exec.Command("/ingress-controller/clean-nginx-conf.sh")
|
||||
cmd.Stdin = t.tmplBuf
|
||||
cmd.Stdout = t.outCmdBuf
|
||||
cmd.Stdin = tmplBuf
|
||||
cmd.Stdout = outCmdBuf
|
||||
if err := cmd.Run(); err != nil {
|
||||
glog.Warningf("unexpected error cleaning template: %v", err)
|
||||
return t.tmplBuf.Bytes(), nil
|
||||
return tmplBuf.Bytes(), nil
|
||||
}
|
||||
|
||||
return t.outCmdBuf.Bytes(), nil
|
||||
return outCmdBuf.Bytes(), nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
@ -348,6 +348,7 @@ http {
|
|||
{{ end }}
|
||||
|
||||
{{ range $index, $server := $servers }}
|
||||
|
||||
server {
|
||||
server_name {{ $server.Hostname }};
|
||||
{{ template "SERVER" serverConfig $all $server }}
|
||||
|
@ -355,6 +356,7 @@ http {
|
|||
|
||||
{{ template "CUSTOM_ERRORS" $all }}
|
||||
}
|
||||
|
||||
{{ if $server.Alias }}
|
||||
server {
|
||||
server_name {{ $server.Alias }};
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
package base64
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Encode encodes a string to base64 removing the equals character
|
||||
func Encode(s string) string {
|
||||
str := base64.URLEncoding.EncodeToString([]byte(s))
|
||||
return strings.Replace(str, "=", "", -1)
|
||||
}
|
|
@ -17,13 +17,13 @@ limitations under the License.
|
|||
package ratelimit
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
|
||||
"k8s.io/ingress/core/pkg/base64"
|
||||
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
|
||||
"k8s.io/ingress/core/pkg/ingress/resolver"
|
||||
"k8s.io/ingress/core/pkg/net"
|
||||
|
@ -218,7 +218,7 @@ func (a ratelimit) Parse(ing *extensions.Ingress) (interface{}, error) {
|
|||
LimitRate: lr,
|
||||
LimitRateAfter: lra,
|
||||
Name: zoneName,
|
||||
ID: base64.Encode(zoneName),
|
||||
ID: encode(zoneName),
|
||||
Whitelist: cidrs,
|
||||
}, nil
|
||||
}
|
||||
|
@ -248,3 +248,8 @@ func parseCIDRs(s string) ([]string, error) {
|
|||
|
||||
return cidrs, nil
|
||||
}
|
||||
|
||||
func encode(s string) string {
|
||||
str := base64.URLEncoding.EncodeToString([]byte(s))
|
||||
return strings.Replace(str, "=", "", -1)
|
||||
}
|
||||
|
|
|
@ -67,15 +67,11 @@ func (ic *GenericController) syncSecret(key string) {
|
|||
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
|
||||
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
|
||||
func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
|
||||
secretInterface, exists, err := ic.secrLister.Store.GetByKey(secretName)
|
||||
secret, err := ic.listers.Secret.GetByName(secretName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("secret named %v does not exist", secretName)
|
||||
}
|
||||
|
||||
secret := secretInterface.(*apiv1.Secret)
|
||||
cert, okcert := secret.Data[apiv1.TLSCertKey]
|
||||
key, okkey := secret.Data[apiv1.TLSPrivateKeyKey]
|
||||
|
||||
|
|
|
@ -86,6 +86,13 @@ func buildSecrListerForBackendSSL() store.SecretLister {
|
|||
return secrLister
|
||||
}
|
||||
|
||||
func buildListers() *ingress.StoreLister {
|
||||
sl := &ingress.StoreLister{}
|
||||
sl.Ingress.Store = buildIngListenerForBackendSSL()
|
||||
sl.Secret.Store = buildSecrListerForBackendSSL()
|
||||
return sl
|
||||
}
|
||||
|
||||
func buildControllerForBackendSSL() cache_client.Controller {
|
||||
cfg := &cache_client.Config{
|
||||
Queue: &MockQueue{Synced: true},
|
||||
|
@ -99,8 +106,7 @@ func buildGenericControllerForBackendSSL() *GenericController {
|
|||
cfg: &Configuration{
|
||||
Client: buildSimpleClientSetForBackendSSL(),
|
||||
},
|
||||
ingLister: buildIngListenerForBackendSSL(),
|
||||
secrLister: buildSecrListerForBackendSSL(),
|
||||
listers: buildListers(),
|
||||
|
||||
ingController: buildControllerForBackendSSL(),
|
||||
endpController: buildControllerForBackendSSL(),
|
||||
|
@ -162,7 +168,7 @@ func TestSyncSecret(t *testing.T) {
|
|||
secret.SetNamespace("default")
|
||||
secret.SetName("foo_secret")
|
||||
secret.Data = foo.Data
|
||||
ic.secrLister.Add(secret)
|
||||
ic.listers.Secret.Add(secret)
|
||||
|
||||
key := "default/foo_secret"
|
||||
// for add
|
||||
|
@ -209,7 +215,7 @@ func TestGetPemCertificate(t *testing.T) {
|
|||
ic := buildGenericControllerForBackendSSL()
|
||||
secret := buildSecretForBackendSSL()
|
||||
secret.Data = foo.Data
|
||||
ic.secrLister.Add(secret)
|
||||
ic.listers.Secret.Add(secret)
|
||||
sslCert, err := ic.getPemCertificate(foo.secretName)
|
||||
|
||||
if foo.eErr {
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
@ -39,7 +38,6 @@ import (
|
|||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
|
||||
|
@ -51,7 +49,6 @@ import (
|
|||
"k8s.io/ingress/core/pkg/ingress/defaults"
|
||||
"k8s.io/ingress/core/pkg/ingress/resolver"
|
||||
"k8s.io/ingress/core/pkg/ingress/status"
|
||||
"k8s.io/ingress/core/pkg/ingress/store"
|
||||
"k8s.io/ingress/core/pkg/k8s"
|
||||
"k8s.io/ingress/core/pkg/net/ssl"
|
||||
local_strings "k8s.io/ingress/core/pkg/strings"
|
||||
|
@ -87,12 +84,7 @@ type GenericController struct {
|
|||
secrController cache.Controller
|
||||
mapController cache.Controller
|
||||
|
||||
ingLister store.IngressLister
|
||||
svcLister store.ServiceLister
|
||||
nodeLister store.NodeLister
|
||||
endpLister store.EndpointLister
|
||||
secrLister store.SecretLister
|
||||
mapLister store.ConfigMapLister
|
||||
listers *ingress.StoreLister
|
||||
|
||||
annotations annotationExtractor
|
||||
|
||||
|
@ -119,6 +111,8 @@ type GenericController struct {
|
|||
runningConfig *ingress.Configuration
|
||||
|
||||
forceReload bool
|
||||
|
||||
initialSyncDone bool
|
||||
}
|
||||
|
||||
// Configuration contains all the settings required by an Ingress controller
|
||||
|
@ -171,177 +165,18 @@ func newIngressController(config *Configuration) *GenericController {
|
|||
Component: "ingress-controller",
|
||||
}),
|
||||
sslCertTracker: newSSLCertTracker(),
|
||||
listers: &ingress.StoreLister{},
|
||||
}
|
||||
|
||||
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
|
||||
|
||||
// from here to the end of the method all the code is just boilerplate
|
||||
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
|
||||
// This is used to detect new content, updates or removals and act accordingly
|
||||
ingEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
|
||||
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
|
||||
return
|
||||
}
|
||||
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
delIng, ok := obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
// If we reached here it means the ingress was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
delIng, ok = tombstone.Obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
|
||||
return
|
||||
}
|
||||
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldIng := old.(*extensions.Ingress)
|
||||
curIng := cur.(*extensions.Ingress)
|
||||
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
|
||||
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
|
||||
if !validOld && validCur {
|
||||
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validOld && !validCur {
|
||||
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validCur && !reflect.DeepEqual(old, cur) {
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
}
|
||||
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
},
|
||||
}
|
||||
|
||||
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
sec := cur.(*apiv1.Secret)
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
ic.syncSecret(key)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
sec, ok := obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
// If we reached here it means the secret was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
sec, ok = tombstone.Obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
ic.sslCertTracker.DeleteAll(key)
|
||||
},
|
||||
}
|
||||
|
||||
eventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oep := old.(*apiv1.Endpoints)
|
||||
ocur := cur.(*apiv1.Endpoints)
|
||||
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
mapEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
upCmap := obj.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == ic.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("adding configmap %v to backend", mapKey)
|
||||
ic.cfg.Backend.SetConfig(upCmap)
|
||||
ic.forceReload = true
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
upCmap := cur.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == ic.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
|
||||
ic.cfg.Backend.SetConfig(upCmap)
|
||||
ic.forceReload = true
|
||||
}
|
||||
// updates to configuration configmaps can trigger an update
|
||||
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
|
||||
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
watchNs := apiv1.NamespaceAll
|
||||
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
|
||||
watchNs = ic.cfg.Namespace
|
||||
}
|
||||
|
||||
ic.ingLister.Store, ic.ingController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
|
||||
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
|
||||
|
||||
ic.endpLister.Store, ic.endpController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
|
||||
|
||||
ic.secrLister.Store, ic.secrController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
|
||||
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
|
||||
|
||||
ic.mapLister.Store, ic.mapController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
|
||||
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
|
||||
|
||||
ic.svcLister.Store, ic.svcController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
|
||||
var nodeListerWatcher cache.ListerWatcher
|
||||
if config.DisableNodeList {
|
||||
nodeListerWatcher = fcache.NewFakeControllerSource()
|
||||
} else {
|
||||
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
|
||||
}
|
||||
ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
|
||||
nodeListerWatcher,
|
||||
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
ic.createListers(config.DisableNodeList)
|
||||
|
||||
if config.UpdateStatus {
|
||||
ic.syncStatus = status.NewStatusSyncer(status.Config{
|
||||
Client: config.Client,
|
||||
PublishService: ic.cfg.PublishService,
|
||||
IngressLister: ic.ingLister,
|
||||
IngressLister: ic.listers.Ingress,
|
||||
ElectionID: config.ElectionID,
|
||||
IngressClass: config.IngressClass,
|
||||
DefaultIngressClass: config.DefaultIngressClass,
|
||||
|
@ -353,14 +188,7 @@ func newIngressController(config *Configuration) *GenericController {
|
|||
}
|
||||
ic.annotations = newAnnotationExtractor(ic)
|
||||
|
||||
ic.cfg.Backend.SetListers(ingress.StoreLister{
|
||||
Ingress: ic.ingLister,
|
||||
Service: ic.svcLister,
|
||||
Node: ic.nodeLister,
|
||||
Endpoint: ic.endpLister,
|
||||
Secret: ic.secrLister,
|
||||
ConfigMap: ic.mapLister,
|
||||
})
|
||||
ic.cfg.Backend.SetListers(ic.listers)
|
||||
|
||||
cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs)
|
||||
|
||||
|
@ -384,7 +212,7 @@ func (ic GenericController) GetDefaultBackend() defaults.Backend {
|
|||
|
||||
// GetPublishService returns the configured service used to set ingress status
|
||||
func (ic GenericController) GetPublishService() *apiv1.Service {
|
||||
s, err := ic.GetService(ic.cfg.PublishService)
|
||||
s, err := ic.listers.Service.GetByName(ic.cfg.PublishService)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -399,37 +227,12 @@ func (ic GenericController) GetRecorder() record.EventRecorder {
|
|||
|
||||
// GetSecret searches for a secret in the local secrets Store
|
||||
func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) {
|
||||
s, exists, err := ic.secrLister.Store.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("secret %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Secret), nil
|
||||
return ic.listers.Secret.GetByName(name)
|
||||
}
|
||||
|
||||
// GetService searches for a service in the local secrets Store
|
||||
func (ic GenericController) GetService(name string) (*apiv1.Service, error) {
|
||||
s, exists, err := ic.svcLister.Store.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("service %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Service), nil
|
||||
}
|
||||
|
||||
func (ic *GenericController) getConfigMap(ns, name string) (*apiv1.ConfigMap, error) {
|
||||
s, exists, err := ic.mapLister.Store.GetByKey(fmt.Sprintf("%v/%v", ns, name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("configmap %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.ConfigMap), nil
|
||||
return ic.listers.Service.GetByName(name)
|
||||
}
|
||||
|
||||
// sync collects all the pieces required to assemble the configuration file and
|
||||
|
@ -443,7 +246,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
|
|||
}
|
||||
|
||||
if name, ok := key.(string); ok {
|
||||
if obj, exists, _ := ic.ingLister.GetByKey(name); exists {
|
||||
if obj, exists, _ := ic.listers.Ingress.GetByKey(name); exists {
|
||||
ing := obj.(*extensions.Ingress)
|
||||
ic.readSecrets(ing)
|
||||
}
|
||||
|
@ -511,15 +314,15 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
|
|||
return []ingress.L4Service{}
|
||||
}
|
||||
|
||||
ns, name, err := k8s.ParseNameNS(configmapName)
|
||||
_, _, err := k8s.ParseNameNS(configmapName)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error reading configmap %v: %v", name, err)
|
||||
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
|
||||
return []ingress.L4Service{}
|
||||
}
|
||||
|
||||
configmap, err := ic.getConfigMap(ns, name)
|
||||
configmap, err := ic.listers.ConfigMap.GetByName(configmapName)
|
||||
if err != nil {
|
||||
glog.Errorf("unexpected error reading configmap %v: %v", name, err)
|
||||
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
|
||||
return []ingress.L4Service{}
|
||||
}
|
||||
|
||||
|
@ -562,7 +365,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
|
|||
continue
|
||||
}
|
||||
|
||||
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(nsName)
|
||||
svcObj, svcExists, err := ic.listers.Service.GetByKey(nsName)
|
||||
if err != nil {
|
||||
glog.Warningf("error getting service %v: %v", nsName, err)
|
||||
continue
|
||||
|
@ -578,7 +381,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
|
|||
var endps []ingress.Endpoint
|
||||
targetPort, err := strconv.Atoi(svcPort)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("searching service %v/%v endpoints using the name '%v'", svcNs, svcName, svcPort)
|
||||
glog.V(3).Infof("searching service %v endpoints using the name '%v'", svcNs, svcName, svcPort)
|
||||
for _, sp := range svc.Spec.Ports {
|
||||
if sp.Name == svcPort {
|
||||
if sp.Protocol == proto {
|
||||
|
@ -631,7 +434,7 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
|
|||
Name: defUpstreamName,
|
||||
}
|
||||
svcKey := ic.cfg.DefaultService
|
||||
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err)
|
||||
upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint())
|
||||
|
@ -656,36 +459,19 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
|
|||
return upstream
|
||||
}
|
||||
|
||||
type ingressByRevision []interface{}
|
||||
|
||||
func (c ingressByRevision) Len() int { return len(c) }
|
||||
func (c ingressByRevision) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c ingressByRevision) Less(i, j int) bool {
|
||||
ir := c[i].(*extensions.Ingress).ResourceVersion
|
||||
jr := c[j].(*extensions.Ingress).ResourceVersion
|
||||
return ir < jr
|
||||
}
|
||||
|
||||
// getBackendServers returns a list of Upstream and Server to be used by the backend
|
||||
// An upstream can be used in multiple servers if the namespace, service name and port are the same
|
||||
func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress.Server) {
|
||||
ings := ic.ingLister.Store.List()
|
||||
sort.Sort(ingressByRevision(ings))
|
||||
ings := ic.listers.Ingress.List()
|
||||
sort.Slice(ings, func(i, j int) bool {
|
||||
ir := ings[i].(*extensions.Ingress).ResourceVersion
|
||||
jr := ings[j].(*extensions.Ingress).ResourceVersion
|
||||
return ir < jr
|
||||
})
|
||||
|
||||
upstreams := ic.createUpstreams(ings)
|
||||
servers := ic.createServers(ings, upstreams)
|
||||
|
||||
// If a server has a hostname equivalent to a pre-existing alias, then we
|
||||
// remove the alias to avoid conflicts.
|
||||
for _, server := range servers {
|
||||
for j, alias := range servers {
|
||||
if server.Hostname == alias.Alias {
|
||||
glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.",
|
||||
server.Hostname, alias.Hostname, alias.Hostname)
|
||||
servers[j].Alias = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
du := ic.getDefaultUpstream()
|
||||
upstreams := ic.createUpstreams(ings, du)
|
||||
servers := ic.createServers(ings, upstreams, du)
|
||||
|
||||
for _, ingIf := range ings {
|
||||
ing := ingIf.(*extensions.Ingress)
|
||||
|
@ -860,15 +646,22 @@ func (ic *GenericController) getBackendServers() ([]*ingress.Backend, []*ingress
|
|||
}
|
||||
|
||||
if ic.cfg.SortBackends {
|
||||
sort.Sort(ingress.BackendByNameServers(aUpstreams))
|
||||
sort.Slice(aUpstreams, func(a, b int) bool {
|
||||
return aUpstreams[a].Name < aUpstreams[b].Name
|
||||
})
|
||||
}
|
||||
|
||||
aServers := make([]*ingress.Server, 0, len(servers))
|
||||
for _, value := range servers {
|
||||
sort.Sort(ingress.LocationByPath(value.Locations))
|
||||
sort.Slice(value.Locations, func(i, j int) bool {
|
||||
return value.Locations[i].Path > value.Locations[j].Path
|
||||
})
|
||||
aServers = append(aServers, value)
|
||||
}
|
||||
sort.Sort(ingress.ServerByName(aServers))
|
||||
|
||||
sort.Slice(aServers, func(i, j int) bool {
|
||||
return aServers[i].Hostname < aServers[j].Hostname
|
||||
})
|
||||
|
||||
return aUpstreams, aServers
|
||||
}
|
||||
|
@ -879,7 +672,7 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut
|
|||
ic.syncSecret(secretName)
|
||||
}
|
||||
|
||||
_, err := ic.GetSecret(secretName)
|
||||
_, err := ic.listers.Secret.GetByName(secretName)
|
||||
if err != nil {
|
||||
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -898,9 +691,9 @@ func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.Aut
|
|||
|
||||
// createUpstreams creates the NGINX upstreams for each service referenced in
|
||||
// Ingress rules. The servers inside the upstream are endpoints.
|
||||
func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ingress.Backend {
|
||||
func (ic *GenericController) createUpstreams(data []interface{}, du *ingress.Backend) map[string]*ingress.Backend {
|
||||
upstreams := make(map[string]*ingress.Backend)
|
||||
upstreams[defUpstreamName] = ic.getDefaultUpstream()
|
||||
upstreams[defUpstreamName] = du
|
||||
|
||||
for _, ingIf := range data {
|
||||
ing := ingIf.(*extensions.Ingress)
|
||||
|
@ -994,18 +787,13 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
|||
upstreams[name].Endpoints = endp
|
||||
}
|
||||
|
||||
s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||
s, err := ic.listers.Service.GetByName(svcKey)
|
||||
if err != nil {
|
||||
glog.Warningf("error obtaining service: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !exists {
|
||||
glog.Warningf("service %v does not exists", svcKey)
|
||||
continue
|
||||
}
|
||||
|
||||
upstreams[name].Service = s.(*apiv1.Service)
|
||||
upstreams[name].Service = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1014,7 +802,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
|||
}
|
||||
|
||||
func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
|
||||
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
|
||||
|
||||
if !svcExists {
|
||||
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
|
||||
|
@ -1035,19 +823,13 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
|
|||
// to a service.
|
||||
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
||||
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
|
||||
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||
svc, err := ic.listers.Service.GetByName(svcKey)
|
||||
|
||||
var upstreams []ingress.Endpoint
|
||||
if err != nil {
|
||||
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
err = fmt.Errorf("service %v does not exist", svcKey)
|
||||
return upstreams, err
|
||||
}
|
||||
|
||||
svc := svcObj.(*apiv1.Service)
|
||||
glog.V(3).Infof("obtaining port information for service %v", svcKey)
|
||||
for _, servicePort := range svc.Spec.Ports {
|
||||
// targetPort could be a string, use the name or the port (int)
|
||||
|
@ -1061,7 +843,15 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
|||
}
|
||||
|
||||
if ic.cfg.SortBackends {
|
||||
sort.Sort(ingress.EndpointByAddrPort(endps))
|
||||
sort.Slice(endps, func(i, j int) bool {
|
||||
iName := endps[i].Address
|
||||
jName := endps[j].Address
|
||||
if iName != jName {
|
||||
return iName < jName
|
||||
}
|
||||
|
||||
return endps[i].Port < endps[j].Port
|
||||
})
|
||||
}
|
||||
upstreams = append(upstreams, endps...)
|
||||
break
|
||||
|
@ -1084,11 +874,16 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
|||
// SSL certificates. Each server is configured with location / using a default
|
||||
// backend specified by the user or the one inside the ingress spec.
|
||||
func (ic *GenericController) createServers(data []interface{},
|
||||
upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
|
||||
servers := make(map[string]*ingress.Server)
|
||||
upstreams map[string]*ingress.Backend,
|
||||
du *ingress.Backend) map[string]*ingress.Server {
|
||||
|
||||
servers := make(map[string]*ingress.Server, len(data))
|
||||
// If a server has a hostname equivalent to a pre-existing alias, then we
|
||||
// remove the alias to avoid conflicts.
|
||||
aliases := make(map[string]string, len(data))
|
||||
|
||||
bdef := ic.GetDefaultBackend()
|
||||
ngxProxy := proxy.Configuration{
|
||||
ngxProxy := &proxy.Configuration{
|
||||
BodySize: bdef.ProxyBodySize,
|
||||
ConnectTimeout: bdef.ProxyConnectTimeout,
|
||||
SendTimeout: bdef.ProxySendTimeout,
|
||||
|
@ -1111,7 +906,6 @@ func (ic *GenericController) createServers(data []interface{},
|
|||
}
|
||||
|
||||
// initialize the default server
|
||||
du := ic.getDefaultUpstream()
|
||||
servers[defServerName] = &ingress.Server{
|
||||
Hostname: defServerName,
|
||||
SSLCertificate: defaultPemFileName,
|
||||
|
@ -1137,7 +931,6 @@ func (ic *GenericController) createServers(data []interface{},
|
|||
sslpt := ic.annotations.SSLPassthrough(ing)
|
||||
|
||||
// default upstream server
|
||||
du := ic.getDefaultUpstream()
|
||||
un := du.Name
|
||||
|
||||
if ing.Spec.Backend != nil {
|
||||
|
@ -1178,7 +971,9 @@ func (ic *GenericController) createServers(data []interface{},
|
|||
Proxy: ngxProxy,
|
||||
Service: &apiv1.Service{},
|
||||
},
|
||||
}, SSLPassthrough: sslpt}
|
||||
},
|
||||
SSLPassthrough: sslpt,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1200,6 +995,11 @@ func (ic *GenericController) createServers(data []interface{},
|
|||
|
||||
// setup server aliases
|
||||
servers[host].Alias = aliasAnnotation
|
||||
if aliasAnnotation != "" {
|
||||
if _, ok := aliases[aliasAnnotation]; !ok {
|
||||
aliases[aliasAnnotation] = host
|
||||
}
|
||||
}
|
||||
|
||||
// only add a certificate if the server does not have one previously configured
|
||||
if servers[host].SSLCertificate != "" {
|
||||
|
@ -1258,6 +1058,12 @@ func (ic *GenericController) createServers(data []interface{},
|
|||
}
|
||||
}
|
||||
|
||||
for alias, host := range aliases {
|
||||
if _, ok := servers[alias]; ok {
|
||||
glog.Warningf("There is a conflict with server hostname '%v' and alias '%v' (in server %v). Removing alias to avoid conflicts.", alias, host)
|
||||
servers[host].Alias = ""
|
||||
}
|
||||
}
|
||||
return servers
|
||||
}
|
||||
|
||||
|
@ -1292,7 +1098,7 @@ func (ic *GenericController) getEndpoints(
|
|||
}
|
||||
|
||||
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
|
||||
ep, err := ic.endpLister.GetServiceEndpoints(s)
|
||||
ep, err := ic.listers.Endpoint.GetServiceEndpoints(s)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
|
||||
return upsServers
|
||||
|
@ -1402,9 +1208,16 @@ func (ic GenericController) Start() {
|
|||
}
|
||||
|
||||
// initial sync of secrets to avoid unnecessary reloads
|
||||
for _, key := range ic.ingLister.ListKeys() {
|
||||
if obj, exists, _ := ic.ingLister.GetByKey(key); exists {
|
||||
for _, key := range ic.listers.Ingress.ListKeys() {
|
||||
if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists {
|
||||
ing := obj.(*extensions.Ingress)
|
||||
|
||||
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
|
||||
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
|
||||
continue
|
||||
}
|
||||
|
||||
ic.readSecrets(ing)
|
||||
}
|
||||
}
|
||||
|
@ -1417,6 +1230,12 @@ func (ic GenericController) Start() {
|
|||
go ic.syncStatus.Run(ic.stopCh)
|
||||
}
|
||||
|
||||
ic.initialSyncDone = true
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
// force initial sync
|
||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||
|
||||
<-ic.stopCh
|
||||
}
|
||||
|
||||
|
|
200
core/pkg/ingress/controller/listers.go
Normal file
200
core/pkg/ingress/controller/listers.go
Normal file
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
|
||||
"k8s.io/ingress/core/pkg/ingress/annotations/class"
|
||||
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
|
||||
)
|
||||
|
||||
func (ic *GenericController) createListers(disableNodeLister bool) {
|
||||
// from here to the end of the method all the code is just boilerplate
|
||||
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
|
||||
// This is used to detect new content, updates or removals and act accordingly
|
||||
ingEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
|
||||
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
|
||||
return
|
||||
}
|
||||
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
|
||||
|
||||
if ic.initialSyncDone {
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
delIng, ok := obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
// If we reached here it means the ingress was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
delIng, ok = tombstone.Obj.(*extensions.Ingress)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
|
||||
return
|
||||
}
|
||||
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldIng := old.(*extensions.Ingress)
|
||||
curIng := cur.(*extensions.Ingress)
|
||||
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
|
||||
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
|
||||
if !validOld && validCur {
|
||||
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validOld && !validCur {
|
||||
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
} else if validCur && !reflect.DeepEqual(old, cur) {
|
||||
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
|
||||
}
|
||||
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
},
|
||||
}
|
||||
|
||||
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
sec := cur.(*apiv1.Secret)
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
ic.syncSecret(key)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
sec, ok := obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
// If we reached here it means the secret was deleted but its final state is unrecorded.
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
sec, ok = tombstone.Obj.(*apiv1.Secret)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a Secret: %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||
ic.sslCertTracker.DeleteAll(key)
|
||||
},
|
||||
}
|
||||
|
||||
eventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
ic.syncQueue.Enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oep := old.(*apiv1.Endpoints)
|
||||
ocur := cur.(*apiv1.Endpoints)
|
||||
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
mapEventHandler := cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
upCmap := obj.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == ic.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("adding configmap %v to backend", mapKey)
|
||||
ic.cfg.Backend.SetConfig(upCmap)
|
||||
ic.forceReload = true
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
upCmap := cur.(*apiv1.ConfigMap)
|
||||
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
|
||||
if mapKey == ic.cfg.ConfigMapName {
|
||||
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
|
||||
ic.cfg.Backend.SetConfig(upCmap)
|
||||
ic.forceReload = true
|
||||
}
|
||||
// updates to configuration configmaps can trigger an update
|
||||
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
|
||||
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
|
||||
ic.syncQueue.Enqueue(cur)
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
watchNs := apiv1.NamespaceAll
|
||||
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
|
||||
watchNs = ic.cfg.Namespace
|
||||
}
|
||||
|
||||
ic.listers.Ingress.Store, ic.ingController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
|
||||
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
|
||||
|
||||
ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
|
||||
|
||||
ic.listers.Secret.Store, ic.secrController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
|
||||
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
|
||||
|
||||
ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
|
||||
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
|
||||
|
||||
ic.listers.Service.Store, ic.svcController = cache.NewInformer(
|
||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
|
||||
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
|
||||
var nodeListerWatcher cache.ListerWatcher
|
||||
if disableNodeLister {
|
||||
nodeListerWatcher = fcache.NewFakeControllerSource()
|
||||
} else {
|
||||
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
|
||||
}
|
||||
ic.listers.Node.Store, ic.nodeController = cache.NewInformer(
|
||||
nodeListerWatcher,
|
||||
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||
}
|
|
@ -51,7 +51,7 @@ func TestMergeLocationAnnotations(t *testing.T) {
|
|||
"Redirect": redirect.Redirect{},
|
||||
"Rewrite": rewrite.Redirect{},
|
||||
"Whitelist": ipwhitelist.SourceRange{},
|
||||
"Proxy": proxy.Configuration{},
|
||||
"Proxy": &proxy.Configuration{},
|
||||
"UsePortInRedirects": true,
|
||||
}
|
||||
|
||||
|
|
|
@ -24,52 +24,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// BackendByNameServers sorts upstreams by name
|
||||
type BackendByNameServers []*Backend
|
||||
|
||||
func (c BackendByNameServers) Len() int { return len(c) }
|
||||
func (c BackendByNameServers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c BackendByNameServers) Less(i, j int) bool {
|
||||
|
||||
return c[i].Name < c[j].Name
|
||||
}
|
||||
|
||||
// EndpointByAddrPort sorts endpoints by address and port
|
||||
type EndpointByAddrPort []Endpoint
|
||||
|
||||
func (c EndpointByAddrPort) Len() int { return len(c) }
|
||||
func (c EndpointByAddrPort) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c EndpointByAddrPort) Less(i, j int) bool {
|
||||
iName := c[i].Address
|
||||
jName := c[j].Address
|
||||
if iName != jName {
|
||||
return iName < jName
|
||||
}
|
||||
|
||||
iU := c[i].Port
|
||||
jU := c[j].Port
|
||||
return iU < jU
|
||||
}
|
||||
|
||||
// ServerByName sorts servers by name
|
||||
type ServerByName []*Server
|
||||
|
||||
func (c ServerByName) Len() int { return len(c) }
|
||||
func (c ServerByName) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c ServerByName) Less(i, j int) bool {
|
||||
return c[i].Hostname < c[j].Hostname
|
||||
}
|
||||
|
||||
// LocationByPath sorts location by path in descending order
|
||||
// Location / is the last one
|
||||
type LocationByPath []*Location
|
||||
|
||||
func (c LocationByPath) Len() int { return len(c) }
|
||||
func (c LocationByPath) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c LocationByPath) Less(i, j int) bool {
|
||||
return c[i].Path > c[j].Path
|
||||
}
|
||||
|
||||
// SSLCert describes a SSL certificate to be used in a server
|
||||
type SSLCert struct {
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
|
|
|
@ -22,347 +22,6 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func buildBackendByNameServers() BackendByNameServers {
|
||||
return []*Backend{
|
||||
{
|
||||
Name: "foo1",
|
||||
Secure: true,
|
||||
Endpoints: []Endpoint{},
|
||||
},
|
||||
{
|
||||
Name: "foo2",
|
||||
Secure: false,
|
||||
Endpoints: []Endpoint{},
|
||||
},
|
||||
{
|
||||
Name: "foo3",
|
||||
Secure: true,
|
||||
Endpoints: []Endpoint{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendByNameServersLen(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
backends BackendByNameServers
|
||||
el int
|
||||
}{
|
||||
{[]*Backend{}, 0},
|
||||
{buildBackendByNameServers(), 3},
|
||||
{nil, 0},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.backends.Len()
|
||||
if r != fooTest.el {
|
||||
t.Errorf("returned %v but expected %v for the len of BackendByNameServers: %v", r, fooTest.el, fooTest.backends)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendByNameServersSwap(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
backends BackendByNameServers
|
||||
i int
|
||||
j int
|
||||
}{
|
||||
{buildBackendByNameServers(), 0, 1},
|
||||
{buildBackendByNameServers(), 2, 1},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
fooi := fooTest.backends[fooTest.i]
|
||||
fooj := fooTest.backends[fooTest.j]
|
||||
fooTest.backends.Swap(fooTest.i, fooTest.j)
|
||||
if fooi.Name != fooTest.backends[fooTest.j].Name || fooj.Name != fooTest.backends[fooTest.i].Name {
|
||||
t.Errorf("failed to swap for ByNameServers, foo: %v", fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackendByNameServersLess(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
backends BackendByNameServers
|
||||
i int
|
||||
j int
|
||||
er bool
|
||||
}{
|
||||
// order by name
|
||||
{buildBackendByNameServers(), 0, 2, true},
|
||||
{buildBackendByNameServers(), 1, 0, false},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.backends.Less(fooTest.i, fooTest.j)
|
||||
if r != fooTest.er {
|
||||
t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildEndpointByAddrPort() EndpointByAddrPort {
|
||||
return []Endpoint{
|
||||
{
|
||||
Address: "127.0.0.1",
|
||||
Port: "8080",
|
||||
MaxFails: 3,
|
||||
FailTimeout: 10,
|
||||
},
|
||||
{
|
||||
Address: "127.0.0.1",
|
||||
Port: "8081",
|
||||
MaxFails: 3,
|
||||
FailTimeout: 10,
|
||||
},
|
||||
{
|
||||
Address: "127.0.0.1",
|
||||
Port: "8082",
|
||||
MaxFails: 3,
|
||||
FailTimeout: 10,
|
||||
},
|
||||
{
|
||||
Address: "127.0.0.2",
|
||||
Port: "8082",
|
||||
MaxFails: 3,
|
||||
FailTimeout: 10,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointByAddrPortLen(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
endpoints EndpointByAddrPort
|
||||
el int
|
||||
}{
|
||||
{[]Endpoint{}, 0},
|
||||
{buildEndpointByAddrPort(), 4},
|
||||
{nil, 0},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.endpoints.Len()
|
||||
if r != fooTest.el {
|
||||
t.Errorf("returned %v but expected %v for the len of EndpointByAddrPort: %v", r, fooTest.el, fooTest.endpoints)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointByAddrPortSwap(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
endpoints EndpointByAddrPort
|
||||
i int
|
||||
j int
|
||||
}{
|
||||
{buildEndpointByAddrPort(), 0, 1},
|
||||
{buildEndpointByAddrPort(), 2, 1},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
fooi := fooTest.endpoints[fooTest.i]
|
||||
fooj := fooTest.endpoints[fooTest.j]
|
||||
fooTest.endpoints.Swap(fooTest.i, fooTest.j)
|
||||
if fooi.Port != fooTest.endpoints[fooTest.j].Port ||
|
||||
fooi.Address != fooTest.endpoints[fooTest.j].Address ||
|
||||
fooj.Port != fooTest.endpoints[fooTest.i].Port ||
|
||||
fooj.Address != fooTest.endpoints[fooTest.i].Address {
|
||||
t.Errorf("failed to swap for EndpointByAddrPort, foo: %v", fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointByAddrPortLess(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
endpoints EndpointByAddrPort
|
||||
i int
|
||||
j int
|
||||
er bool
|
||||
}{
|
||||
// 1) order by name
|
||||
// 2) order by port(if the name is the same one)
|
||||
{buildEndpointByAddrPort(), 0, 1, true},
|
||||
{buildEndpointByAddrPort(), 2, 1, false},
|
||||
{buildEndpointByAddrPort(), 2, 3, true},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.endpoints.Less(fooTest.i, fooTest.j)
|
||||
if r != fooTest.er {
|
||||
t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildServerByName() ServerByName {
|
||||
return []*Server{
|
||||
{
|
||||
Hostname: "foo1",
|
||||
SSLPassthrough: true,
|
||||
SSLCertificate: "foo1_cert",
|
||||
SSLPemChecksum: "foo1_pem",
|
||||
Locations: []*Location{},
|
||||
},
|
||||
{
|
||||
Hostname: "foo2",
|
||||
SSLPassthrough: true,
|
||||
SSLCertificate: "foo2_cert",
|
||||
SSLPemChecksum: "foo2_pem",
|
||||
Locations: []*Location{},
|
||||
},
|
||||
{
|
||||
Hostname: "foo3",
|
||||
SSLPassthrough: false,
|
||||
SSLCertificate: "foo3_cert",
|
||||
SSLPemChecksum: "foo3_pem",
|
||||
Locations: []*Location{},
|
||||
},
|
||||
{
|
||||
Hostname: "_",
|
||||
SSLPassthrough: false,
|
||||
SSLCertificate: "foo4_cert",
|
||||
SSLPemChecksum: "foo4_pem",
|
||||
Locations: []*Location{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerByNameLen(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
servers ServerByName
|
||||
el int
|
||||
}{
|
||||
{[]*Server{}, 0},
|
||||
{buildServerByName(), 4},
|
||||
{nil, 0},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.servers.Len()
|
||||
if r != fooTest.el {
|
||||
t.Errorf("returned %v but expected %v for the len of ServerByName: %v", r, fooTest.el, fooTest.servers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerByNameSwap(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
servers ServerByName
|
||||
i int
|
||||
j int
|
||||
}{
|
||||
{buildServerByName(), 0, 1},
|
||||
{buildServerByName(), 2, 1},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
fooi := fooTest.servers[fooTest.i]
|
||||
fooj := fooTest.servers[fooTest.j]
|
||||
fooTest.servers.Swap(fooTest.i, fooTest.j)
|
||||
if fooi.Hostname != fooTest.servers[fooTest.j].Hostname ||
|
||||
fooj.Hostname != fooTest.servers[fooTest.i].Hostname {
|
||||
t.Errorf("failed to swap for ServerByName, foo: %v", fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerByNameLess(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
servers ServerByName
|
||||
i int
|
||||
j int
|
||||
er bool
|
||||
}{
|
||||
{buildServerByName(), 0, 1, true},
|
||||
{buildServerByName(), 2, 1, false},
|
||||
{buildServerByName(), 2, 3, false},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.servers.Less(fooTest.i, fooTest.j)
|
||||
if r != fooTest.er {
|
||||
t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildLocationByPath() LocationByPath {
|
||||
return []*Location{
|
||||
{
|
||||
Path: "a",
|
||||
IsDefBackend: true,
|
||||
Backend: "a_back",
|
||||
},
|
||||
{
|
||||
Path: "b",
|
||||
IsDefBackend: true,
|
||||
Backend: "b_back",
|
||||
},
|
||||
{
|
||||
Path: "c",
|
||||
IsDefBackend: true,
|
||||
Backend: "c_back",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocationByPath(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
locations LocationByPath
|
||||
el int
|
||||
}{
|
||||
{[]*Location{}, 0},
|
||||
{buildLocationByPath(), 3},
|
||||
{nil, 0},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.locations.Len()
|
||||
if r != fooTest.el {
|
||||
t.Errorf("returned %v but expected %v for the len of LocationByPath: %v", r, fooTest.el, fooTest.locations)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocationByPathSwap(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
locations LocationByPath
|
||||
i int
|
||||
j int
|
||||
}{
|
||||
{buildLocationByPath(), 0, 1},
|
||||
{buildLocationByPath(), 2, 1},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
fooi := fooTest.locations[fooTest.i]
|
||||
fooj := fooTest.locations[fooTest.j]
|
||||
fooTest.locations.Swap(fooTest.i, fooTest.j)
|
||||
if fooi.Path != fooTest.locations[fooTest.j].Path ||
|
||||
fooj.Path != fooTest.locations[fooTest.i].Path {
|
||||
t.Errorf("failed to swap for LocationByPath, foo: %v", fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocationByPathLess(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
locations LocationByPath
|
||||
i int
|
||||
j int
|
||||
er bool
|
||||
}{
|
||||
// sorts location by path in descending order
|
||||
{buildLocationByPath(), 0, 1, false},
|
||||
{buildLocationByPath(), 2, 1, true},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.locations.Less(fooTest.i, fooTest.j)
|
||||
if r != fooTest.er {
|
||||
t.Errorf("returned %v but expected %v for the foo: %v", r, fooTest.er, fooTest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetObjectKindForSSLCert(t *testing.T) {
|
||||
fk := &SSLCert{
|
||||
ObjectMeta: metav1.ObjectMeta{},
|
||||
|
|
|
@ -45,7 +45,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
updateInterval = 30 * time.Second
|
||||
updateInterval = 60 * time.Second
|
||||
)
|
||||
|
||||
// Sync ...
|
||||
|
@ -56,14 +56,16 @@ type Sync interface {
|
|||
|
||||
// Config ...
|
||||
type Config struct {
|
||||
Client clientset.Interface
|
||||
Client clientset.Interface
|
||||
|
||||
PublishService string
|
||||
IngressLister store.IngressLister
|
||||
|
||||
ElectionID string
|
||||
|
||||
UpdateStatusOnShutdown bool
|
||||
|
||||
IngressLister store.IngressLister
|
||||
|
||||
DefaultIngressClass string
|
||||
IngressClass string
|
||||
|
||||
|
@ -293,7 +295,10 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress {
|
|||
}
|
||||
}
|
||||
|
||||
sort.Sort(loadBalancerIngressByIP(lbi))
|
||||
sort.Slice(lbi, func(a, b int) bool {
|
||||
return lbi[a].IP < lbi[b].IP
|
||||
})
|
||||
|
||||
return lbi
|
||||
}
|
||||
|
||||
|
@ -328,7 +333,10 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
|
|||
}
|
||||
|
||||
curIPs := currIng.Status.LoadBalancer.Ingress
|
||||
sort.Sort(loadBalancerIngressByIP(curIPs))
|
||||
sort.Slice(curIPs, func(a, b int) bool {
|
||||
return curIPs[a].IP < curIPs[b].IP
|
||||
})
|
||||
|
||||
if ingressSliceEqual(addrs, curIPs) {
|
||||
glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", currIng.Namespace, currIng.Name)
|
||||
return
|
||||
|
@ -361,12 +369,3 @@ func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// loadBalancerIngressByIP sorts LoadBalancerIngress using the field IP
|
||||
type loadBalancerIngressByIP []apiv1.LoadBalancerIngress
|
||||
|
||||
func (c loadBalancerIngressByIP) Len() int { return len(c) }
|
||||
func (c loadBalancerIngressByIP) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||
func (c loadBalancerIngressByIP) Less(i, j int) bool {
|
||||
return c[i].IP < c[j].IP
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ package status
|
|||
|
||||
import (
|
||||
"os"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -35,7 +34,7 @@ import (
|
|||
"k8s.io/ingress/core/pkg/task"
|
||||
)
|
||||
|
||||
func buildLoadBalancerIngressByIP() loadBalancerIngressByIP {
|
||||
func buildLoadBalancerIngressByIP() []apiv1.LoadBalancerIngress {
|
||||
return []apiv1.LoadBalancerIngress{
|
||||
{
|
||||
IP: "10.0.0.1",
|
||||
|
@ -232,6 +231,7 @@ func buildIngressListener() store.IngressLister {
|
|||
},
|
||||
},
|
||||
})
|
||||
|
||||
return store.IngressLister{Store: s}
|
||||
}
|
||||
|
||||
|
@ -376,7 +376,6 @@ func TestRunningAddresessWithPods(t *testing.T) {
|
|||
func TestUpdateStatus(t *testing.T) {
|
||||
fk := buildStatusSync()
|
||||
newIPs := buildLoadBalancerIngressByIP()
|
||||
sort.Sort(loadBalancerIngressByIP(newIPs))
|
||||
fk.updateStatus(newIPs)
|
||||
|
||||
fooIngress1, err1 := fk.Client.Extensions().Ingresses(apiv1.NamespaceDefault).Get("foo_ingress_1", metav1.GetOptions{})
|
||||
|
@ -460,61 +459,3 @@ func TestIngressSliceEqual(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBalancerIngressByIPLen(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
ips loadBalancerIngressByIP
|
||||
el int
|
||||
}{
|
||||
{[]apiv1.LoadBalancerIngress{}, 0},
|
||||
{buildLoadBalancerIngressByIP(), 4},
|
||||
{nil, 0},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.ips.Len()
|
||||
if r != fooTest.el {
|
||||
t.Errorf("returned %v but expected %v ", r, fooTest.el)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBalancerIngressByIPSwap(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
ips loadBalancerIngressByIP
|
||||
i int
|
||||
j int
|
||||
}{
|
||||
{buildLoadBalancerIngressByIP(), 0, 1},
|
||||
{buildLoadBalancerIngressByIP(), 2, 1},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
fooi := fooTest.ips[fooTest.i]
|
||||
fooj := fooTest.ips[fooTest.j]
|
||||
fooTest.ips.Swap(fooTest.i, fooTest.j)
|
||||
if fooi.IP != fooTest.ips[fooTest.j].IP ||
|
||||
fooj.IP != fooTest.ips[fooTest.i].IP {
|
||||
t.Errorf("failed to swap for loadBalancerIngressByIP")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadBalancerIngressByIPLess(t *testing.T) {
|
||||
fooTests := []struct {
|
||||
ips loadBalancerIngressByIP
|
||||
i int
|
||||
j int
|
||||
er bool
|
||||
}{
|
||||
{buildLoadBalancerIngressByIP(), 0, 1, true},
|
||||
{buildLoadBalancerIngressByIP(), 2, 1, false},
|
||||
}
|
||||
|
||||
for _, fooTest := range fooTests {
|
||||
r := fooTest.ips.Less(fooTest.i, fooTest.j)
|
||||
if r != fooTest.er {
|
||||
t.Errorf("returned %v but expected %v ", r, fooTest.er)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ package store
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
api "k8s.io/api/core/v1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
)
|
||||
|
||||
|
@ -29,20 +29,56 @@ type IngressLister struct {
|
|||
}
|
||||
|
||||
// SecretsLister makes a Store that lists Secrets.
|
||||
type SecretsLister struct {
|
||||
type SecretLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a secret in the local secrets Store
|
||||
func (sl *SecretLister) GetByName(name string) (*apiv1.Secret, error) {
|
||||
s, exists, err := sl.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("secret %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Secret), nil
|
||||
}
|
||||
|
||||
// ConfigMapLister makes a Store that lists Configmaps.
|
||||
type ConfigMapLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a configmap in the local configmaps Store
|
||||
func (cml *ConfigMapLister) GetByName(name string) (*apiv1.ConfigMap, error) {
|
||||
s, exists, err := cml.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("configmap %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.ConfigMap), nil
|
||||
}
|
||||
|
||||
// ServiceLister makes a Store that lists Services.
|
||||
type ServiceLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// GetByName searches for a service in the local secrets Store
|
||||
func (sl *ServiceLister) GetByName(name string) (*apiv1.Service, error) {
|
||||
s, exists, err := sl.GetByKey(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("service %v was not found", name)
|
||||
}
|
||||
return s.(*apiv1.Service), nil
|
||||
}
|
||||
|
||||
// NodeLister makes a Store that lists Nodes.
|
||||
type NodeLister struct {
|
||||
cache.Store
|
||||
|
@ -54,9 +90,9 @@ type EndpointLister struct {
|
|||
}
|
||||
|
||||
// GetServiceEndpoints returns the endpoints of a service, matched on service name.
|
||||
func (s *EndpointLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints, err error) {
|
||||
func (s *EndpointLister) GetServiceEndpoints(svc *apiv1.Service) (ep apiv1.Endpoints, err error) {
|
||||
for _, m := range s.Store.List() {
|
||||
ep = *m.(*api.Endpoints)
|
||||
ep = *m.(*apiv1.Endpoints)
|
||||
if svc.Name == ep.Name && svc.Namespace == ep.Namespace {
|
||||
return ep, nil
|
||||
}
|
||||
|
@ -64,8 +100,3 @@ func (s *EndpointLister) GetServiceEndpoints(svc *api.Service) (ep api.Endpoints
|
|||
err = fmt.Errorf("could not find endpoints for service: %v", svc.Name)
|
||||
return
|
||||
}
|
||||
|
||||
// SecretLister makes a Store that lists Secres.
|
||||
type SecretLister struct {
|
||||
cache.Store
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ type Controller interface {
|
|||
SetConfig(*apiv1.ConfigMap)
|
||||
// SetListers allows the access of store listers present in the generic controller
|
||||
// This avoid the use of the kubernetes client.
|
||||
SetListers(StoreLister)
|
||||
SetListers(*StoreLister)
|
||||
// BackendDefaults returns the minimum settings required to configure the
|
||||
// communication to endpoints
|
||||
BackendDefaults() defaults.Backend
|
||||
|
@ -309,7 +309,7 @@ type Location struct {
|
|||
// Proxy contains information about timeouts and buffer sizes
|
||||
// to be used in connections against endpoints
|
||||
// +optional
|
||||
Proxy proxy.Configuration `json:"proxy,omitempty"`
|
||||
Proxy *proxy.Configuration `json:"proxy,omitempty"`
|
||||
// UsePortInRedirects indicates if redirects must specify the port
|
||||
// +optional
|
||||
UsePortInRedirects bool `json:"usePortInRedirects"`
|
||||
|
|
|
@ -382,7 +382,7 @@ func (l1 *Location) Equal(l2 *Location) bool {
|
|||
if !(&l1.Whitelist).Equal(&l2.Whitelist) {
|
||||
return false
|
||||
}
|
||||
if !(&l1.Proxy).Equal(&l2.Proxy) {
|
||||
if !(l1.Proxy).Equal(l2.Proxy) {
|
||||
return false
|
||||
}
|
||||
if l1.UsePortInRedirects != l2.UsePortInRedirects {
|
||||
|
|
Loading…
Reference in a new issue