Allow nginx Ingress controller run as DaemonSet

This commit is contained in:
Manuel de Brito Fontes 2016-02-27 12:17:54 -03:00
parent 3da4e74e5a
commit a87efce5c2
5 changed files with 133 additions and 37 deletions

View file

@ -157,14 +157,32 @@ func NewLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
lbc.configLister.Store, lbc.configController = framework.NewInformer( lbc.configLister.Store, lbc.configController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(api.ListOptions) (runtime.Object, error) { ListFunc: func(api.ListOptions) (runtime.Object, error) {
rc, err := kubeClient.ReplicationControllers(lbInfo.RCNamespace).Get(lbInfo.RCName) switch lbInfo.DeployType.(type) {
case *api.ReplicationController:
rc, err := kubeClient.ReplicationControllers(lbInfo.PodNamespace).Get(lbInfo.ObjectName)
return &api.ReplicationControllerList{ return &api.ReplicationControllerList{
Items: []api.ReplicationController{*rc}, Items: []api.ReplicationController{*rc},
}, err }, err
case *extensions.DaemonSet:
ds, err := kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Get(lbInfo.ObjectName)
return &extensions.DaemonSetList{
Items: []extensions.DaemonSet{*ds},
}, err
default:
return nil, errInvalidKind
}
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.RCName}) switch lbInfo.DeployType.(type) {
return kubeClient.ReplicationControllers(lbInfo.RCNamespace).Watch(options) case *api.ReplicationController:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName})
return kubeClient.ReplicationControllers(lbInfo.PodNamespace).Watch(options)
case *extensions.DaemonSet:
options.LabelSelector = labels.SelectorFromSet(labels.Set{"name": lbInfo.ObjectName})
return kubeClient.Extensions().DaemonSets(lbInfo.PodNamespace).Watch(options)
default:
return nil, errInvalidKind
}
}, },
}, },
&api.ReplicationController{}, resyncPeriod, configHandlers) &api.ReplicationController{}, resyncPeriod, configHandlers)
@ -218,8 +236,15 @@ func (lbc *loadBalancerController) syncIngress(key string) {
// syncConfig manages changes in nginx configuration. // syncConfig manages changes in nginx configuration.
func (lbc *loadBalancerController) syncConfig(key string) { func (lbc *loadBalancerController) syncConfig(key string) {
// we only need to sync the nginx rc glog.Infof("Syncing nginx configuration")
if key != fmt.Sprintf("%v/%v", lbc.lbInfo.RCNamespace, lbc.lbInfo.RCName) { if !lbc.ingController.HasSynced() {
glog.Infof("deferring sync till endpoints controller has synced")
time.Sleep(100 * time.Millisecond)
}
// we only need to sync nginx
if key != fmt.Sprintf("%v/%v", lbc.lbInfo.PodNamespace, lbc.lbInfo.ObjectName) {
glog.Warningf("skipping sync because the event is not related to a change in configuration")
return return
} }
@ -230,15 +255,24 @@ func (lbc *loadBalancerController) syncConfig(key string) {
} }
if !configExists { if !configExists {
glog.Errorf("Configutation not found: %v", key) glog.Errorf("Configuration not found: %v", key)
return return
} }
glog.V(2).Infof("Syncing config %v", key) glog.V(2).Infof("Syncing config %v", key)
var kindAnnotations map[string]string
switch obj.(type) {
case *api.ReplicationController:
rc := *obj.(*api.ReplicationController) rc := *obj.(*api.ReplicationController)
ngxCfgAnn, _ := annotations(rc.Annotations).getNginxConfig() kindAnnotations = rc.Annotations
tcpSvcAnn, _ := annotations(rc.Annotations).getTcpServices() case *extensions.DaemonSet:
rc := *obj.(*extensions.DaemonSet)
kindAnnotations = rc.Annotations
}
ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig()
tcpSvcAnn, _ := annotations(kindAnnotations).getTcpServices()
ngxConfig, err := lbc.ngx.ReadConfig(ngxCfgAnn) ngxConfig, err := lbc.ngx.ReadConfig(ngxCfgAnn)
if err != nil { if err != nil {
@ -321,7 +355,7 @@ func (lbc *loadBalancerController) Run() {
go lbc.configQueue.run(time.Second, lbc.stopCh) go lbc.configQueue.run(time.Second, lbc.stopCh)
// Initial nginx configuration. // Initial nginx configuration.
lbc.syncConfig(lbc.lbInfo.RCName) lbc.syncConfig(lbc.lbInfo.PodNamespace + "/" + lbc.lbInfo.ObjectName)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)

View file

@ -0,0 +1,47 @@
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: nginx-ingress-lb
spec:
template:
metadata:
labels:
name: nginx-ingress-lb
spec:
containers:
- image: gcr.io/google_containers/nginx-third-party::0.3
name: nginx-ingress-lb
imagePullPolicy: Always
livenessProbe:
httpGet:
path: /healthz
port: 10249
scheme: HTTP
initialDelaySeconds: 30
timeoutSeconds: 5
# use downward API
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
ports:
- containerPort: 80
hostPort: 80
- containerPort: 443
hostPort: 4444
# we expose 8080 to access nginx stats in url /nginx-status
# this is optional
- containerPort: 8080
hostPort: 8081
args:
- /nginx-third-party-lb
- --default-backend-service=default/default-http-backend

View file

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
) )
const ( const (
@ -85,10 +86,10 @@ func main() {
} }
} }
// lbInfo contains runtime information about the pod and replication controller // lbInfo contains runtime information about the pod
type lbInfo struct { type lbInfo struct {
RCNamespace string ObjectName string
RCName string DeployType runtime.Object
Podname string Podname string
PodIP string PodIP string
PodNamespace string PodNamespace string

View file

@ -95,18 +95,19 @@ func getDnsServers() []string {
// ReadConfig obtains the configuration defined by the user or returns the default if it does not // ReadConfig obtains the configuration defined by the user or returns the default if it does not
// exists or if is not a well formed json object // exists or if is not a well formed json object
func (ngx *NginxManager) ReadConfig(data string) (cfg *nginxConfiguration, err error) { func (ngx *NginxManager) ReadConfig(data string) (*nginxConfiguration, error) {
err = json.Unmarshal([]byte(data), &cfg) if data == "" {
if err != nil { return newDefaultNginxCfg(), nil
glog.Errorf("Invalid json: %v", err)
cfg = &nginxConfiguration{}
err = fmt.Errorf("Invalid custom nginx configuration: %v", err)
return
} }
cfg = newDefaultNginxCfg() cfg := nginxConfiguration{}
err = fmt.Errorf("No custom nginx configuration. Using defaults") err := json.Unmarshal([]byte(data), &cfg)
return if err != nil {
glog.Errorf("Invalid json: %v", err)
return newDefaultNginxCfg(), fmt.Errorf("Invalid custom nginx configuration: %v", err)
}
return &cfg, nil
} }
func merge(dst, src map[string]interface{}) map[string]interface{} { func merge(dst, src map[string]interface{}) map[string]interface{} {

View file

@ -18,6 +18,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -35,6 +36,12 @@ import (
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
) )
var (
errMissingPodInfo = fmt.Errorf("Unable to get POD information")
errInvalidKind = fmt.Errorf("Please check the field Kind, only ReplicationController or DaemonSet are allowed")
)
// taskQueue manages a work queue through an independent worker that // taskQueue manages a work queue through an independent worker that
// invokes the given sync function for every work item inserted. // invokes the given sync function for every work item inserted.
type taskQueue struct { type taskQueue struct {
@ -124,27 +131,33 @@ func getLBDetails(kubeClient *unversioned.Client) (rc *lbInfo, err error) {
pod, _ := kubeClient.Pods(podNs).Get(podName) pod, _ := kubeClient.Pods(podNs).Get(podName)
if pod == nil { if pod == nil {
return return nil, errMissingPodInfo
} }
annotations := pod.Annotations["kubernetes.io/created-by"] annotations := pod.Annotations["kubernetes.io/created-by"]
var sref api.SerializedReference var sref api.SerializedReference
err = json.Unmarshal([]byte(annotations), &sref) err = json.Unmarshal([]byte(annotations), &sref)
if err != nil { if err != nil {
return return nil, err
} }
if sref.Reference.Kind == "ReplicationController" {
rc = &lbInfo{ rc = &lbInfo{
RCNamespace: sref.Reference.Namespace, ObjectName: sref.Reference.Name,
RCName: sref.Reference.Name,
PodIP: podIP, PodIP: podIP,
Podname: podName, Podname: podName,
PodNamespace: podNs, PodNamespace: podNs,
} }
}
return switch sref.Reference.Kind {
case "ReplicationController":
rc.DeployType = &api.ReplicationController{}
return rc, nil
case "DaemonSet":
rc.DeployType = &extensions.DaemonSet{}
return rc, nil
default:
return nil, errInvalidKind
}
} }
func getService(kubeClient *unversioned.Client, name string) nginx.Service { func getService(kubeClient *unversioned.Client, name string) nginx.Service {