Update Ingress status information in nginx controller

This commit is contained in:
Manuel de Brito Fontes 2016-03-30 20:12:37 -03:00
parent 7abc7a77f6
commit 2632fe566b
3 changed files with 148 additions and 5 deletions

View file

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
@ -64,6 +65,8 @@ type loadBalancerController struct {
tcpConfigMap string
udpConfigMap string
recorder record.EventRecorder
syncQueue *taskQueue
// stopLock is used to enforce only a single call to Stop is active.
@ -77,6 +80,11 @@ type loadBalancerController struct {
// newLoadBalancerController creates a controller for nginx loadbalancer
func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc,
namespace, nxgConfigMapName, tcpConfigMapName, udpConfigMapName string, lbRuntimeInfo *lbInfo) (*loadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
lbc := loadBalancerController{
client: kubeClient,
stopCh: make(chan struct{}),
@ -86,10 +94,33 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
tcpConfigMap: tcpConfigMapName,
udpConfigMap: udpConfigMapName,
defaultSvc: defaultSvc,
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "loadbalancer-controller"}),
}
lbc.syncQueue = NewTaskQueue(lbc.sync)
ingEventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.updateIngressStatus(addIng)
lbc.syncQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
upIng := obj.(*extensions.Ingress)
lbc.recorder.Eventf(upIng, api.EventTypeNormal, "DELETE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name))
lbc.syncQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upIng := cur.(*extensions.Ingress)
lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name))
lbc.updateIngressStatus(upIng)
lbc.syncQueue.enqueue(cur)
}
},
}
eventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
lbc.syncQueue.enqueue(obj)
@ -109,7 +140,7 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
},
&extensions.Ingress{}, resyncPeriod, eventHandler)
&extensions.Ingress{}, resyncPeriod, ingEventHandler)
lbc.endpLister.Store, lbc.endpController = framework.NewInformer(
&cache.ListWatch{
@ -206,6 +237,39 @@ func (lbc *loadBalancerController) sync(key string) {
})
}
func (lbc *loadBalancerController) updateIngressStatus(ing *extensions.Ingress) {
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
currIng, err := ingClient.Get(ing.Name)
if err != nil {
glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
return
}
lbIPs := ing.Status.LoadBalancer.Ingress
if len(lbIPs) > 0 && !lbc.isStatusIPDefined(lbIPs) {
glog.Infof("Updating loadbalancer %v/%v with IP %v", ing.Namespace, ing.Name, lbc.lbInfo.Address)
currIng.Status.LoadBalancer.Ingress = append(currIng.Status.LoadBalancer.Ingress, api.LoadBalancerIngress{
IP: lbc.lbInfo.Address,
})
if _, err := ingClient.UpdateStatus(currIng); err != nil {
lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err)
return
}
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", lbc.lbInfo.Address)
}
}
func (lbc *loadBalancerController) isStatusIPDefined(lbings []api.LoadBalancerIngress) bool {
for _, lbing := range lbings {
if lbing.IP == lbc.lbInfo.Address {
return true
}
}
return false
}
func (lbc *loadBalancerController) getTCPServices() []*nginx.Location {
if lbc.tcpConfigMap == "" {
// no configmap for TCP services
@ -571,17 +635,59 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
}
// Stop stops the loadbalancer controller.
func (lbc *loadBalancerController) Stop() {
func (lbc *loadBalancerController) Stop() error {
// Stop is invoked from the http endpoint.
lbc.stopLock.Lock()
defer lbc.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !lbc.shutdown {
lbc.removeFromIngress()
close(lbc.stopCh)
glog.Infof("shutting down controller queues")
lbc.shutdown = true
lbc.syncQueue.shutdown()
return nil
}
return fmt.Errorf("shutdown already in progress")
}
func (lbc *loadBalancerController) removeFromIngress() {
ings := lbc.ingLister.Store.List()
glog.Infof("updating %v Ingress rule/s", len(ings))
for _, cur := range ings {
ing := cur.(*extensions.Ingress)
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
currIng, err := ingClient.Get(ing.Name)
if err != nil {
glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
continue
}
lbIPs := ing.Status.LoadBalancer.Ingress
if len(lbIPs) > 0 && lbc.isStatusIPDefined(lbIPs) {
glog.Infof("Updating loadbalancer %v/%v. Removing IP %v", ing.Namespace, ing.Name, lbc.lbInfo.Address)
for idx, lbStatus := range currIng.Status.LoadBalancer.Ingress {
if lbStatus.IP == lbc.lbInfo.Address {
currIng.Status.LoadBalancer.Ingress = append(currIng.Status.LoadBalancer.Ingress[:idx],
currIng.Status.LoadBalancer.Ingress[idx+1:]...)
break
}
}
if _, err := ingClient.UpdateStatus(currIng); err != nil {
lbc.recorder.Eventf(currIng, api.EventTypeWarning, "UPDATE", "error: %v", err)
continue
}
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "DELETE", "ip: %v", lbc.lbInfo.Address)
}
}
}

View file

@ -22,6 +22,8 @@ import (
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/golang/glog"
@ -32,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/runtime"
)
const (
@ -111,6 +112,7 @@ func main() {
}
go registerHandlers(lbc)
go handleSigterm(lbc)
lbc.Run()
@ -122,11 +124,10 @@ func main() {
// lbInfo contains runtime information about the pod
type lbInfo struct {
ObjectName string
DeployType runtime.Object
Podname string
PodIP string
PodNamespace string
Address string
}
func registerHandlers(lbc *loadBalancerController) {
@ -149,3 +150,18 @@ func registerHandlers(lbc *loadBalancerController) {
}
glog.Fatal(server.ListenAndServe())
}
func handleSigterm(lbc *loadBalancerController) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM)
<-signalChan
glog.Infof("Received SIGTERM, shutting down")
exitCode := 0
if err := lbc.Stop(); err != nil {
glog.Infof("Error during shutdown %v", err)
exitCode = 1
}
glog.Infof("Exiting with %v", exitCode)
os.Exit(exitCode)
}

View file

@ -24,6 +24,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/wait"
@ -108,10 +109,30 @@ func getLBDetails(kubeClient *unversioned.Client) (*lbInfo, error) {
return nil, fmt.Errorf("Unable to get POD information")
}
node, err := kubeClient.Nodes().Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}
var externalIP string
for _, address := range node.Status.Addresses {
if address.Type == api.NodeExternalIP {
if address.Address != "" {
externalIP = address.Address
break
}
}
if externalIP == "" && address.Type == api.NodeLegacyHostIP {
externalIP = address.Address
}
}
return &lbInfo{
PodIP: podIP,
Podname: podName,
PodNamespace: podNs,
Address: externalIP,
}, nil
}