From 16b4af504bbea99fd0475f19cd8beddd10157b26 Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Thu, 14 Apr 2016 20:42:37 -0300 Subject: [PATCH] Fix issues with named ports --- controllers/nginx/controller.go | 148 +++++++++++++++++++++++++++++--- 1 file changed, 138 insertions(+), 10 deletions(-) diff --git a/controllers/nginx/controller.go b/controllers/nginx/controller.go index 522b02937..52f64dcd6 100644 --- a/controllers/nginx/controller.go +++ b/controllers/nginx/controller.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "encoding/json" "fmt" "reflect" "sort" @@ -28,11 +29,13 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + podutil "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/watch" @@ -41,14 +44,35 @@ import ( ) const ( - defUpstreamName = "upstream-default-backend" - defServerName = "_" + defUpstreamName = "upstream-default-backend" + defServerName = "_" + namedPortAnnotation = "kubernetes.io/ingress-named-ports" ) var ( keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc ) +type namedPortMapping map[string]string + +func (npm namedPortMapping) getPort(name string) (string, bool) { + val, ok := npm.getMappings()[name] + return val, ok +} + +func (npm namedPortMapping) getMappings() map[string]string { + data := npm[namedPortAnnotation] + var mapping map[string]string + if data == "" { + return mapping + } + if err := json.Unmarshal([]byte(data), &mapping); err != nil { + glog.Errorf("unexpected error reading annotations: %v", err) + } + + return mapping +} + // loadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer type loadBalancerController struct { @@ -74,6 +98,10 @@ type loadBalancerController struct { // this avoids a sync execution in the ResourceEventHandlerFuncs ingQueue *taskQueue + // used to update the annotation that matches a service using one or + // more named ports to an endpoint port + svcEpQueue *taskQueue + // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and // allowing concurrent stoppers leads to stack traces. @@ -104,6 +132,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura lbc.syncQueue = NewTaskQueue(lbc.sync) lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus) + lbc.svcEpQueue = NewTaskQueue(lbc.updateEpNamedPorts) ingEventHandler := framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -141,6 +170,17 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura }, } + svcEventHandler := framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + lbc.svcEpQueue.enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + lbc.svcEpQueue.enqueue(cur) + } + }, + } + lbc.ingLister.Store, lbc.ingController = framework.NewInformer( &cache.ListWatch{ ListFunc: ingressListFunc(lbc.client, namespace), @@ -160,7 +200,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura ListFunc: serviceListFunc(lbc.client, namespace), WatchFunc: serviceWatchFunc(lbc.client, namespace), }, - &api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{}) + &api.Service{}, resyncPeriod, svcEventHandler) return &lbc, nil } @@ -217,6 +257,75 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config return lbc.client.ConfigMaps(ns).Get(name) } +func (lbc *loadBalancerController) updateEpNamedPorts(key string) { + if !lbc.controllersInSync() { + lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) + return + } + + svcObj, svcExists, err := lbc.svcLister.GetByKey(key) + if err != nil { + glog.Warningf("error getting service %v: %v", key, err) + return + } + + if !svcExists { + glog.Warningf("service %v not found", key) + return + } + + svc := svcObj.(*api.Service) + if svc.Spec.Selector == nil { + return + } + + pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{ + LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(), + }) + if err != nil { + glog.Errorf("error searching service pods %q: %v", key, err) + return + } + + namedPorts := map[string]string{} + + for i := range pods.Items { + pod := &pods.Items[i] + + for i := range svc.Spec.Ports { + servicePort := &svc.Spec.Ports[i] + + _, err := strconv.Atoi(servicePort.TargetPort.StrVal) + if err != nil { + portNum, err := podutil.FindPort(pod, servicePort) + if err != nil { + glog.V(4).Infof("Failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err) + continue + } + + if servicePort.TargetPort.StrVal == "" { + continue + } + + namedPorts[servicePort.TargetPort.StrVal] = fmt.Sprintf("%v", portNum) + } + } + } + + if !reflect.DeepEqual(svc.ObjectMeta.Annotations, namedPorts) { + data, _ := json.Marshal(namedPorts) + if svc.ObjectMeta.Annotations == nil { + svc.ObjectMeta.Annotations = map[string]string{} + } + svc.ObjectMeta.Annotations[namedPortAnnotation] = string(data) + glog.Infof("updating service %v with new named port mappings", svc.Name) + _, err := lbc.client.Services(svc.Namespace).Update(svc) + if err != nil { + glog.Errorf("Error syncing service %q: %v", key, err) + } + } +} + func (lbc *loadBalancerController) sync(key string) { if !lbc.controllersInSync() { lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) @@ -373,7 +482,12 @@ func (lbc *loadBalancerController) getServices(data map[string]string, proto api var endps []nginx.UpstreamServer targetPort, err := strconv.Atoi(svcPort[1]) if err != nil { - endps = lbc.getEndpoints(svc, intstr.FromString(svcPort[1]), proto) + for _, sp := range svc.Spec.Ports { + if sp.Name == svcPort[1] { + endps = lbc.getEndpoints(svc, sp.TargetPort, proto) + break + } + } } else { // we need to use the TargetPort (where the endpoints are running) for _, sp := range svc.Spec.Ports { @@ -461,7 +575,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng for _, path := range rule.HTTP.Paths { - upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue()) + upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal) ups := upstreams[upsName] svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) @@ -479,8 +593,13 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng svc := svcObj.(*api.Service) for _, servicePort := range svc.Spec.Ports { - if servicePort.Port == path.Backend.ServicePort.IntValue() { - endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP) + port := servicePort.TargetPort + if servicePort.Name != "" { + port = intstr.FromString(servicePort.Name) + } + + if port == path.Backend.ServicePort { + endps := lbc.getEndpoints(svc, port, api.ProtocolTCP) if len(endps) == 0 { glog.Warningf("service %v does no have any active endpoints", svcKey) } @@ -546,7 +665,7 @@ func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[strin } for _, path := range rule.HTTP.Paths { - name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue()) + name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal) if _, ok := upstreams[name]; !ok { upstreams[name] = nginx.NewUpstream(name) } @@ -666,8 +785,16 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints targetPort = epPort.Port } case intstr.String: - if epPort.Name == servicePort.StrVal { - targetPort = epPort.Port + if val, ok := namedPortMapping(s.ObjectMeta.Annotations).getPort(servicePort.StrVal); ok { + port, err := strconv.Atoi(val) + if err != nil { + glog.Warningf("%v is not valid as a port", val) + continue + } + + if epPort.Protocol == proto { + targetPort = port + } } } @@ -754,6 +881,7 @@ func (lbc *loadBalancerController) Run() { go lbc.syncQueue.run(time.Second, lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh) + go lbc.svcEpQueue.run(time.Second, lbc.stopCh) <-lbc.stopCh glog.Infof("shutting down NGINX loadbalancer controller")