diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index e0a8de1fa..4c6ecb139 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -27,8 +27,6 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - informerv1 "k8s.io/client-go/informers/core/v1" - informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" scheme "k8s.io/client-go/kubernetes/scheme" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" @@ -37,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/ingress/controllers/gce/loadbalancers" + "k8s.io/ingress/controllers/gce/utils" ) var ( @@ -54,33 +53,6 @@ var ( storeSyncPollPeriod = 5 * time.Second ) -// ControllerContext holds -type ControllerContext struct { - IngressInformer cache.SharedIndexInformer - ServiceInformer cache.SharedIndexInformer - PodInformer cache.SharedIndexInformer - NodeInformer cache.SharedIndexInformer - // Stop is the stop channel shared among controllers - StopCh chan struct{} -} - -func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext { - return &ControllerContext{ - IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - StopCh: make(chan struct{}), - } -} - -func (ctx *ControllerContext) Start() { - go 
ctx.IngressInformer.Run(ctx.StopCh) - go ctx.ServiceInformer.Run(ctx.StopCh) - go ctx.PodInformer.Run(ctx.StopCh) - go ctx.NodeInformer.Run(ctx.StopCh) -} - // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { @@ -119,7 +91,7 @@ type LoadBalancerController struct { // - clusterManager: A ClusterManager capable of creating all cloud resources // required for L7 loadbalancing. // - resyncPeriod: Watchers relist from the Kubernetes API server this often. -func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) { +func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *utils.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index b12d01f1d..a2003aeea 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -53,8 +53,8 @@ func defaultBackendName(clusterName string) string { // newLoadBalancerController create a loadbalancer controller. 
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController { kubeClient := fake.NewSimpleClientset() - ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second) - lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager) + ctx := utils.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true) + lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager, true) if err != nil { t.Fatalf("%v", err) } diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 35d9145bd..8d05f583e 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -47,6 +47,7 @@ import ( "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/controller" "k8s.io/ingress/controllers/gce/loadbalancers" + neg "k8s.io/ingress/controllers/gce/networkendpointgroup" "k8s.io/ingress/controllers/gce/storage" "k8s.io/ingress/controllers/gce/utils" "k8s.io/kubernetes/pkg/cloudprovider" @@ -250,9 +251,10 @@ func main() { } var cloud *gce.GCECloud + var namer *utils.Namer if *inCluster || *useRealCloud { // Create cluster manager - namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName) + namer, err = newNamer(kubeClient, *clusterName, controller.DefaultFirewallName) if err != nil { glog.Fatalf("%v", err) } @@ -287,10 +289,9 @@ func main() { clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager } - ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod) - + ctx := utils.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)) // Start loadbalancer controller - lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager) + lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)) 
if err != nil { glog.Fatalf("%v", err) } @@ -299,6 +300,13 @@ func main() { glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName()) } clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc}) + + // Start NEG controller + if cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) { + negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, *resyncPeriod) + go negController.Run(ctx.StopCh) + } + go registerHandlers(lbc) go handleSigterm(lbc, *deleteAllOnQuit) diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index c0486f557..929b3bb85 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -27,6 +27,11 @@ import ( "github.com/golang/glog" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" + informerv1 "k8s.io/client-go/informers/core/v1" + informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "time" ) const ( @@ -87,6 +92,41 @@ const ( ProtocolHTTPS AppProtocol = "HTTPS" ) +// ControllerContext holds the informers and stop channel shared by the GCE controllers. +type ControllerContext struct { + IngressInformer cache.SharedIndexInformer + ServiceInformer cache.SharedIndexInformer + PodInformer cache.SharedIndexInformer + NodeInformer cache.SharedIndexInformer + EndpointInformer cache.SharedIndexInformer + // Stop is the stop channel shared among controllers + StopCh chan struct{} +} + +func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext { + context := &ControllerContext{ + IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + 
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + StopCh: make(chan struct{}), + } + if enableEndpointsInformer { + context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + } + return context +} + +func (ctx *ControllerContext) Start() { + go ctx.IngressInformer.Run(ctx.StopCh) + go ctx.ServiceInformer.Run(ctx.StopCh) + go ctx.PodInformer.Run(ctx.StopCh) + go ctx.NodeInformer.Run(ctx.StopCh) + if ctx.EndpointInformer != nil { + go ctx.EndpointInformer.Run(ctx.StopCh) + } +} + type AppProtocol string // Namer handles centralized naming for the cluster.