diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index ece388231..c3236b16c 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -24,10 +24,11 @@ import ( "github.com/golang/glog" - api_v1 "k8s.io/api/core/v1" + apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + informerv1 "k8s.io/client-go/informers/core/v1" + informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" scheme "k8s.io/client-go/kubernetes/scheme" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" @@ -53,17 +54,45 @@ var ( storeSyncPollPeriod = 5 * time.Second ) +// ControllerContext holds +type ControllerContext struct { + IngressInformer cache.SharedIndexInformer + ServiceInformer cache.SharedIndexInformer + PodInformer cache.SharedIndexInformer + NodeInformer cache.SharedIndexInformer + // Stop is the stop channel shared among controllers + StopCh chan struct{} +} + +func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext { + return &ControllerContext{ + IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + StopCh: make(chan struct{}), + } +} + +func (ctx *ControllerContext) Start() { + go ctx.IngressInformer.Run(ctx.StopCh) + go ctx.ServiceInformer.Run(ctx.StopCh) + go ctx.PodInformer.Run(ctx.StopCh) + go ctx.NodeInformer.Run(ctx.StopCh) +} + // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { - client kubernetes.Interface - ingController cache.Controller - nodeController cache.Controller - svcController cache.Controller - podController cache.Controller - ingLister StoreToIngressLister - nodeLister StoreToNodeLister - svcLister StoreToServiceLister + client kubernetes.Interface + + ingressSynced cache.InformerSynced + serviceSynced cache.InformerSynced + podSynced cache.InformerSynced + nodeSynced cache.InformerSynced + ingLister StoreToIngressLister + nodeLister StoreToNodeLister + svcLister StoreToServiceLister // Health checks are the readiness probes of containers on pods. podLister StoreToPodLister // TODO: Watch secrets @@ -90,7 +119,7 @@ type LoadBalancerController struct { // - clusterManager: A ClusterManager capable of creating all cloud resources // required for L7 loadbalancing. // - resyncPeriod: Watchers relist from the Kubernetes API server this often. -func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) { +func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ @@ -99,23 +128,32 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * lbc := LoadBalancerController{ client: kubeClient, CloudClusterManager: clusterManager, - stopCh: make(chan struct{}), + stopCh: ctx.StopCh, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, - api_v1.EventSource{Component: "loadbalancer-controller"}), + apiv1.EventSource{Component: "loadbalancer-controller"}), } lbc.nodeQueue = NewTaskQueue(lbc.syncNodes) lbc.ingQueue = NewTaskQueue(lbc.sync) lbc.hasSynced = lbc.storesSynced - // Ingress watch handlers - pathHandlers := cache.ResourceEventHandlerFuncs{ + lbc.ingressSynced = ctx.IngressInformer.HasSynced + lbc.serviceSynced = ctx.ServiceInformer.HasSynced + lbc.podSynced = ctx.PodInformer.HasSynced + lbc.nodeSynced = ctx.NodeInformer.HasSynced + + lbc.ingLister.Store = ctx.IngressInformer.GetStore() + lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer() + lbc.podLister.Indexer = ctx.PodInformer.GetIndexer() + lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer() + // ingress event handler + ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) if !isGCEIngress(addIng) { glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey) return } - lbc.recorder.Eventf(addIng, api_v1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) + lbc.recorder.Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) lbc.ingQueue.enqueue(obj) }, DeleteFunc: func(obj interface{}) { @@ -137,13 +175,10 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * } lbc.ingQueue.enqueue(cur) }, - } - lbc.ingLister.Store, lbc.ingController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", namespace, fields.Everything()), - &extensions.Ingress{}, resyncPeriod, pathHandlers) + }) - // Service watch handlers - svcHandlers := cache.ResourceEventHandlerFuncs{ + // service event handler + ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: lbc.enqueueIngressForService, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { @@ -151,38 +186,14 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * } }, // Ingress deletes matter, service deletes don't. - } + }) - lbc.svcLister.Indexer, lbc.svcController = cache.NewIndexerInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", namespace, fields.Everything()), - &api_v1.Service{}, - resyncPeriod, - svcHandlers, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "pods", namespace, fields.Everything()), - &api_v1.Pod{}, - resyncPeriod, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - // Node watch handlers - nodeHandlers := cache.ResourceEventHandlerFuncs{ + // node event handler + ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: lbc.nodeQueue.enqueue, DeleteFunc: lbc.nodeQueue.enqueue, // Nodes are updated every 10s and we don't care, so no update handler. - } - - lbc.nodeLister.Indexer, lbc.nodeController = cache.NewIndexerInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "nodes", api_v1.NamespaceAll, fields.Everything()), - &api_v1.Node{}, - resyncPeriod, - nodeHandlers, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + }) lbc.tr = &GCETranslator{&lbc} lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client} @@ -193,7 +204,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * // enqueueIngressForService enqueues all the Ingress' for a Service. func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) { - svc := obj.(*api_v1.Service) + svc := obj.(*apiv1.Service) ings, err := lbc.ingLister.GetServiceIngress(svc) if err != nil { glog.V(5).Infof("ignoring service %v: %v", svc.Name, err) @@ -210,10 +221,6 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) { // Run starts the loadbalancer controller. func (lbc *LoadBalancerController) Run() { glog.Infof("Starting loadbalancer controller") - go lbc.ingController.Run(lbc.stopCh) - go lbc.nodeController.Run(lbc.stopCh) - go lbc.svcController.Run(lbc.stopCh) - go lbc.podController.Run(lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh) go lbc.nodeQueue.run(time.Second, lbc.stopCh) <-lbc.stopCh @@ -250,14 +257,14 @@ func (lbc *LoadBalancerController) storesSynced() bool { return ( // wait for pods to sync so we don't allocate a default health check when // an endpoint has a readiness probe. - lbc.podController.HasSynced() && + lbc.podSynced() && // wait for services so we don't thrash on backend creation. - lbc.svcController.HasSynced() && + lbc.serviceSynced() && // wait for nodes so we don't disconnect a backend from an instance // group just because we don't realize there are nodes in that zone. - lbc.nodeController.HasSynced() && + lbc.nodeSynced() && // Wait for ingresses as a safety measure. We don't really need this. - lbc.ingController.HasSynced()) + lbc.ingressSynced()) } // sync manages Ingress create/updates/deletes. @@ -312,7 +319,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { // TODO: Implement proper backoff for the queue. eventMsg := "GCE" if ingExists { - lbc.recorder.Eventf(obj.(*extensions.Ingress), api_v1.EventTypeWarning, eventMsg, err.Error()) + lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeWarning, eventMsg, err.Error()) } else { err = fmt.Errorf("%v, error: %v", eventMsg, err) } @@ -333,10 +340,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if urlMap, err := lbc.tr.toURLMap(&ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { - lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UrlMap", err.Error()) + lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) syncError = fmt.Errorf("%v, update url map error: %v", syncError, err) } else if err := lbc.updateIngressStatus(l7, ing); err != nil { - lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Status", err.Error()) + lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } return syncError @@ -354,8 +361,8 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing return err } currIng.Status = extensions.IngressStatus{ - LoadBalancer: api_v1.LoadBalancerStatus{ - Ingress: []api_v1.LoadBalancerIngress{ + LoadBalancer: apiv1.LoadBalancerStatus{ + Ingress: []apiv1.LoadBalancerIngress{ {IP: ip}, }, }, @@ -369,7 +376,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing if _, err := ingClient.UpdateStatus(currIng); err != nil { return err } - lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip) + lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip) } } // Update annotations through /update endpoint @@ -437,11 +444,11 @@ func (lbc *LoadBalancerController) syncNodes(key string) error { } func getNodeReadyPredicate() listers.NodeConditionPredicate { - return func(node *api_v1.Node) bool { + return func(node *apiv1.Node) bool { for ix := range node.Status.Conditions { condition := &node.Status.Conditions[ix] - if condition.Type == api_v1.NodeReady { - return condition.Status == api_v1.ConditionTrue + if condition.Type == apiv1.NodeReady { + return condition.Status == apiv1.ConditionTrue } } return false diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index f2de56537..b12d01f1d 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string { // newLoadBalancerController create a loadbalancer controller. func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController { kubeClient := fake.NewSimpleClientset() - lb, err := NewLoadBalancerController(kubeClient, cm.ClusterManager, 1*time.Second, api_v1.NamespaceAll) + ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second) + lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager) if err != nil { t.Fatalf("%v", err) } diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 4dda53230..f79263c52 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -256,8 +256,10 @@ func main() { clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager } + ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod) + // Start loadbalancer controller - lbc, err := controller.NewLoadBalancerController(kubeClient, clusterManager, *resyncPeriod, *watchNamespace) + lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager) if err != nil { glog.Fatalf("%v", err) } @@ -268,6 +270,7 @@ func main() { go registerHandlers(lbc) go handleSigterm(lbc, *deleteAllOnQuit) + ctx.Start() lbc.Run() for { glog.Infof("Handled quit, awaiting pod deletion.")