switch to use shared informer

This commit is contained in:
Minhan Xia 2017-08-18 16:24:56 -07:00
parent 46c73032bb
commit 3839faf536
3 changed files with 81 additions and 70 deletions

View file

@ -24,10 +24,11 @@ import (
"github.com/golang/glog"
api_v1 "k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
informerv1 "k8s.io/client-go/informers/core/v1"
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
@ -53,17 +54,45 @@ var (
storeSyncPollPeriod = 5 * time.Second
)
// ControllerContext holds
type ControllerContext struct {
IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
// Stop is the stop channel shared among controllers
StopCh chan struct{}
}
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext {
return &ControllerContext{
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
StopCh: make(chan struct{}),
}
}
func (ctx *ControllerContext) Start() {
go ctx.IngressInformer.Run(ctx.StopCh)
go ctx.ServiceInformer.Run(ctx.StopCh)
go ctx.PodInformer.Run(ctx.StopCh)
go ctx.NodeInformer.Run(ctx.StopCh)
}
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client kubernetes.Interface
ingController cache.Controller
nodeController cache.Controller
svcController cache.Controller
podController cache.Controller
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
client kubernetes.Interface
ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
podSynced cache.InformerSynced
nodeSynced cache.InformerSynced
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
// Health checks are the readiness probes of containers on pods.
podLister StoreToPodLister
// TODO: Watch secrets
@ -90,7 +119,7 @@ type LoadBalancerController struct {
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
@ -99,23 +128,32 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
stopCh: make(chan struct{}),
stopCh: ctx.StopCh,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme,
api_v1.EventSource{Component: "loadbalancer-controller"}),
apiv1.EventSource{Component: "loadbalancer-controller"}),
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
lbc.hasSynced = lbc.storesSynced
// Ingress watch handlers
pathHandlers := cache.ResourceEventHandlerFuncs{
lbc.ingressSynced = ctx.IngressInformer.HasSynced
lbc.serviceSynced = ctx.ServiceInformer.HasSynced
lbc.podSynced = ctx.PodInformer.HasSynced
lbc.nodeSynced = ctx.NodeInformer.HasSynced
lbc.ingLister.Store = ctx.IngressInformer.GetStore()
lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer()
lbc.podLister.Indexer = ctx.PodInformer.GetIndexer()
lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer()
// ingress event handler
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
return
}
lbc.recorder.Eventf(addIng, api_v1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.recorder.Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
@ -137,13 +175,10 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
}
lbc.ingQueue.enqueue(cur)
},
}
lbc.ingLister.Store, lbc.ingController = cache.NewInformer(
cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, pathHandlers)
})
// Service watch handlers
svcHandlers := cache.ResourceEventHandlerFuncs{
// service event handler
ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
@ -151,38 +186,14 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
}
},
// Ingress deletes matter, service deletes don't.
}
})
lbc.svcLister.Indexer, lbc.svcController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", namespace, fields.Everything()),
&api_v1.Service{},
resyncPeriod,
svcHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "pods", namespace, fields.Everything()),
&api_v1.Pod{},
resyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
// Node watch handlers
nodeHandlers := cache.ResourceEventHandlerFuncs{
// node event handler
ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
lbc.nodeLister.Indexer, lbc.nodeController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api_v1.NamespaceAll, fields.Everything()),
&api_v1.Node{},
resyncPeriod,
nodeHandlers,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
})
lbc.tr = &GCETranslator{&lbc}
lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client}
@ -193,7 +204,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
// enqueueIngressForService enqueues all the Ingress' for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*api_v1.Service)
svc := obj.(*apiv1.Service)
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
@ -210,10 +221,6 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
go lbc.ingController.Run(lbc.stopCh)
go lbc.nodeController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.podController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
@ -250,14 +257,14 @@ func (lbc *LoadBalancerController) storesSynced() bool {
return (
// wait for pods to sync so we don't allocate a default health check when
// an endpoint has a readiness probe.
lbc.podController.HasSynced() &&
lbc.podSynced() &&
// wait for services so we don't thrash on backend creation.
lbc.svcController.HasSynced() &&
lbc.serviceSynced() &&
// wait for nodes so we don't disconnect a backend from an instance
// group just because we don't realize there are nodes in that zone.
lbc.nodeController.HasSynced() &&
lbc.nodeSynced() &&
// Wait for ingresses as a safety measure. We don't really need this.
lbc.ingController.HasSynced())
lbc.ingressSynced())
}
// sync manages Ingress create/updates/deletes.
@ -312,7 +319,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
if ingExists {
lbc.recorder.Eventf(obj.(*extensions.Ingress), api_v1.EventTypeWarning, eventMsg, err.Error())
lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v, error: %v", eventMsg, err)
}
@ -333,10 +340,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UrlMap", err.Error())
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Status", err.Error())
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
@ -354,8 +361,8 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: []api_v1.LoadBalancerIngress{
LoadBalancer: apiv1.LoadBalancerStatus{
Ingress: []apiv1.LoadBalancerIngress{
{IP: ip},
},
},
@ -369,7 +376,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
if _, err := ingClient.UpdateStatus(currIng); err != nil {
return err
}
lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
}
}
// Update annotations through /update endpoint
@ -437,11 +444,11 @@ func (lbc *LoadBalancerController) syncNodes(key string) error {
}
func getNodeReadyPredicate() listers.NodeConditionPredicate {
return func(node *api_v1.Node) bool {
return func(node *apiv1.Node) bool {
for ix := range node.Status.Conditions {
condition := &node.Status.Conditions[ix]
if condition.Type == api_v1.NodeReady {
return condition.Status == api_v1.ConditionTrue
if condition.Type == apiv1.NodeReady {
return condition.Status == apiv1.ConditionTrue
}
}
return false

View file

@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string {
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
lb, err := NewLoadBalancerController(kubeClient, cm.ClusterManager, 1*time.Second, api_v1.NamespaceAll)
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second)
lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager)
if err != nil {
t.Fatalf("%v", err)
}

View file

@ -256,8 +256,10 @@ func main() {
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod)
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, clusterManager, *resyncPeriod, *watchNamespace)
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
if err != nil {
glog.Fatalf("%v", err)
}
@ -268,6 +270,7 @@ func main() {
go registerHandlers(lbc)
go handleSigterm(lbc, *deleteAllOnQuit)
ctx.Start()
lbc.Run()
for {
glog.Infof("Handled quit, awaiting pod deletion.")