Use SharedIndexInformers in place of Informers (#2271)

This commit is contained in:
Antoine Cotten 2018-03-29 14:35:01 +02:00 committed by Manuel Alejandro de Brito Fontes
parent 5738ddbdb5
commit b09ecf790b

View file

@ -29,15 +29,15 @@ import (
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-nginx/internal/file"
@ -120,6 +120,15 @@ type Event struct {
Obj interface{}
}
// Informer defines the required SharedIndexInformers that interact with the API server.
type Informer struct {
Ingress cache.SharedIndexInformer
Endpoint cache.SharedIndexInformer
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
}
// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
type Lister struct {
Ingress IngressLister
@ -130,38 +139,34 @@ type Lister struct {
IngressAnnotation IngressAnnotationsLister
}
// Controller defines the required controllers that interact against the api server
type Controller struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)
// Run initiates the synchronization of the controllers against the api server
func (c *Controller) Run(stopCh chan struct{}) {
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)
// Wait for all involved caches to be synced, before processing items from the queue is started
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
i.Endpoint.HasSynced,
i.Service.HasSynced,
i.Secret.HasSynced,
i.ConfigMap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// We need to wait before start syncing the ingress rules
// because the rules requires content from other listers
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)
// we can start syncing ingress objects only after other caches are
// ready, because ingress rules require content from other listers, and
// 'add' events get triggered in the handlers during caches population.
go i.Ingress.Run(stopCh)
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
i.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
@ -176,10 +181,10 @@ type k8sStore struct {
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration
// cache contains the cache Controllers
cache *Controller
// informer contains the cache Informers
informers *Informer
// listers contains the cache.Store used in the ingress controller
// listers contains the cache.Store interfaces used in the ingress controller
listers *Lister
// sslStore local store of SSL certificates (certificates used in ingress)
@ -214,7 +219,7 @@ func New(checkOCSP bool,
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
cache: &Controller{},
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
filesystem: fs,
@ -237,6 +242,26 @@ func New(checkOCSP bool,
// k8sStore fulfils resolver.Resolver interface
store.annotations = annotations.NewAnnotationExtractor(store)
store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
@ -372,7 +397,7 @@ func New(checkOCSP bool,
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
@ -434,27 +459,11 @@ func New(checkOCSP bool,
},
}
store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)
store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, ingEventHandler)
store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
&apiv1.Endpoints{}, resyncPeriod, eventHandler)
store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
&apiv1.Secret{}, resyncPeriod, secrEventHandler)
store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)
store.listers.Service.Store, store.cache.Service = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(mapEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
return store
}
@ -611,11 +620,11 @@ func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
}
}
// Run initiates the synchronization of the controllers
// and the initial synchronization of the secrets.
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start controllers
s.cache.Run(stopCh)
// start informers
s.informers.Run(stopCh)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")