From 2632fe566baff7f844fbcc0eba6b380248daabc3 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Wed, 30 Mar 2016 20:12:37 -0300 Subject: [PATCH] Update Ingress status information in nginx controller --- controllers/nginx/controller.go | 110 +++++++++++++++++++++++++++++++- controllers/nginx/main.go | 22 ++++++- controllers/nginx/utils.go | 21 ++++++ 3 files changed, 148 insertions(+), 5 deletions(-) diff --git a/controllers/nginx/controller.go b/controllers/nginx/controller.go index 8f09eafea..08bd13776 100644 --- a/controllers/nginx/controller.go +++ b/controllers/nginx/controller.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" @@ -64,6 +65,8 @@ type loadBalancerController struct { tcpConfigMap string udpConfigMap string + recorder record.EventRecorder + syncQueue *taskQueue // stopLock is used to enforce only a single call to Stop is active. @@ -77,6 +80,11 @@ type loadBalancerController struct { // newLoadBalancerController creates a controller for nginx loadbalancer func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc, namespace, nxgConfigMapName, tcpConfigMapName, udpConfigMapName string, lbRuntimeInfo *lbInfo) (*loadBalancerController, error) { + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + lbc := loadBalancerController{ client: kubeClient, stopCh: make(chan struct{}), @@ -86,10 +94,33 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura tcpConfigMap: tcpConfigMapName, udpConfigMap: udpConfigMapName, defaultSvc: defaultSvc, + recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "loadbalancer-controller"}), } lbc.syncQueue = NewTaskQueue(lbc.sync) + ingEventHandler := framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*extensions.Ingress) + lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) + lbc.updateIngressStatus(addIng) + lbc.syncQueue.enqueue(obj) + }, + DeleteFunc: func(obj interface{}) { + upIng := obj.(*extensions.Ingress) + lbc.recorder.Eventf(upIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name)) + lbc.syncQueue.enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + upIng := cur.(*extensions.Ingress) + lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name)) + lbc.updateIngressStatus(upIng) + lbc.syncQueue.enqueue(cur) + } + }, + } + eventHandler := framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { lbc.syncQueue.enqueue(obj) @@ -109,7 +140,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura ListFunc: ingressListFunc(lbc.client, namespace), WatchFunc: ingressWatchFunc(lbc.client, namespace), }, - &extensions.Ingress{}, resyncPeriod, eventHandler) + &extensions.Ingress{}, resyncPeriod, ingEventHandler) lbc.endpLister.Store, lbc.endpController = framework.NewInformer( &cache.ListWatch{ @@ -206,6 +237,39 @@ func (lbc *loadBalancerController) sync(key string) { }) } +func (lbc *loadBalancerController) updateIngressStatus(ing *extensions.Ingress) { + ingClient := lbc.client.Extensions().Ingress(ing.Namespace) + currIng, err := ingClient.Get(ing.Name) + if err != nil { + glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err) + return + } + + lbIPs := ing.Status.LoadBalancer.Ingress + if len(lbIPs) > 0 && !lbc.isStatusIPDefined(lbIPs) { + glog.Infof("Updating loadbalancer %v/%v with IP %v", ing.Namespace, ing.Name, lbc.lbInfo.Address) + currIng.Status.LoadBalancer.Ingress = append(currIng.Status.LoadBalancer.Ingress, api.LoadBalancerIngress{ + IP: lbc.lbInfo.Address, + }) + if _, err := ingClient.UpdateStatus(currIng); err != nil { + lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err) + return + } + + lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.lbInfo.Address) + } +} + +func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool { + for _, lbing := range lbings { + if lbing.IP == lbc.lbInfo.Address { + return true + } + } + + return false +} + func (lbc *loadBalancerController) getTCPServices() []*nginx.Location { if lbc.tcpConfigMap == "" { // no configmap for TCP services @@ -571,17 +635,59 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints } // Stop stops the loadbalancer controller. -func (lbc *loadBalancerController) Stop() { +func (lbc *loadBalancerController) Stop() error { // Stop is invoked from the http endpoint. lbc.stopLock.Lock() defer lbc.stopLock.Unlock() // Only try draining the workqueue if we haven't already. if !lbc.shutdown { + + lbc.removeFromIngress() + close(lbc.stopCh) glog.Infof("shutting down controller queues") lbc.shutdown = true lbc.syncQueue.shutdown() + + return nil + } + + return fmt.Errorf("shutdown already in progress") +} + +func (lbc *loadBalancerController) removeFromIngress() { + ings := lbc.ingLister.Store.List() + glog.Infof("updating %v Ingress rule/s", len(ings)) + for _, cur := range ings { + ing := cur.(*extensions.Ingress) + + ingClient := lbc.client.Extensions().Ingress(ing.Namespace) + currIng, err := ingClient.Get(ing.Name) + if err != nil { + glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err) + continue + } + + lbIPs := ing.Status.LoadBalancer.Ingress + if len(lbIPs) > 0 && lbc.isStatusIPDefined(lbIPs) { + glog.Infof("Updating loadbalancer %v/%v. Removing IP %v", ing.Namespace, ing.Name, lbc.lbInfo.Address) + + for idx, lbStatus := range currIng.Status.LoadBalancer.Ingress { + if lbStatus.IP == lbc.lbInfo.Address { + currIng.Status.LoadBalancer.Ingress = append(currIng.Status.LoadBalancer.Ingress[:idx], + currIng.Status.LoadBalancer.Ingress[idx+1:]...) + break + } + } + + if _, err := ingClient.UpdateStatus(currIng); err != nil { + lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err) + continue + } + + lbc.recorder.Eventf(currIng, api.EventTypeNormal, "DELETE", "ip: %v", lbc.lbInfo.Address) + } } } diff --git a/controllers/nginx/main.go b/controllers/nginx/main.go index 0511f49e5..60d94dc65 100644 --- a/controllers/nginx/main.go +++ b/controllers/nginx/main.go @@ -22,6 +22,8 @@ import ( "net/http" "net/http/pprof" "os" + "os/signal" + "syscall" "time" "github.com/golang/glog" @@ -32,7 +34,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/healthz" - "k8s.io/kubernetes/pkg/runtime" ) const ( @@ -111,6 +112,7 @@ func main() { } go registerHandlers(lbc) + go handleSigterm(lbc) lbc.Run() @@ -122,11 +124,10 @@ func main() { // lbInfo contains runtime information about the pod type lbInfo struct { - ObjectName string - DeployType runtime.Object Podname string PodIP string PodNamespace string + Address string } func registerHandlers(lbc *loadBalancerController) { @@ -149,3 +150,18 @@ func registerHandlers(lbc *loadBalancerController) { } glog.Fatal(server.ListenAndServe()) } + +func handleSigterm(lbc *loadBalancerController) { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGTERM) + <-signalChan + glog.Infof("Received SIGTERM, shutting down") + + exitCode := 0 + if err := lbc.Stop(); err != nil { + glog.Infof("Error during shutdown %v", err) + exitCode = 1 + } + glog.Infof("Exiting with %v", exitCode) + os.Exit(exitCode) +} diff --git a/controllers/nginx/utils.go b/controllers/nginx/utils.go index 122a3d810..eaf5b7423 100644 --- a/controllers/nginx/utils.go +++ b/controllers/nginx/utils.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/wait" @@ -108,10 +109,30 @@ func getLBDetails(kubeClient *unversioned.Client) (*lbInfo, error) { return nil, fmt.Errorf("Unable to get POD information") } + node, err := kubeClient.Nodes().Get(pod.Spec.NodeName) + if err != nil { + return nil, err + } + + var externalIP string + for _, address := range node.Status.Addresses { + if address.Type == api.NodeExternalIP { + if address.Address != "" { + externalIP = address.Address + break + } + } + + if externalIP == "" && address.Type == api.NodeLegacyHostIP { + externalIP = address.Address + } + } + return &lbInfo{ PodIP: podIP, Podname: podName, PodNamespace: podNs, + Address: externalIP, }, nil }