Refactor listers
This commit is contained in:
parent
73bca9f192
commit
a1b458f7fb
6 changed files with 104 additions and 87 deletions
|
@ -322,7 +322,7 @@ func TestBuildNextUpstream(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuildRateLimit(t *testing.T) {
|
func TestBuildRateLimit(t *testing.T) {
|
||||||
loc := ingress.Location{}
|
loc := &ingress.Location{}
|
||||||
|
|
||||||
loc.RateLimit.Connections.Name = "con"
|
loc.RateLimit.Connections.Name = "con"
|
||||||
loc.RateLimit.Connections.Limit = 1
|
loc.RateLimit.Connections.Limit = 1
|
||||||
|
|
|
@ -55,19 +55,17 @@ func (ic *GenericController) syncSecret(key string) {
|
||||||
}
|
}
|
||||||
glog.Infof("updating secret %v in the local store", key)
|
glog.Infof("updating secret %v in the local store", key)
|
||||||
ic.sslCertTracker.Update(key, cert)
|
ic.sslCertTracker.Update(key, cert)
|
||||||
// we need to force the sync of the secret to disk
|
|
||||||
ic.syncSecret(key)
|
|
||||||
// this update must trigger an update
|
// this update must trigger an update
|
||||||
// (like an update event from a change in Ingress)
|
// (like an update event from a change in Ingress)
|
||||||
ic.syncIngress("update-secret")
|
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Infof("adding secret %v to the local store", key)
|
glog.Infof("adding secret %v to the local store", key)
|
||||||
ic.sslCertTracker.Add(key, cert)
|
ic.sslCertTracker.Add(key, cert)
|
||||||
// this new secret must trigger an update
|
// this update must trigger an update
|
||||||
// (like an update event from a change in Ingress)
|
// (like an update event from a change in Ingress)
|
||||||
ic.syncIngress("add-secret")
|
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
|
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
|
||||||
|
@ -130,8 +128,7 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC
|
||||||
// to a secret that is not present in the local secret store.
|
// to a secret that is not present in the local secret store.
|
||||||
// In this case we call syncSecret.
|
// In this case we call syncSecret.
|
||||||
func (ic *GenericController) checkMissingSecrets() {
|
func (ic *GenericController) checkMissingSecrets() {
|
||||||
for _, key := range ic.listers.Ingress.ListKeys() {
|
for _, obj := range ic.listers.Ingress.List() {
|
||||||
if obj, exists, _ := ic.listers.Ingress.GetByKey(key); exists {
|
|
||||||
ing := obj.(*extensions.Ingress)
|
ing := obj.(*extensions.Ingress)
|
||||||
|
|
||||||
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||||
|
@ -158,7 +155,6 @@ func (ic *GenericController) checkMissingSecrets() {
|
||||||
ic.syncSecret(key)
|
ic.syncSecret(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sslCertTracker holds a store of referenced Secrets in Ingress rules
|
// sslCertTracker holds a store of referenced Secrets in Ingress rules
|
||||||
|
|
|
@ -26,9 +26,11 @@ import (
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
testclient "k8s.io/client-go/kubernetes/fake"
|
testclient "k8s.io/client-go/kubernetes/fake"
|
||||||
cache_client "k8s.io/client-go/tools/cache"
|
cache_client "k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
|
|
||||||
"k8s.io/ingress/core/pkg/ingress"
|
"k8s.io/ingress/core/pkg/ingress"
|
||||||
"k8s.io/ingress/core/pkg/ingress/store"
|
"k8s.io/ingress/core/pkg/ingress/store"
|
||||||
|
"k8s.io/ingress/core/pkg/task"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -102,21 +104,17 @@ func buildControllerForBackendSSL() cache_client.Controller {
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildGenericControllerForBackendSSL() *GenericController {
|
func buildGenericControllerForBackendSSL() *GenericController {
|
||||||
return &GenericController{
|
gc := &GenericController{
|
||||||
|
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
|
||||||
cfg: &Configuration{
|
cfg: &Configuration{
|
||||||
Client: buildSimpleClientSetForBackendSSL(),
|
Client: buildSimpleClientSetForBackendSSL(),
|
||||||
},
|
},
|
||||||
listers: buildListers(),
|
listers: buildListers(),
|
||||||
|
|
||||||
ingController: buildControllerForBackendSSL(),
|
|
||||||
endpController: buildControllerForBackendSSL(),
|
|
||||||
svcController: buildControllerForBackendSSL(),
|
|
||||||
nodeController: buildControllerForBackendSSL(),
|
|
||||||
secrController: buildControllerForBackendSSL(),
|
|
||||||
mapController: buildControllerForBackendSSL(),
|
|
||||||
|
|
||||||
sslCertTracker: newSSLCertTracker(),
|
sslCertTracker: newSSLCertTracker(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gc.syncQueue = task.NewTaskQueue(gc.syncIngress)
|
||||||
|
return gc
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
|
func buildCrtKeyAndCA() ([]byte, []byte, []byte, error) {
|
||||||
|
|
|
@ -34,13 +34,11 @@ import (
|
||||||
extensions "k8s.io/api/extensions/v1beta1"
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/client-go/util/flowcontrol"
|
"k8s.io/client-go/util/flowcontrol"
|
||||||
|
|
||||||
|
@ -80,14 +78,8 @@ var (
|
||||||
type GenericController struct {
|
type GenericController struct {
|
||||||
cfg *Configuration
|
cfg *Configuration
|
||||||
|
|
||||||
ingController cache.Controller
|
|
||||||
endpController cache.Controller
|
|
||||||
svcController cache.Controller
|
|
||||||
nodeController cache.Controller
|
|
||||||
secrController cache.Controller
|
|
||||||
mapController cache.Controller
|
|
||||||
|
|
||||||
listers *ingress.StoreLister
|
listers *ingress.StoreLister
|
||||||
|
cacheController *cacheController
|
||||||
|
|
||||||
annotations annotationExtractor
|
annotations annotationExtractor
|
||||||
|
|
||||||
|
@ -166,12 +158,11 @@ func newIngressController(config *Configuration) *GenericController {
|
||||||
Component: "ingress-controller",
|
Component: "ingress-controller",
|
||||||
}),
|
}),
|
||||||
sslCertTracker: newSSLCertTracker(),
|
sslCertTracker: newSSLCertTracker(),
|
||||||
listers: &ingress.StoreLister{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
|
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
|
||||||
|
|
||||||
ic.createListers(config.DisableNodeList)
|
ic.listers, ic.cacheController = ic.createListers(config.DisableNodeList)
|
||||||
|
|
||||||
if config.UpdateStatus {
|
if config.UpdateStatus {
|
||||||
ic.syncStatus = status.NewStatusSyncer(status.Config{
|
ic.syncStatus = status.NewStatusSyncer(status.Config{
|
||||||
|
@ -684,23 +675,23 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
|
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
|
||||||
func (ic GenericController) GetAuthCertificate(secretName string) (*resolver.AuthSSLCert, error) {
|
func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
|
||||||
if _, exists := ic.sslCertTracker.Get(secretName); !exists {
|
if _, exists := ic.sslCertTracker.Get(name); !exists {
|
||||||
ic.syncSecret(secretName)
|
ic.syncSecret(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := ic.listers.Secret.GetByName(secretName)
|
_, err := ic.listers.Secret.GetByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
|
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bc, exists := ic.sslCertTracker.Get(secretName)
|
bc, exists := ic.sslCertTracker.Get(name)
|
||||||
if !exists {
|
if !exists {
|
||||||
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", secretName)
|
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
|
||||||
}
|
}
|
||||||
cert := bc.(*ingress.SSLCert)
|
cert := bc.(*ingress.SSLCert)
|
||||||
return &resolver.AuthSSLCert{
|
return &resolver.AuthSSLCert{
|
||||||
Secret: secretName,
|
Secret: name,
|
||||||
CAFileName: cert.CAFileName,
|
CAFileName: cert.CAFileName,
|
||||||
PemSHA: cert.PemSHA,
|
PemSHA: cert.PemSHA,
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -1213,39 +1204,33 @@ func (ic GenericController) Stop() error {
|
||||||
func (ic *GenericController) Start() {
|
func (ic *GenericController) Start() {
|
||||||
glog.Infof("starting Ingress controller")
|
glog.Infof("starting Ingress controller")
|
||||||
|
|
||||||
go ic.ingController.Run(ic.stopCh)
|
ic.cacheController.Run(ic.stopCh)
|
||||||
go ic.endpController.Run(ic.stopCh)
|
|
||||||
go ic.svcController.Run(ic.stopCh)
|
|
||||||
go ic.nodeController.Run(ic.stopCh)
|
|
||||||
go ic.secrController.Run(ic.stopCh)
|
|
||||||
go ic.mapController.Run(ic.stopCh)
|
|
||||||
|
|
||||||
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
|
|
||||||
|
|
||||||
// Wait for all involved caches to be synced, before processing items from the queue is started
|
|
||||||
if !cache.WaitForCacheSync(ic.stopCh,
|
|
||||||
ic.ingController.HasSynced,
|
|
||||||
ic.svcController.HasSynced,
|
|
||||||
ic.endpController.HasSynced,
|
|
||||||
ic.secrController.HasSynced,
|
|
||||||
ic.mapController.HasSynced,
|
|
||||||
ic.nodeController.HasSynced,
|
|
||||||
) {
|
|
||||||
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// initial sync of secrets to avoid unnecessary reloads
|
|
||||||
ic.checkMissingSecrets()
|
|
||||||
|
|
||||||
createDefaultSSLCertificate()
|
createDefaultSSLCertificate()
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
// initial sync of secrets to avoid unnecessary reloads
|
||||||
|
glog.Info("running initial sync of secret")
|
||||||
|
for _, obj := range ic.listers.Ingress.List() {
|
||||||
|
ing := obj.(*extensions.Ingress)
|
||||||
|
|
||||||
|
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
|
||||||
|
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
|
||||||
|
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ic.readSecrets(ing)
|
||||||
|
}
|
||||||
|
|
||||||
go ic.syncQueue.Run(time.Second, ic.stopCh)
|
go ic.syncQueue.Run(time.Second, ic.stopCh)
|
||||||
|
|
||||||
if ic.syncStatus != nil {
|
if ic.syncStatus != nil {
|
||||||
go ic.syncStatus.Run(ic.stopCh)
|
go ic.syncStatus.Run(ic.stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
|
||||||
|
|
||||||
// force initial sync
|
// force initial sync
|
||||||
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
ic.syncQueue.Enqueue(&extensions.Ingress{})
|
||||||
|
|
||||||
|
|
|
@ -25,14 +25,46 @@ import (
|
||||||
apiv1 "k8s.io/api/core/v1"
|
apiv1 "k8s.io/api/core/v1"
|
||||||
extensions "k8s.io/api/extensions/v1beta1"
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
fcache "k8s.io/client-go/tools/cache/testing"
|
fcache "k8s.io/client-go/tools/cache/testing"
|
||||||
|
|
||||||
|
"k8s.io/ingress/core/pkg/ingress"
|
||||||
"k8s.io/ingress/core/pkg/ingress/annotations/class"
|
"k8s.io/ingress/core/pkg/ingress/annotations/class"
|
||||||
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
|
"k8s.io/ingress/core/pkg/ingress/annotations/parser"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ic *GenericController) createListers(disableNodeLister bool) {
|
type cacheController struct {
|
||||||
|
Ingress cache.Controller
|
||||||
|
Endpoint cache.Controller
|
||||||
|
Service cache.Controller
|
||||||
|
Node cache.Controller
|
||||||
|
Secret cache.Controller
|
||||||
|
Configmap cache.Controller
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheController) Run(stopCh chan struct{}) {
|
||||||
|
go c.Ingress.Run(stopCh)
|
||||||
|
go c.Endpoint.Run(stopCh)
|
||||||
|
go c.Service.Run(stopCh)
|
||||||
|
go c.Node.Run(stopCh)
|
||||||
|
go c.Secret.Run(stopCh)
|
||||||
|
go c.Configmap.Run(stopCh)
|
||||||
|
|
||||||
|
// Wait for all involved caches to be synced, before processing items from the queue is started
|
||||||
|
if !cache.WaitForCacheSync(stopCh,
|
||||||
|
c.Ingress.HasSynced,
|
||||||
|
c.Endpoint.HasSynced,
|
||||||
|
c.Service.HasSynced,
|
||||||
|
c.Node.HasSynced,
|
||||||
|
c.Secret.HasSynced,
|
||||||
|
c.Configmap.HasSynced,
|
||||||
|
) {
|
||||||
|
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.StoreLister, *cacheController) {
|
||||||
// from here to the end of the method all the code is just boilerplate
|
// from here to the end of the method all the code is just boilerplate
|
||||||
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
|
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
|
||||||
// This is used to detect new content, updates or removals and act accordingly
|
// This is used to detect new content, updates or removals and act accordingly
|
||||||
|
@ -166,23 +198,27 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
|
||||||
watchNs = ic.cfg.Namespace
|
watchNs = ic.cfg.Namespace
|
||||||
}
|
}
|
||||||
|
|
||||||
ic.listers.Ingress.Store, ic.ingController = cache.NewInformer(
|
lister := &ingress.StoreLister{}
|
||||||
|
|
||||||
|
controller := &cacheController{}
|
||||||
|
|
||||||
|
lister.Ingress.Store, controller.Ingress = cache.NewInformer(
|
||||||
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
|
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
|
||||||
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
|
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
|
||||||
|
|
||||||
ic.listers.Endpoint.Store, ic.endpController = cache.NewInformer(
|
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
|
||||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
|
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
|
||||||
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
|
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
|
||||||
|
|
||||||
ic.listers.Secret.Store, ic.secrController = cache.NewInformer(
|
lister.Secret.Store, controller.Secret = cache.NewInformer(
|
||||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
|
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
|
||||||
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
|
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
|
||||||
|
|
||||||
ic.listers.ConfigMap.Store, ic.mapController = cache.NewInformer(
|
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
|
||||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
|
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
|
||||||
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
|
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
|
||||||
|
|
||||||
ic.listers.Service.Store, ic.svcController = cache.NewInformer(
|
lister.Service.Store, controller.Service = cache.NewInformer(
|
||||||
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
|
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
|
||||||
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
|
@ -192,7 +228,9 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
|
||||||
} else {
|
} else {
|
||||||
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
|
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
|
||||||
}
|
}
|
||||||
ic.listers.Node.Store, ic.nodeController = cache.NewInformer(
|
lister.Node.Store, controller.Node = cache.NewInformer(
|
||||||
nodeListerWatcher,
|
nodeListerWatcher,
|
||||||
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
|
||||||
|
|
||||||
|
return lister, controller
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (n DummyController) ConfigureFlags(*pflag.FlagSet) {
|
||||||
func (n DummyController) OverrideFlags(*pflag.FlagSet) {
|
func (n DummyController) OverrideFlags(*pflag.FlagSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n DummyController) SetListers(lister ingress.StoreLister) {
|
func (n DummyController) SetListers(lister *ingress.StoreLister) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue