From a87efce5c2c1ff4b10c8f6fd8a9064f10c8febba Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Sat, 27 Feb 2016 12:17:54 -0300 Subject: [PATCH] Allow nginx Ingress controller run as DaemonSet --- controllers/nginx-third-party/controller.go | 60 +++++++++++++++---- .../examples/as-daemonset.yaml | 47 +++++++++++++++ controllers/nginx-third-party/main.go | 7 ++- controllers/nginx-third-party/nginx/utils.go | 21 +++---- controllers/nginx-third-party/utils.go | 35 +++++++---- 5 files changed, 133 insertions(+), 37 deletions(-) create mode 100644 controllers/nginx-third-party/examples/as-daemonset.yaml diff --git a/controllers/nginx-third-party/controller.go b/controllers/nginx-third-party/controller.go index 860ac753b..1f2b32dec 100644 --- a/controllers/nginx-third-party/controller.go +++ b/controllers/nginx-third-party/controller.go @@ -157,14 +157,32 @@ func NewLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura lbc.configLister.Store, lbc.configController = framework.NewInformer( &cache.ListWatch{ ListFunc: func(api.ListOptions) (runtime.Object, error) { - rc, err := kubeClient.ReplicationControllers(lbInfo.RCNamespace).Get(lbInfo.RCName) - return &api.ReplicationControllerList{ - Items: []api.ReplicationController{*rc}, - }, err + switch lbInfo.DeployType.(type) { + case *api.ReplicationController: + rc, err := kubeClient.ReplicationControllers(lbInfo.PodNamespace).Get(lbInfo.ObjectName) + return &api.ReplicationControllerList{ + Items: []api.ReplicationController{*rc}, + }, err + case *extensions.DaemonSet: + ds, err := kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Get(lbInfo.ObjectName) + return &extensions.DaemonSetList{ + Items: []extensions.DaemonSet{*ds}, + }, err + default: + return nil, errInvalidKind + } }, WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.RCName}) - return kubeClient.ReplicationControllers(lbInfo.RCNamespace).Watch(options) + switch lbInfo.DeployType.(type) { + case *api.ReplicationController: + options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName}) + return kubeClient.ReplicationControllers(lbInfo.PodNamespace).Watch(options) + case *extensions.DaemonSet: + options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName}) + return kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Watch(options) + default: + return nil, errInvalidKind + } }, }, &api.ReplicationController{}, resyncPeriod, configHandlers) @@ -218,8 +236,15 @@ func (lbc *loadBalancerController) syncIngress(key string) { // syncConfig manages changes in nginx configuration. func (lbc *loadBalancerController) syncConfig(key string) { - // we only need to sync the nginx rc - if key != fmt.Sprintf("%v/%v", lbc.lbInfo.RCNamespace, lbc.lbInfo.RCName) { + glog.Infof("Syncing nginx configuration") + if !lbc.ingController.HasSynced() { + glog.Infof("deferring sync till endpoints controller has synced") + time.Sleep(100 * time.Millisecond) + } + + // we only need to sync nginx + if key != fmt.Sprintf("%v/%v", lbc.lbInfo.PodNamespace, lbc.lbInfo.ObjectName) { + glog.Warningf("skipping sync because the event is not related to a change in configuration") return } @@ -230,15 +255,24 @@ func (lbc *loadBalancerController) syncConfig(key string) { } if !configExists { - glog.Errorf("Configutation not found: %v", key) + glog.Errorf("Configuration not found: %v", key) return } glog.V(2).Infof("Syncing config %v", key) - rc := *obj.(*api.ReplicationController) - ngxCfgAnn, _ := annotations(rc.Annotations).getNginxConfig() - tcpSvcAnn, _ := annotations(rc.Annotations).getTcpServices() + var kindAnnotations map[string]string + switch obj.(type) { + case *api.ReplicationController: + rc := *obj.(*api.ReplicationController) + kindAnnotations = rc.Annotations + case *extensions.DaemonSet: + rc := *obj.(*extensions.DaemonSet) + kindAnnotations = rc.Annotations + } + + ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig() + tcpSvcAnn, _ := annotations(kindAnnotations).getTcpServices() ngxConfig, err := lbc.ngx.ReadConfig(ngxCfgAnn) if err != nil { @@ -321,7 +355,7 @@ func (lbc *loadBalancerController) Run() { go lbc.configQueue.run(time.Second, lbc.stopCh) // Initial nginx configuration. - lbc.syncConfig(lbc.lbInfo.RCName) + lbc.syncConfig(lbc.lbInfo.PodNamespace + "/" + lbc.lbInfo.ObjectName) time.Sleep(5 * time.Second) diff --git a/controllers/nginx-third-party/examples/as-daemonset.yaml b/controllers/nginx-third-party/examples/as-daemonset.yaml new file mode 100644 index 000000000..31c9dbb8f --- /dev/null +++ b/controllers/nginx-third-party/examples/as-daemonset.yaml @@ -0,0 +1,47 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: nginx-ingress-lb +spec: + template: + metadata: + labels: + name: nginx-ingress-lb + spec: + containers: + - image: gcr.io/google_containers/nginx-third-party::0.3 + name: nginx-ingress-lb + imagePullPolicy: Always + livenessProbe: + httpGet: + path: /healthz + port: 10249 + scheme: HTTP + initialDelaySeconds: 30 + timeoutSeconds: 5 + # use downward API + env: + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + ports: + - containerPort: 80 + hostPort: 80 + - containerPort: 443 + hostPort: 4444 + # we expose 8080 to access nginx stats in url /nginx-status + # this is optional + - containerPort: 8080 + hostPort: 8081 + args: + - /nginx-third-party-lb + - --default-backend-service=default/default-http-backend diff --git a/controllers/nginx-third-party/main.go b/controllers/nginx-third-party/main.go index b3925f623..a5cec4001 100644 --- a/controllers/nginx-third-party/main.go +++ b/controllers/nginx-third-party/main.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/runtime" ) const ( @@ -85,10 +86,10 @@ func main() { } } -// lbInfo contains runtime information about the pod and replication controller +// lbInfo contains runtime information about the pod type lbInfo struct { - RCNamespace string - RCName string + ObjectName string + DeployType runtime.Object Podname string PodIP string PodNamespace string diff --git a/controllers/nginx-third-party/nginx/utils.go b/controllers/nginx-third-party/nginx/utils.go index 4c06197ae..5bad3819b 100644 --- a/controllers/nginx-third-party/nginx/utils.go +++ b/controllers/nginx-third-party/nginx/utils.go @@ -95,18 +95,19 @@ func getDnsServers() []string { // ReadConfig obtains the configuration defined by the user or returns the default if it does not // exists or if is not a well formed json object -func (ngx *NginxManager) ReadConfig(data string) (cfg *nginxConfiguration, err error) { - err = json.Unmarshal([]byte(data), &cfg) - if err != nil { - glog.Errorf("Invalid json: %v", err) - cfg = &nginxConfiguration{} - err = fmt.Errorf("Invalid custom nginx configuration: %v", err) - return +func (ngx *NginxManager) ReadConfig(data string) (*nginxConfiguration, error) { + if data == "" { + return newDefaultNginxCfg(), nil } - cfg = newDefaultNginxCfg() - err = fmt.Errorf("No custom nginx configuration. Using defaults") - return + cfg := nginxConfiguration{} + err := json.Unmarshal([]byte(data), &cfg) + if err != nil { + glog.Errorf("Invalid json: %v", err) + return newDefaultNginxCfg(), fmt.Errorf("Invalid custom nginx configuration: %v", err) + } + + return &cfg, nil } func merge(dst, src map[string]interface{}) map[string]interface{} { diff --git a/controllers/nginx-third-party/utils.go b/controllers/nginx-third-party/utils.go index 546d69c50..13ae1955a 100644 --- a/controllers/nginx-third-party/utils.go +++ b/controllers/nginx-third-party/utils.go @@ -18,6 +18,7 @@ package main import ( "encoding/json" + "fmt" "os" "strconv" "strings" @@ -35,6 +36,12 @@ import ( "k8s.io/kubernetes/pkg/util/workqueue" ) +var ( + errMissingPodInfo = fmt.Errorf("Unable to get POD information") + + errInvalidKind = fmt.Errorf("Please check the field Kind, only ReplicationController or DaemonSet are allowed") +) + // taskQueue manages a work queue through an independent worker that // invokes the given sync function for every work item inserted. type taskQueue struct { @@ -124,27 +131,33 @@ func getLBDetails(kubeClient *unversioned.Client) (rc *lbInfo, err error) { pod, _ := kubeClient.Pods(podNs).Get(podName) if pod == nil { - return + return nil, errMissingPodInfo } annotations := pod.Annotations["kubernetes.io/created-by"] var sref api.SerializedReference err = json.Unmarshal([]byte(annotations), &sref) if err != nil { - return + return nil, err } - if sref.Reference.Kind == "ReplicationController" { - rc = &lbInfo{ - RCNamespace: sref.Reference.Namespace, - RCName: sref.Reference.Name, - PodIP: podIP, - Podname: podName, - PodNamespace: podNs, - } + rc = &lbInfo{ + ObjectName: sref.Reference.Name, + PodIP: podIP, + Podname: podName, + PodNamespace: podNs, } - return + switch sref.Reference.Kind { + case "ReplicationController": + rc.DeployType = &api.ReplicationController{} + return rc, nil + case "DaemonSet": + rc.DeployType = &extensions.DaemonSet{} + return rc, nil + default: + return nil, errInvalidKind + } } func getService(kubeClient *unversioned.Client, name string) nginx.Service {