Fix issues with named ports

This commit is contained in:
Manuel de Brito Fontes 2016-04-14 20:42:37 -03:00
parent f05eec6781
commit 16b4af504b

View file

@ -17,6 +17,7 @@ limitations under the License.
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
@ -28,11 +29,13 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
podutil "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
@ -43,12 +46,33 @@ import (
const ( const (
defUpstreamName = "upstream-default-backend" defUpstreamName = "upstream-default-backend"
defServerName = "_" defServerName = "_"
namedPortAnnotation = "kubernetes.io/ingress-named-ports"
) )
var ( var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
) )
type namedPortMapping map[string]string
func (npm namedPortMapping) getPort(name string) (string, bool) {
val, ok := npm.getMappings()[name]
return val, ok
}
func (npm namedPortMapping) getMappings() map[string]string {
data := npm[namedPortAnnotation]
var mapping map[string]string
if data == "" {
return mapping
}
if err := json.Unmarshal([]byte(data), &mapping); err != nil {
glog.Errorf("unexpected error reading annotations: %v", err)
}
return mapping
}
// loadBalancerController watches the kubernetes api and adds/removes services // loadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer // from the loadbalancer
type loadBalancerController struct { type loadBalancerController struct {
@ -74,6 +98,10 @@ type loadBalancerController struct {
// this avoids a sync execution in the ResourceEventHandlerFuncs // this avoids a sync execution in the ResourceEventHandlerFuncs
ingQueue *taskQueue ingQueue *taskQueue
// used to update the annotation that matches a service using one or
// more named ports to an endpoint port
svcEpQueue *taskQueue
// stopLock is used to enforce only a single call to Stop is active. // stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and // Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces. // allowing concurrent stoppers leads to stack traces.
@ -104,6 +132,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
lbc.syncQueue = NewTaskQueue(lbc.sync) lbc.syncQueue = NewTaskQueue(lbc.sync)
lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus) lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus)
lbc.svcEpQueue = NewTaskQueue(lbc.updateEpNamedPorts)
ingEventHandler := framework.ResourceEventHandlerFuncs{ ingEventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
@ -141,6 +170,17 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
}, },
} }
svcEventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
lbc.svcEpQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.svcEpQueue.enqueue(cur)
}
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer( lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace), ListFunc: ingressListFunc(lbc.client, namespace),
@ -160,7 +200,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
ListFunc: serviceListFunc(lbc.client, namespace), ListFunc: serviceListFunc(lbc.client, namespace),
WatchFunc: serviceWatchFunc(lbc.client, namespace), WatchFunc: serviceWatchFunc(lbc.client, namespace),
}, },
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{}) &api.Service{}, resyncPeriod, svcEventHandler)
return &lbc, nil return &lbc, nil
} }
@ -217,6 +257,75 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config
return lbc.client.ConfigMaps(ns).Get(name) return lbc.client.ConfigMaps(ns).Get(name)
} }
func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
if !lbc.controllersInSync() {
lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
return
}
svcObj, svcExists, err := lbc.svcLister.GetByKey(key)
if err != nil {
glog.Warningf("error getting service %v: %v", key, err)
return
}
if !svcExists {
glog.Warningf("service %v not found", key)
return
}
svc := svcObj.(*api.Service)
if svc.Spec.Selector == nil {
return
}
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{
LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(),
})
if err != nil {
glog.Errorf("error searching service pods %q: %v", key, err)
return
}
namedPorts := map[string]string{}
for i := range pods.Items {
pod := &pods.Items[i]
for i := range svc.Spec.Ports {
servicePort := &svc.Spec.Ports[i]
_, err := strconv.Atoi(servicePort.TargetPort.StrVal)
if err != nil {
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err)
continue
}
if servicePort.TargetPort.StrVal == "" {
continue
}
namedPorts[servicePort.TargetPort.StrVal] = fmt.Sprintf("%v", portNum)
}
}
}
if !reflect.DeepEqual(svc.ObjectMeta.Annotations, namedPorts) {
data, _ := json.Marshal(namedPorts)
if svc.ObjectMeta.Annotations == nil {
svc.ObjectMeta.Annotations = map[string]string{}
}
svc.ObjectMeta.Annotations[namedPortAnnotation] = string(data)
glog.Infof("updating service %v with new named port mappings", svc.Name)
_, err := lbc.client.Services(svc.Namespace).Update(svc)
if err != nil {
glog.Errorf("Error syncing service %q: %v", key, err)
}
}
}
func (lbc *loadBalancerController) sync(key string) { func (lbc *loadBalancerController) sync(key string) {
if !lbc.controllersInSync() { if !lbc.controllersInSync() {
lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
@ -373,7 +482,12 @@ func (lbc *loadBalancerController) getServices(data map[string]string, proto api
var endps []nginx.UpstreamServer var endps []nginx.UpstreamServer
targetPort, err := strconv.Atoi(svcPort[1]) targetPort, err := strconv.Atoi(svcPort[1])
if err != nil { if err != nil {
endps = lbc.getEndpoints(svc, intstr.FromString(svcPort[1]), proto) for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort[1] {
endps = lbc.getEndpoints(svc, sp.TargetPort, proto)
break
}
}
} else { } else {
// we need to use the TargetPort (where the endpoints are running) // we need to use the TargetPort (where the endpoints are running)
for _, sp := range svc.Spec.Ports { for _, sp := range svc.Spec.Ports {
@ -461,7 +575,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
for _, path := range rule.HTTP.Paths { for _, path := range rule.HTTP.Paths {
upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue()) upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal)
ups := upstreams[upsName] ups := upstreams[upsName]
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
@ -479,8 +593,13 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
svc := svcObj.(*api.Service) svc := svcObj.(*api.Service)
for _, servicePort := range svc.Spec.Ports { for _, servicePort := range svc.Spec.Ports {
if servicePort.Port == path.Backend.ServicePort.IntValue() { port := servicePort.TargetPort
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP) if servicePort.Name != "" {
port = intstr.FromString(servicePort.Name)
}
if port == path.Backend.ServicePort {
endps := lbc.getEndpoints(svc, port, api.ProtocolTCP)
if len(endps) == 0 { if len(endps) == 0 {
glog.Warningf("service %v does no have any active endpoints", svcKey) glog.Warningf("service %v does no have any active endpoints", svcKey)
} }
@ -546,7 +665,7 @@ func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[strin
} }
for _, path := range rule.HTTP.Paths { for _, path := range rule.HTTP.Paths {
name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.IntValue()) name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal)
if _, ok := upstreams[name]; !ok { if _, ok := upstreams[name]; !ok {
upstreams[name] = nginx.NewUpstream(name) upstreams[name] = nginx.NewUpstream(name)
} }
@ -666,8 +785,16 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
targetPort = epPort.Port targetPort = epPort.Port
} }
case intstr.String: case intstr.String:
if epPort.Name == servicePort.StrVal { if val, ok := namedPortMapping(s.ObjectMeta.Annotations).getPort(servicePort.StrVal); ok {
targetPort = epPort.Port port, err := strconv.Atoi(val)
if err != nil {
glog.Warningf("%v is not valid as a port", val)
continue
}
if epPort.Protocol == proto {
targetPort = port
}
} }
} }
@ -754,6 +881,7 @@ func (lbc *loadBalancerController) Run() {
go lbc.syncQueue.run(time.Second, lbc.stopCh) go lbc.syncQueue.run(time.Second, lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.svcEpQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh <-lbc.stopCh
glog.Infof("shutting down NGINX loadbalancer controller") glog.Infof("shutting down NGINX loadbalancer controller")