Use Ingress creation and update events instead services to reduce pod queries
This commit is contained in:
parent
107bf1837b
commit
996e19cdb8
1 changed files with 49 additions and 27 deletions
|
@ -116,7 +116,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
||||
eventBroadcaster.StartRecordingToSink(kubeClient.Events(namespace))
|
||||
|
||||
lbc := loadBalancerController{
|
||||
client: kubeClient,
|
||||
|
@ -139,6 +139,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
addIng := obj.(*extensions.Ingress)
|
||||
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
|
||||
lbc.ingQueue.enqueue(obj)
|
||||
lbc.svcEpQueue.enqueue(obj)
|
||||
lbc.syncQueue.enqueue(obj)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
|
@ -151,6 +152,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
upIng := cur.(*extensions.Ingress)
|
||||
lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name))
|
||||
lbc.ingQueue.enqueue(cur)
|
||||
lbc.svcEpQueue.enqueue(cur)
|
||||
lbc.syncQueue.enqueue(cur)
|
||||
}
|
||||
},
|
||||
|
@ -170,17 +172,6 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
},
|
||||
}
|
||||
|
||||
svcEventHandler := framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
lbc.svcEpQueue.enqueue(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
lbc.svcEpQueue.enqueue(cur)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: ingressListFunc(lbc.client, namespace),
|
||||
|
@ -200,7 +191,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
ListFunc: serviceListFunc(lbc.client, namespace),
|
||||
WatchFunc: serviceWatchFunc(lbc.client, namespace),
|
||||
},
|
||||
&api.Service{}, resyncPeriod, svcEventHandler)
|
||||
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
|
||||
|
||||
return &lbc, nil
|
||||
}
|
||||
|
@ -263,27 +254,54 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
return
|
||||
}
|
||||
|
||||
svcObj, svcExists, err := lbc.svcLister.GetByKey(key)
|
||||
glog.V(4).Infof("checking if service %v uses named ports to update annotation %v", key, namedPortAnnotation)
|
||||
|
||||
ingObj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Warningf("error getting service %v: %v", key, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
if !ingExists {
|
||||
glog.Warningf("service %v not found", key)
|
||||
return
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
if svc.Spec.Selector == nil {
|
||||
return
|
||||
}
|
||||
ing := ingObj.(*extensions.Ingress)
|
||||
for _, rule := range ing.Spec.Rules {
|
||||
if rule.IngressRuleValue.HTTP == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
||||
if err != nil {
|
||||
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v does no exists", svcKey)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
if svc.Spec.Selector == nil {
|
||||
return
|
||||
}
|
||||
|
||||
lbc.checkSvcForUpdate(svc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) {
|
||||
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{
|
||||
LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(),
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("error searching service pods %q: %v", key, err)
|
||||
glog.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -291,7 +309,7 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
|
||||
for i := range pods.Items {
|
||||
pod := &pods.Items[i]
|
||||
|
||||
glog.V(4).Infof("checking pod %v/%v for named port information", pod.Namespace, pod.Name)
|
||||
for i := range svc.Spec.Ports {
|
||||
servicePort := &svc.Spec.Ports[i]
|
||||
|
||||
|
@ -299,7 +317,7 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
if err != nil {
|
||||
portNum, err := podutil.FindPort(pod, servicePort)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err)
|
||||
glog.V(4).Infof("failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -312,16 +330,18 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(svc.ObjectMeta.Annotations, namedPorts) {
|
||||
if svc.ObjectMeta.Annotations == nil {
|
||||
svc.ObjectMeta.Annotations = map[string]string{}
|
||||
}
|
||||
|
||||
curNamedPort := svc.ObjectMeta.Annotations[namedPortAnnotation]
|
||||
if !reflect.DeepEqual(curNamedPort, namedPorts) {
|
||||
data, _ := json.Marshal(namedPorts)
|
||||
if svc.ObjectMeta.Annotations == nil {
|
||||
svc.ObjectMeta.Annotations = map[string]string{}
|
||||
}
|
||||
svc.ObjectMeta.Annotations[namedPortAnnotation] = string(data)
|
||||
glog.Infof("updating service %v with new named port mappings", svc.Name)
|
||||
_, err := lbc.client.Services(svc.Namespace).Update(svc)
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing service %q: %v", key, err)
|
||||
glog.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -879,6 +899,8 @@ func (lbc *loadBalancerController) Run() {
|
|||
go lbc.endpController.Run(lbc.stopCh)
|
||||
go lbc.svcController.Run(lbc.stopCh)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
go lbc.syncQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.ingQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.svcEpQueue.run(time.Second, lbc.stopCh)
|
||||
|
|
Loading…
Reference in a new issue