Only update service annotations if it contains named ports
This commit is contained in:
parent
4d25306b52
commit
96a66aa6fa
2 changed files with 35 additions and 80 deletions
|
@ -57,12 +57,15 @@ var (
|
|||
|
||||
type namedPortMapping map[string]string
|
||||
|
||||
// getPort returns the port defined in a named port
|
||||
func (npm namedPortMapping) getPort(name string) (string, bool) {
|
||||
val, ok := npm.getMappings()[name]
|
||||
val, ok := npm.getPortMappings()[name]
|
||||
return val, ok
|
||||
}
|
||||
|
||||
func (npm namedPortMapping) getMappings() map[string]string {
|
||||
// getPortMappings returns the map containing the
|
||||
// mapping of named port names and the port number
|
||||
func (npm namedPortMapping) getPortMappings() map[string]string {
|
||||
data := npm[namedPortAnnotation]
|
||||
var mapping map[string]string
|
||||
if data == "" {
|
||||
|
@ -100,10 +103,6 @@ type loadBalancerController struct {
|
|||
// this avoids a sync execution in the ResourceEventHandlerFuncs
|
||||
ingQueue *taskQueue
|
||||
|
||||
// used to update the annotation that matches a service using one or
|
||||
// more named ports to an endpoint port
|
||||
svcEpQueue *taskQueue
|
||||
|
||||
// stopLock is used to enforce only a single call to Stop is active.
|
||||
// Needed because we allow stopping through an http endpoint and
|
||||
// allowing concurrent stoppers leads to stack traces.
|
||||
|
@ -136,14 +135,12 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
|
||||
lbc.syncQueue = NewTaskQueue(lbc.sync)
|
||||
lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus)
|
||||
lbc.svcEpQueue = NewTaskQueue(lbc.updateEpNamedPorts)
|
||||
|
||||
ingEventHandler := framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
|
||||
lbc.ingQueue.enqueue(obj)
|
||||
lbc.svcEpQueue.enqueue(obj)
|
||||
lbc.syncQueue.enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
|
@ -156,7 +153,6 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
upIng := cur.(*extensions.Ingress)
|
||||
lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name))
|
||||
lbc.ingQueue.enqueue(cur)
|
||||
lbc.svcEpQueue.enqueue(cur)
|
||||
lbc.syncQueue.enqueue(cur)
|
||||
}
|
||||
},
|
||||
|
@ -252,84 +248,26 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config
|
|||
return lbc.client.ConfigMaps(ns).Get(name)
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
||||
if !lbc.controllersInSync() {
|
||||
time.Sleep(podStoreSyncedPollPeriod)
|
||||
lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("checking if service %v uses named ports to update annotation %v", key, namedPortAnnotation)
|
||||
|
||||
ingObj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Warningf("error getting service %v: %v", key, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !ingExists {
|
||||
glog.Warningf("service %v not found", key)
|
||||
return
|
||||
}
|
||||
|
||||
ing := ingObj.(*extensions.Ingress)
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
if rule.IngressRuleValue.HTTP == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
||||
if err != nil {
|
||||
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v does no exists", svcKey)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
if svc.Spec.Selector == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// check to avoid a call to checkSvcForUpdate if the port is not a string
|
||||
_, err = strconv.Atoi(path.Backend.ServicePort.StrVal)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = lbc.checkSvcForUpdate(svc)
|
||||
if err != nil {
|
||||
lbc.svcEpQueue.requeue(key, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkSvcForUpdate verifies if one of the running pods for a service contains
|
||||
// named port. If the annotation in the service does not exists or is not equals
|
||||
// to the port mapping obtained from the pod the service must be updated to reflect
|
||||
// the current state
|
||||
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
||||
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[string]string, error) {
|
||||
// get the pods associated with the service
|
||||
// TODO: switch this to a watch
|
||||
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{
|
||||
LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(),
|
||||
})
|
||||
|
||||
namedPorts := map[string]string{}
|
||||
if err != nil {
|
||||
return fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
return namedPorts, fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
}
|
||||
|
||||
if len(pods.Items) == 0 {
|
||||
return nil
|
||||
return namedPorts, nil
|
||||
}
|
||||
|
||||
namedPorts := map[string]string{}
|
||||
// we need to check only one pod searching for named ports
|
||||
pod := &pods.Items[0]
|
||||
glog.V(4).Infof("checking pod %v/%v for named port information", pod.Namespace, pod.Name)
|
||||
|
@ -362,7 +300,7 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
|||
|
||||
newSvc, err := lbc.client.Services(svc.Namespace).Get(svc.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
return namedPorts, fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
}
|
||||
|
||||
if newSvc.ObjectMeta.Annotations == nil {
|
||||
|
@ -371,13 +309,15 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
|||
|
||||
newSvc.ObjectMeta.Annotations[namedPortAnnotation] = string(data)
|
||||
glog.Infof("updating service %v with new named port mappings", svc.Name)
|
||||
_, err = lbc.client.Services(svc.Namespace).Update(svc)
|
||||
_, err = lbc.client.Services(svc.Namespace).Update(newSvc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
return namedPorts, fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
}
|
||||
|
||||
return newSvc.ObjectMeta.Annotations, nil
|
||||
}
|
||||
|
||||
return nil
|
||||
return namedPorts, nil
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) sync(key string) {
|
||||
|
@ -889,14 +829,30 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
|
|||
targetPort = epPort.Port
|
||||
}
|
||||
case intstr.String:
|
||||
if val, ok := namedPortMapping(s.ObjectMeta.Annotations).getPort(servicePort.StrVal); ok {
|
||||
namedPorts := s.ObjectMeta.Annotations
|
||||
val, ok := namedPortMapping(namedPorts).getPort(servicePort.StrVal)
|
||||
if ok {
|
||||
port, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
glog.Warningf("%v is not valid as a port", val)
|
||||
continue
|
||||
}
|
||||
|
||||
if epPort.Protocol == proto {
|
||||
targetPort = port
|
||||
} else {
|
||||
newnp, err := lbc.checkSvcForUpdate(s)
|
||||
if err != nil {
|
||||
glog.Warningf("error mapping service ports: %v", err)
|
||||
continue
|
||||
}
|
||||
val, ok := namedPortMapping(newnp).getPort(servicePort.StrVal)
|
||||
if ok {
|
||||
port, err := strconv.Atoi(val)
|
||||
if err != nil {
|
||||
glog.Warningf("%v is not valid as a port", val)
|
||||
continue
|
||||
}
|
||||
|
||||
targetPort = port
|
||||
}
|
||||
}
|
||||
|
@ -988,7 +944,6 @@ func (lbc *loadBalancerController) Run() {
|
|||
|
||||
go lbc.syncQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.ingQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.svcEpQueue.run(time.Second, lbc.stopCh)
|
||||
|
||||
<-lbc.stopCh
|
||||
glog.Infof("shutting down NGINX loadbalancer controller")
|
||||
|
|
|
@ -40,7 +40,7 @@ spec:
|
|||
- containerPort: 80
|
||||
hostPort: 80
|
||||
- containerPort: 443
|
||||
hostPort: 4430
|
||||
hostPort: 443
|
||||
args:
|
||||
- /nginx-ingress-controller
|
||||
- --default-backend-service=default/default-http-backend
|
||||
|
|
Loading…
Reference in a new issue