From ae52257c3adeb1baffd1f0f7dcf6b7ac6f9dadeb Mon Sep 17 00:00:00 2001
From: Manuel de Brito Fontes
Date: Wed, 1 Jun 2016 10:39:12 -0400
Subject: [PATCH] Watch for updates in configuration configmaps

---
 controllers/nginx/controller.go     | 45 +++++++++++++++++++++++++++--
 controllers/nginx/nginx.tmpl        | 20 ++++++-------
 controllers/nginx/nginx/main.go     |  4 +--
 controllers/nginx/nginx/template.go | 12 ++++----
 controllers/nginx/nginx/utils.go    |  4 +--
 controllers/nginx/utils.go          |  5 ++++
 6 files changed, 66 insertions(+), 24 deletions(-)

diff --git a/controllers/nginx/controller.go b/controllers/nginx/controller.go
index 060e8cafe..1338426b2 100644
--- a/controllers/nginx/controller.go
+++ b/controllers/nginx/controller.go
@@ -89,9 +89,11 @@ type loadBalancerController struct {
     ingController  *framework.Controller
     endpController *framework.Controller
     svcController  *framework.Controller
+    mapController  *framework.Controller
     ingLister      StoreToIngressLister
     svcLister      cache.StoreToServiceLister
     endpLister     cache.StoreToEndpointsLister
+    mapLister      StoreToMapLister
     nginx          *nginx.Manager
     podInfo        *podInfo
     defaultSvc     string
@@ -176,6 +178,20 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
         },
     }
 
+    mapEventHandler := framework.ResourceEventHandlerFuncs{
+        UpdateFunc: func(old, cur interface{}) {
+            if !reflect.DeepEqual(old, cur) {
+                upCmap := cur.(*api.ConfigMap)
+                mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
+                // updates to configuration configmaps can trigger an update
+                if mapKey == lbc.nxgConfigMap || mapKey == lbc.tcpConfigMap || mapKey == lbc.udpConfigMap {
+                    lbc.recorder.Eventf(upCmap, api.EventTypeNormal, "UPDATE", mapKey)
+                    lbc.syncQueue.enqueue(cur)
+                }
+            }
+        },
+    }
+
     lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
         &cache.ListWatch{
             ListFunc:  ingressListFunc(lbc.client, namespace),
             WatchFunc: ingressWatchFunc(lbc.client, namespace),
@@ -197,6 +213,13 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
         },
         &api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
 
+    lbc.mapLister.Store, lbc.mapController = framework.NewInformer(
+        &cache.ListWatch{
+            ListFunc:  mapListFunc(lbc.client, namespace),
+            WatchFunc: mapWatchFunc(lbc.client, namespace),
+        },
+        &api.ConfigMap{}, resyncPeriod, mapEventHandler)
+
     return &lbc, nil
 }
 
@@ -236,20 +259,34 @@ func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOption
     }
 }
 
+func mapListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
+    return func(opts api.ListOptions) (runtime.Object, error) {
+        return c.ConfigMaps(ns).List(opts)
+    }
+}
+
+func mapWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
+    return func(options api.ListOptions) (watch.Interface, error) {
+        return c.ConfigMaps(ns).Watch(options)
+    }
+}
+
 func (lbc *loadBalancerController) controllersInSync() bool {
-    return lbc.ingController.HasSynced() && lbc.svcController.HasSynced() && lbc.endpController.HasSynced()
+    return lbc.ingController.HasSynced() && lbc.svcController.HasSynced() &&
+        lbc.endpController.HasSynced() && lbc.mapController.HasSynced()
 }
 
 func (lbc *loadBalancerController) getConfigMap(ns, name string) (*api.ConfigMap, error) {
+    // TODO: check why lbc.mapLister.Store.GetByKey(mapKey) is not stable (random content)
     return lbc.client.ConfigMaps(ns).Get(name)
 }
 
 func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.ConfigMap, error) {
-    return lbc.client.ConfigMaps(ns).Get(name)
+    return lbc.getConfigMap(ns, name)
 }
 
 func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.ConfigMap, error) {
-    return lbc.client.ConfigMaps(ns).Get(name)
+    return lbc.getConfigMap(ns, name)
 }
 
 // checkSvcForUpdate verifies if one of the running pods for a service contains
@@ -336,6 +373,7 @@ func (lbc *loadBalancerController) sync(key string) {
     ns, name, _ := parseNsName(lbc.nxgConfigMap)
     cfg, err := lbc.getConfigMap(ns, name)
     if err != nil {
+        glog.V(3).Infof("unexpected error searching configmap %v: %v", lbc.nxgConfigMap, err)
         cfg = &api.ConfigMap{}
     }
 
@@ -980,6 +1018,7 @@ func (lbc *loadBalancerController) Run() {
     go lbc.ingController.Run(lbc.stopCh)
     go lbc.endpController.Run(lbc.stopCh)
     go lbc.svcController.Run(lbc.stopCh)
+    go lbc.mapController.Run(lbc.stopCh)
 
     go lbc.syncQueue.run(time.Second, lbc.stopCh)
     go lbc.ingQueue.run(time.Second, lbc.stopCh)
diff --git a/controllers/nginx/nginx.tmpl b/controllers/nginx/nginx.tmpl
index 55b5f84d8..0cc11c0c1 100644
--- a/controllers/nginx/nginx.tmpl
+++ b/controllers/nginx/nginx.tmpl
@@ -134,14 +134,13 @@ http {
     {{- if .customErrors }}
     # Custom error pages
    proxy_intercept_errors on;
-    {{ end -}}
-
-    {{- range $errCode := $cfg.customHttpErrors }}
-    error_page {{ $errCode }} = @custom_{{ $errCode }};
     {{ end }}
 
+    {{- range $errCode := $cfg.customHttpErrors }}
+    error_page {{ $errCode }} = @custom_{{ $errCode }};{{ end }}
+
     # In case of errors try the next upstream server before returning an error
-    proxy_next_upstream error timeout invalid_header http_502 http_503 http_504 {{ if $cfg.retryNonIdempotent }}non_idempotent{{ end }};
+    proxy_next_upstream error timeout invalid_header http_502 http_503 http_504{{ if $cfg.retryNonIdempotent }} non_idempotent{{ end }};
 
 {{range $name, $upstream := .upstreams}}
     upstream {{$upstream.Name}} {
@@ -151,14 +150,13 @@ http {
         least_conn;
         {{- end }}
         {{ range $server := $upstream.Backends }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }};
-        {{ end }}
     }
 {{ end }}
 
     {{/* build all the required rate limit zones. Each annotation requires a dedicated zone */}}
     {{/* 1MB -> 16 thousand 64-byte states or about 8 thousand 128-byte states */}}
-    {{ range $zone := (buildRateLimitZones .servers) }}
+    {{- range $zone := (buildRateLimitZones .servers) }}
     {{ $zone }}
     {{ end }}
@@ -171,7 +169,7 @@ http {
         # PEM sha: {{ $server.SSLPemChecksum }}
         ssl_certificate {{ $server.SSLCertificate }};
         ssl_certificate_key {{ $server.SSLCertificateKey }};
-        {{ end }}
+        {{- end }}
 
         {{ if (and $server.SSL $cfg.hsts) -}}
         if ($scheme = http) {
@@ -179,19 +177,19 @@
         }
 
         more_set_headers "Strict-Transport-Security: max-age={{ $cfg.hstsMaxAge }}{{ if $cfg.hstsIncludeSubdomains }}; includeSubDomains{{ end }}; preload";
-        {{ end -}}
+        {{- end }}
 
         {{ if $cfg.enableVtsStatus }}vhost_traffic_status_filter_by_set_key $geoip_country_code country::$server_name;{{ end }}
 
         {{- range $location := $server.Locations }}
-        {{- $path := buildLocation $location }}
+        {{ $path := buildLocation $location }}
         location {{ $path }} {
         {{/* if the location contains a rate limit annotation, create one */}}
         {{ $limits := buildRateLimit $location }}
         {{- range $limit := $limits }}
         {{ $limit }}{{ end }}
 
-        {{ if $location.Auth.Secured -}}
+        {{ if $location.Auth.Secured }}
         {{ if eq $location.Auth.Type "basic" }}
         auth_basic "{{ $location.Auth.Realm }}";
         auth_basic_user_file {{ $location.Auth.File }};
diff --git a/controllers/nginx/nginx/main.go b/controllers/nginx/nginx/main.go
index 783da6c9c..2b0ec9935 100644
--- a/controllers/nginx/nginx/main.go
+++ b/controllers/nginx/nginx/main.go
@@ -129,7 +129,7 @@ type Configuration struct {
     // http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_intercept_errors
     // http://nginx.org/en/docs/http/ngx_http_core_module.html#error_page
     // By default this is disabled
-    CustomHTTPErrors []int
+    CustomHTTPErrors []int `structs:"custom-http-errors,-"`
 
     // Time during which a keep-alive client connection will stay open on the server side.
     // The zero value disables keep-alive client connections
@@ -295,7 +295,7 @@ func newDefaultNginxCfg() Configuration {
         WorkerProcesses:   strconv.Itoa(runtime.NumCPU()),
         VtsStatusZoneSize: "10m",
         UseHTTP2:          true,
-        CustomHTTPErrors:  []int{},
+        CustomHTTPErrors:  make([]int, 0),
     }
 
     if glog.V(5) {
diff --git a/controllers/nginx/nginx/template.go b/controllers/nginx/nginx/template.go
index a272eaae1..0e12f1d04 100644
--- a/controllers/nginx/nginx/template.go
+++ b/controllers/nginx/nginx/template.go
@@ -197,14 +197,14 @@ func buildRateLimitZones(input interface{}) []string {
 
     for _, server := range servers {
         for _, loc := range server.Locations {
-            if loc.RateLimit.Connections.Limit != -1 {
-                zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%v;",
+            if loc.RateLimit.Connections.Limit > 0 {
+                zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%vm;",
                     loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize)
                 zones = append(zones, zone)
             }
 
-            if loc.RateLimit.RPS.Limit != -1 {
-                zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%v rate=%vr/s;",
+            if loc.RateLimit.RPS.Limit > 0 {
+                zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%vm rate=%vr/s;",
                     loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize, loc.RateLimit.Connections.Limit)
                 zones = append(zones, zone)
             }
@@ -224,13 +224,13 @@ func buildRateLimit(input interface{}) []string {
         return limits
     }
 
-    if loc.RateLimit.Connections.Limit != -1 {
+    if loc.RateLimit.Connections.Limit > 0 {
         limit := fmt.Sprintf("limit_conn %v %v;",
             loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Limit)
         limits = append(limits, limit)
     }
 
-    if loc.RateLimit.RPS.Limit != -1 {
+    if loc.RateLimit.RPS.Limit > 0 {
         limit := fmt.Sprintf("limit_req zone=%v burst=%v nodelay;",
             loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Burst)
         limits = append(limits, limit)
diff --git a/controllers/nginx/nginx/utils.go b/controllers/nginx/nginx/utils.go
index dc7ba62e0..2cd7546b3 100644
--- a/controllers/nginx/nginx/utils.go
+++ b/controllers/nginx/nginx/utils.go
@@ -96,7 +96,7 @@ func (ngx *Manager) ReadConfig(config *api.ConfigMap) Configuration {
         Metadata: metadata,
     })
 
-    var cErrors []int
+    cErrors := make([]int, 0)
     if val, ok := config.Data[customHTTPErrors]; ok {
         delete(config.Data, customHTTPErrors)
         for _, i := range strings.Split(val, ",") {
@@ -138,7 +138,7 @@ func (ngx *Manager) ReadConfig(config *api.ConfigMap) Configuration {
 }
 
 func (ngx *Manager) filterErrors(errCodes []int) []int {
-    var fa []int
+    fa := make([]int, 0)
     for _, errCode := range errCodes {
         if errCode > 299 && errCode < 600 {
             fa = append(fa, errCode)
diff --git a/controllers/nginx/utils.go b/controllers/nginx/utils.go
index 196ba331d..ecf8ebea3 100644
--- a/controllers/nginx/utils.go
+++ b/controllers/nginx/utils.go
@@ -37,6 +37,11 @@ type StoreToIngressLister struct {
     cache.Store
 }
 
+// StoreToMapLister makes a Store that lists ConfigMaps.
+type StoreToMapLister struct {
+    cache.Store
+}
+
 // taskQueue manages a work queue through an independent worker that
 // invokes the given sync function for every work item inserted.
 type taskQueue struct {
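
Note (not part of the patch): the behaviour this commit adds boils down to one rule — a ConfigMap update only triggers a resync when the changed object is one of the three configuration ConfigMaps the controller was started with (the nginx settings map, the TCP services map, and the UDP services map, compared by "namespace/name" key in the new UpdateFunc). The snippet below is a minimal, standalone Go sketch of that filtering rule; the helper name and the example ConfigMap keys are hypothetical and do not appear in the patch.

```go
package main

import "fmt"

// isWatchedConfigMap mirrors the check in the patch's UpdateFunc: a changed
// ConfigMap (identified by "namespace/name") only matters if it is one of the
// configuration ConfigMaps the controller tracks. All names are illustrative.
func isWatchedConfigMap(key, nginxCM, tcpCM, udpCM string) bool {
	return key == nginxCM || key == tcpCM || key == udpCM
}

func main() {
	// Hypothetical keys standing in for lbc.nxgConfigMap, lbc.tcpConfigMap
	// and lbc.udpConfigMap.
	nginxCM := "kube-system/nginx-load-balancer-conf"
	tcpCM := "kube-system/tcp-services"
	udpCM := "kube-system/udp-services"

	for _, key := range []string{
		"kube-system/nginx-load-balancer-conf", // tracked: would enqueue a sync
		"default/unrelated-configmap",          // untracked: ignored
	} {
		fmt.Printf("%s -> enqueue sync: %v\n", key, isWatchedConfigMap(key, nginxCM, tcpCM, udpCM))
	}
}
```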