move ControllerContext to util and enable NEG in main

commit fb4cc1cbeb (parent b75a9b33f2)
Author: Minhan Xia
Date: 2017-10-02 16:52:02 -07:00
4 changed files with 56 additions and 36 deletions

controllers/gce/controller/controller.go

@@ -27,8 +27,6 @@ import (
     apiv1 "k8s.io/api/core/v1"
     extensions "k8s.io/api/extensions/v1beta1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    informerv1 "k8s.io/client-go/informers/core/v1"
-    informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
     "k8s.io/client-go/kubernetes"
     scheme "k8s.io/client-go/kubernetes/scheme"
     unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -37,6 +35,7 @@ import (
     "k8s.io/client-go/tools/record"
     "k8s.io/ingress/controllers/gce/loadbalancers"
+    "k8s.io/ingress/controllers/gce/utils"
 )

 var (
@@ -54,33 +53,6 @@ var (
     storeSyncPollPeriod = 5 * time.Second
 )

-// ControllerContext holds the informers and the stop channel shared among the controllers.
-type ControllerContext struct {
-    IngressInformer cache.SharedIndexInformer
-    ServiceInformer cache.SharedIndexInformer
-    PodInformer     cache.SharedIndexInformer
-    NodeInformer    cache.SharedIndexInformer
-    // StopCh is the stop channel shared among controllers.
-    StopCh chan struct{}
-}
-
-func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext {
-    return &ControllerContext{
-        IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
-        ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
-        PodInformer:     informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
-        NodeInformer:    informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
-        StopCh:          make(chan struct{}),
-    }
-}
-
-func (ctx *ControllerContext) Start() {
-    go ctx.IngressInformer.Run(ctx.StopCh)
-    go ctx.ServiceInformer.Run(ctx.StopCh)
-    go ctx.PodInformer.Run(ctx.StopCh)
-    go ctx.NodeInformer.Run(ctx.StopCh)
-}
-
 // LoadBalancerController watches the kubernetes api and adds/removes services
 // from the loadbalancer, via loadBalancerConfig.
 type LoadBalancerController struct {
@@ -119,7 +91,7 @@ type LoadBalancerController struct {
 // - clusterManager: A ClusterManager capable of creating all cloud resources
 //   required for L7 loadbalancing.
 // - resyncPeriod: Watchers relist from the Kubernetes API server this often.
-func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) {
+func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *utils.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
     eventBroadcaster := record.NewBroadcaster()
     eventBroadcaster.StartLogging(glog.Infof)
     eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{

controllers/gce/controller/controller_test.go

@@ -53,8 +53,8 @@ func defaultBackendName(clusterName string) string {
 // newLoadBalancerController creates a loadbalancer controller.
 func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
     kubeClient := fake.NewSimpleClientset()
-    ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second)
-    lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager)
+    ctx := utils.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
+    lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager, true)
     if err != nil {
         t.Fatalf("%v", err)
     }

controllers/gce/main.go

@@ -47,6 +47,7 @@ import (
     "k8s.io/ingress/controllers/gce/backends"
     "k8s.io/ingress/controllers/gce/controller"
     "k8s.io/ingress/controllers/gce/loadbalancers"
+    neg "k8s.io/ingress/controllers/gce/networkendpointgroup"
     "k8s.io/ingress/controllers/gce/storage"
     "k8s.io/ingress/controllers/gce/utils"
     "k8s.io/kubernetes/pkg/cloudprovider"
@@ -250,9 +251,10 @@ func main() {
     }

     var cloud *gce.GCECloud
+    var namer *utils.Namer
     if *inCluster || *useRealCloud {
         // Create cluster manager
-        namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName)
+        namer, err = newNamer(kubeClient, *clusterName, controller.DefaultFirewallName)
         if err != nil {
             glog.Fatalf("%v", err)
         }
@@ -287,10 +289,9 @@ func main() {
         clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
     }

-    ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod)
-
+    ctx := utils.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
     // Start loadbalancer controller
-    lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
+    lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
     if err != nil {
         glog.Fatalf("%v", err)
     }
@@ -299,6 +300,13 @@ func main() {
         glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
     }
     clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc})
+
+    // Start NEG controller
+    if cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) {
+        negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, *resyncPeriod)
+        go negController.Run(ctx.StopCh)
+    }

     go registerHandlers(lbc)
     go handleSigterm(lbc, *deleteAllOnQuit)
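
Note that the AlphaFeatureNetworkEndpointGroup gate is evaluated three times in main: for the context, the loadbalancer controller, and the NEG controller. The wiring above is equivalent to hoisting it into one negEnabled flag. A minimal sketch of that shape (a hypothetical cleanup for illustration, not part of this commit; it assumes cloud is non-nil, i.e. the in-cluster path):

    negEnabled := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)

    // Build the shared context and the loadbalancer controller from one flag value.
    ctx := utils.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, negEnabled)
    lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, negEnabled)

    // Start the NEG controller only when the alpha feature is on.
    if negEnabled {
        negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, *resyncPeriod)
        go negController.Run(ctx.StopCh)
    }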

controllers/gce/utils/utils.go

@@ -27,6 +27,11 @@ import (
     "github.com/golang/glog"
     compute "google.golang.org/api/compute/v1"
     "google.golang.org/api/googleapi"
+    informerv1 "k8s.io/client-go/informers/core/v1"
+    informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/cache"
+    "time"
 )

 const (
@@ -87,6 +92,41 @@ const (
     ProtocolHTTPS AppProtocol = "HTTPS"
 )

+// ControllerContext holds the informers and the stop channel shared among the controllers.
+type ControllerContext struct {
+    IngressInformer  cache.SharedIndexInformer
+    ServiceInformer  cache.SharedIndexInformer
+    PodInformer      cache.SharedIndexInformer
+    NodeInformer     cache.SharedIndexInformer
+    EndpointInformer cache.SharedIndexInformer
+    // StopCh is the stop channel shared among controllers.
+    StopCh chan struct{}
+}
+
+func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
+    context := &ControllerContext{
+        IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
+        ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
+        PodInformer:     informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
+        NodeInformer:    informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
+        StopCh:          make(chan struct{}),
+    }
+    if enableEndpointsInformer {
+        context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+    }
+    return context
+}
+
+func (ctx *ControllerContext) Start() {
+    go ctx.IngressInformer.Run(ctx.StopCh)
+    go ctx.ServiceInformer.Run(ctx.StopCh)
+    go ctx.PodInformer.Run(ctx.StopCh)
+    go ctx.NodeInformer.Run(ctx.StopCh)
+    if ctx.EndpointInformer != nil {
+        go ctx.EndpointInformer.Run(ctx.StopCh)
+    }
+}

 type AppProtocol string

 // Namer handles centralized naming for the cluster.
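
For reference, a minimal sketch of how a caller might drive the relocated ControllerContext end to end: build it, start the informers, and block until their caches sync before handing them to controllers. The fake clientset and the cache.WaitForCacheSync call are illustrative assumptions, not part of this commit:

    package main

    import (
        "time"

        meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
        "k8s.io/ingress/controllers/gce/utils"
    )

    func main() {
        kubeClient := fake.NewSimpleClientset()
        // Passing true builds the Endpoints informer, which the NEG controller consumes.
        ctx := utils.NewControllerContext(kubeClient, meta_v1.NamespaceAll, 30*time.Second, true)
        ctx.Start()

        synced := []cache.InformerSynced{
            ctx.IngressInformer.HasSynced,
            ctx.ServiceInformer.HasSynced,
            ctx.PodInformer.HasSynced,
            ctx.NodeInformer.HasSynced,
        }
        // EndpointInformer is nil unless the endpoints informer was enabled above.
        if ctx.EndpointInformer != nil {
            synced = append(synced, ctx.EndpointInformer.HasSynced)
        }
        if !cache.WaitForCacheSync(ctx.StopCh, synced...) {
            panic("informer caches never synced")
        }
    }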