diff --git a/controllers/nginx/controller.go b/controllers/nginx/controller.go index 52f64dcd6..0124f06fe 100644 --- a/controllers/nginx/controller.go +++ b/controllers/nginx/controller.go @@ -116,7 +116,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + eventBroadcaster.StartRecordingToSink(kubeClient.Events(namespace)) lbc := loadBalancerController{ client: kubeClient, @@ -139,6 +139,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura addIng := obj.(*extensions.Ingress) lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) lbc.ingQueue.enqueue(obj) + lbc.svcEpQueue.enqueue(obj) lbc.syncQueue.enqueue(obj) }, DeleteFunc: func(obj interface{}) { @@ -151,6 +152,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura upIng := cur.(*extensions.Ingress) lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name)) lbc.ingQueue.enqueue(cur) + lbc.svcEpQueue.enqueue(cur) lbc.syncQueue.enqueue(cur) } }, @@ -170,17 +172,6 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura }, } - svcEventHandler := framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - lbc.svcEpQueue.enqueue(obj) - }, - UpdateFunc: func(old, cur interface{}) { - if !reflect.DeepEqual(old, cur) { - lbc.svcEpQueue.enqueue(cur) - } - }, - } - lbc.ingLister.Store, lbc.ingController = framework.NewInformer( &cache.ListWatch{ ListFunc: ingressListFunc(lbc.client, namespace), @@ -200,7 +191,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura ListFunc: serviceListFunc(lbc.client, namespace), WatchFunc: serviceWatchFunc(lbc.client, namespace), }, - &api.Service{}, resyncPeriod, svcEventHandler) + &api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{}) return &lbc, nil } @@ -263,27 +254,54 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) { return } - svcObj, svcExists, err := lbc.svcLister.GetByKey(key) + glog.V(4).Infof("checking if service %v uses named ports to update annotation %v", key, namedPortAnnotation) + + ingObj, ingExists, err := lbc.ingLister.Store.GetByKey(key) if err != nil { glog.Warningf("error getting service %v: %v", key, err) return } - if !svcExists { + if !ingExists { glog.Warningf("service %v not found", key) return } - svc := svcObj.(*api.Service) - if svc.Spec.Selector == nil { - return - } + ing := ingObj.(*extensions.Ingress) + for _, rule := range ing.Spec.Rules { + if rule.IngressRuleValue.HTTP == nil { + continue + } + for _, path := range rule.HTTP.Paths { + svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) + svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey) + if err != nil { + glog.Infof("error getting service %v from the cache: %v", svcKey, err) + continue + } + + if !svcExists { + glog.Warningf("service %v does no exists", svcKey) + continue + } + + svc := svcObj.(*api.Service) + if svc.Spec.Selector == nil { + return + } + + lbc.checkSvcForUpdate(svc) + } + } +} + +func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) { pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{ LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(), }) if err != nil { - glog.Errorf("error searching service pods %q: %v", key, err) + glog.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err) return } @@ -291,7 +309,7 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) { for i := range pods.Items { pod := &pods.Items[i] - + glog.V(4).Infof("checking pod %v/%v for named port information", pod.Namespace, pod.Name) for i := range svc.Spec.Ports { servicePort := &svc.Spec.Ports[i] @@ -299,7 +317,7 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) { if err != nil { portNum, err := podutil.FindPort(pod, servicePort) if err != nil { - glog.V(4).Infof("Failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err) + glog.V(4).Infof("failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err) continue } @@ -312,16 +330,18 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) { } } - if !reflect.DeepEqual(svc.ObjectMeta.Annotations, namedPorts) { + if svc.ObjectMeta.Annotations == nil { + svc.ObjectMeta.Annotations = map[string]string{} + } + + curNamedPort := svc.ObjectMeta.Annotations[namedPortAnnotation] + if !reflect.DeepEqual(curNamedPort, namedPorts) { data, _ := json.Marshal(namedPorts) - if svc.ObjectMeta.Annotations == nil { - svc.ObjectMeta.Annotations = map[string]string{} - } svc.ObjectMeta.Annotations[namedPortAnnotation] = string(data) glog.Infof("updating service %v with new named port mappings", svc.Name) _, err := lbc.client.Services(svc.Namespace).Update(svc) if err != nil { - glog.Errorf("Error syncing service %q: %v", key, err) + glog.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err) } } } @@ -879,6 +899,8 @@ func (lbc *loadBalancerController) Run() { go lbc.endpController.Run(lbc.stopCh) go lbc.svcController.Run(lbc.stopCh) + time.Sleep(1 * time.Second) + go lbc.syncQueue.run(time.Second, lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh) go lbc.svcEpQueue.run(time.Second, lbc.stopCh)