Watch for updates in configuration configmaps

This commit is contained in:
Manuel de Brito Fontes 2016-06-01 10:39:12 -04:00
parent 71ca55440b
commit ae52257c3a
6 changed files with 66 additions and 24 deletions

View file

@ -89,9 +89,11 @@ type loadBalancerController struct {
ingController *framework.Controller ingController *framework.Controller
endpController *framework.Controller endpController *framework.Controller
svcController *framework.Controller svcController *framework.Controller
mapController *framework.Controller
ingLister StoreToIngressLister ingLister StoreToIngressLister
svcLister cache.StoreToServiceLister svcLister cache.StoreToServiceLister
endpLister cache.StoreToEndpointsLister endpLister cache.StoreToEndpointsLister
mapLister StoreToMapLister
nginx *nginx.Manager nginx *nginx.Manager
podInfo *podInfo podInfo *podInfo
defaultSvc string defaultSvc string
@ -176,6 +178,20 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
}, },
} }
mapEventHandler := framework.ResourceEventHandlerFuncs{
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*api.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
// updates to configuration configmaps can trigger an update
if mapKey == lbc.nxgConfigMap || mapKey == lbc.tcpConfigMap || mapKey == lbc.udpConfigMap {
lbc.recorder.Eventf(upCmap, api.EventTypeNormal, "UPDATE", mapKey)
lbc.syncQueue.enqueue(cur)
}
}
},
}
lbc.ingLister.Store, lbc.ingController = framework.NewInformer( lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace), ListFunc: ingressListFunc(lbc.client, namespace),
@ -197,6 +213,13 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
}, },
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{}) &api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
lbc.mapLister.Store, lbc.mapController = framework.NewInformer(
&cache.ListWatch{
ListFunc: mapListFunc(lbc.client, namespace),
WatchFunc: mapWatchFunc(lbc.client, namespace),
},
&api.ConfigMap{}, resyncPeriod, mapEventHandler)
return &lbc, nil return &lbc, nil
} }
@ -236,20 +259,34 @@ func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOption
} }
} }
func mapListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.ConfigMaps(ns).List(opts)
}
}
func mapWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.ConfigMaps(ns).Watch(options)
}
}
func (lbc *loadBalancerController) controllersInSync() bool { func (lbc *loadBalancerController) controllersInSync() bool {
return lbc.ingController.HasSynced() && lbc.svcController.HasSynced() && lbc.endpController.HasSynced() return lbc.ingController.HasSynced() && lbc.svcController.HasSynced() &&
lbc.endpController.HasSynced() && lbc.mapController.HasSynced()
} }
func (lbc *loadBalancerController) getConfigMap(ns, name string) (*api.ConfigMap, error) { func (lbc *loadBalancerController) getConfigMap(ns, name string) (*api.ConfigMap, error) {
// TODO: check why lbc.mapLister.Store.GetByKey(mapKey) is not stable (random content)
return lbc.client.ConfigMaps(ns).Get(name) return lbc.client.ConfigMaps(ns).Get(name)
} }
func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.ConfigMap, error) { func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.ConfigMap, error) {
return lbc.client.ConfigMaps(ns).Get(name) return lbc.getConfigMap(ns, name)
} }
func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.ConfigMap, error) { func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.ConfigMap, error) {
return lbc.client.ConfigMaps(ns).Get(name) return lbc.getConfigMap(ns, name)
} }
// checkSvcForUpdate verifies if one of the running pods for a service contains // checkSvcForUpdate verifies if one of the running pods for a service contains
@ -336,6 +373,7 @@ func (lbc *loadBalancerController) sync(key string) {
ns, name, _ := parseNsName(lbc.nxgConfigMap) ns, name, _ := parseNsName(lbc.nxgConfigMap)
cfg, err := lbc.getConfigMap(ns, name) cfg, err := lbc.getConfigMap(ns, name)
if err != nil { if err != nil {
glog.V(3).Infof("unexpected error searching configmap %v: %v", lbc.nxgConfigMap, err)
cfg = &api.ConfigMap{} cfg = &api.ConfigMap{}
} }
@ -980,6 +1018,7 @@ func (lbc *loadBalancerController) Run() {
go lbc.ingController.Run(lbc.stopCh) go lbc.ingController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh) go lbc.endpController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh) go lbc.svcController.Run(lbc.stopCh)
go lbc.mapController.Run(lbc.stopCh)
go lbc.syncQueue.run(time.Second, lbc.stopCh) go lbc.syncQueue.run(time.Second, lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh)

View file

@ -134,14 +134,13 @@ http {
{{- if .customErrors }} {{- if .customErrors }}
# Custom error pages # Custom error pages
proxy_intercept_errors on; proxy_intercept_errors on;
{{ end -}}
{{- range $errCode := $cfg.customHttpErrors }}
error_page {{ $errCode }} = @custom_{{ $errCode }};
{{ end }} {{ end }}
{{- range $errCode := $cfg.customHttpErrors }}
error_page {{ $errCode }} = @custom_{{ $errCode }};{{ end }}
# In case of errors try the next upstream server before returning an error # In case of errors try the next upstream server before returning an error
proxy_next_upstream error timeout invalid_header http_502 http_503 http_504 {{ if $cfg.retryNonIdempotent }}non_idempotent{{ end }}; proxy_next_upstream error timeout invalid_header http_502 http_503 http_504{{ if $cfg.retryNonIdempotent }} non_idempotent{{ end }};
{{range $name, $upstream := .upstreams}} {{range $name, $upstream := .upstreams}}
upstream {{$upstream.Name}} { upstream {{$upstream.Name}} {
@ -151,14 +150,13 @@ http {
least_conn; least_conn;
{{- end }} {{- end }}
{{ range $server := $upstream.Backends }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }}; {{ range $server := $upstream.Backends }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }};
{{ end }} {{ end }}
} }
{{ end }} {{ end }}
{{/* build all the required rate limit zones. Each annotation requires a dedicated zone */}} {{/* build all the required rate limit zones. Each annotation requires a dedicated zone */}}
{{/* 1MB -> 16 thousand 64-byte states or about 8 thousand 128-byte states */}} {{/* 1MB -> 16 thousand 64-byte states or about 8 thousand 128-byte states */}}
{{ range $zone := (buildRateLimitZones .servers) }} {{- range $zone := (buildRateLimitZones .servers) }}
{{ $zone }} {{ $zone }}
{{ end }} {{ end }}
@ -171,7 +169,7 @@ http {
# PEM sha: {{ $server.SSLPemChecksum }} # PEM sha: {{ $server.SSLPemChecksum }}
ssl_certificate {{ $server.SSLCertificate }}; ssl_certificate {{ $server.SSLCertificate }};
ssl_certificate_key {{ $server.SSLCertificateKey }}; ssl_certificate_key {{ $server.SSLCertificateKey }};
{{ end }} {{- end }}
{{ if (and $server.SSL $cfg.hsts) -}} {{ if (and $server.SSL $cfg.hsts) -}}
if ($scheme = http) { if ($scheme = http) {
@ -179,19 +177,19 @@ http {
} }
more_set_headers "Strict-Transport-Security: max-age={{ $cfg.hstsMaxAge }}{{ if $cfg.hstsIncludeSubdomains }}; includeSubDomains{{ end }}; preload"; more_set_headers "Strict-Transport-Security: max-age={{ $cfg.hstsMaxAge }}{{ if $cfg.hstsIncludeSubdomains }}; includeSubDomains{{ end }}; preload";
{{ end -}} {{- end }}
{{ if $cfg.enableVtsStatus }}vhost_traffic_status_filter_by_set_key $geoip_country_code country::$server_name;{{ end }} {{ if $cfg.enableVtsStatus }}vhost_traffic_status_filter_by_set_key $geoip_country_code country::$server_name;{{ end }}
{{- range $location := $server.Locations }} {{- range $location := $server.Locations }}
{{- $path := buildLocation $location }} {{ $path := buildLocation $location }}
location {{ $path }} { location {{ $path }} {
{{/* if the location contains a rate limit annotation, create one */}} {{/* if the location contains a rate limit annotation, create one */}}
{{ $limits := buildRateLimit $location }} {{ $limits := buildRateLimit $location }}
{{- range $limit := $limits }} {{- range $limit := $limits }}
{{ $limit }}{{ end }} {{ $limit }}{{ end }}
{{ if $location.Auth.Secured -}} {{ if $location.Auth.Secured }}
{{ if eq $location.Auth.Type "basic" }} {{ if eq $location.Auth.Type "basic" }}
auth_basic "{{ $location.Auth.Realm }}"; auth_basic "{{ $location.Auth.Realm }}";
auth_basic_user_file {{ $location.Auth.File }}; auth_basic_user_file {{ $location.Auth.File }};

View file

@ -129,7 +129,7 @@ type Configuration struct {
// http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_intercept_errors // http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_intercept_errors
// http://nginx.org/en/docs/http/ngx_http_core_module.html#error_page // http://nginx.org/en/docs/http/ngx_http_core_module.html#error_page
// By default this is disabled // By default this is disabled
CustomHTTPErrors []int CustomHTTPErrors []int `structs:"custom-http-errors,-"`
// Time during which a keep-alive client connection will stay open on the server side. // Time during which a keep-alive client connection will stay open on the server side.
// The zero value disables keep-alive client connections // The zero value disables keep-alive client connections
@ -295,7 +295,7 @@ func newDefaultNginxCfg() Configuration {
WorkerProcesses: strconv.Itoa(runtime.NumCPU()), WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
VtsStatusZoneSize: "10m", VtsStatusZoneSize: "10m",
UseHTTP2: true, UseHTTP2: true,
CustomHTTPErrors: []int{}, CustomHTTPErrors: make([]int, 0),
} }
if glog.V(5) { if glog.V(5) {

View file

@ -197,14 +197,14 @@ func buildRateLimitZones(input interface{}) []string {
for _, server := range servers { for _, server := range servers {
for _, loc := range server.Locations { for _, loc := range server.Locations {
if loc.RateLimit.Connections.Limit != -1 { if loc.RateLimit.Connections.Limit > 0 {
zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%v;", zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%vm;",
loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize) loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize)
zones = append(zones, zone) zones = append(zones, zone)
} }
if loc.RateLimit.RPS.Limit != -1 { if loc.RateLimit.RPS.Limit > 0 {
zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%v rate=%vr/s;", zone := fmt.Sprintf("limit_conn_zone $binary_remote_addr zone=%v:%vm rate=%vr/s;",
loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize, loc.RateLimit.Connections.Limit) loc.RateLimit.Connections.Name, loc.RateLimit.Connections.SharedSize, loc.RateLimit.Connections.Limit)
zones = append(zones, zone) zones = append(zones, zone)
} }
@ -224,13 +224,13 @@ func buildRateLimit(input interface{}) []string {
return limits return limits
} }
if loc.RateLimit.Connections.Limit != -1 { if loc.RateLimit.Connections.Limit > 0 {
limit := fmt.Sprintf("limit_conn %v %v;", limit := fmt.Sprintf("limit_conn %v %v;",
loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Limit) loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Limit)
limits = append(limits, limit) limits = append(limits, limit)
} }
if loc.RateLimit.RPS.Limit != -1 { if loc.RateLimit.RPS.Limit > 0 {
limit := fmt.Sprintf("limit_req zone=%v burst=%v nodelay;", limit := fmt.Sprintf("limit_req zone=%v burst=%v nodelay;",
loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Burst) loc.RateLimit.Connections.Name, loc.RateLimit.Connections.Burst)
limits = append(limits, limit) limits = append(limits, limit)

View file

@ -96,7 +96,7 @@ func (ngx *Manager) ReadConfig(config *api.ConfigMap) Configuration {
Metadata: metadata, Metadata: metadata,
}) })
var cErrors []int cErrors := make([]int, 0)
if val, ok := config.Data[customHTTPErrors]; ok { if val, ok := config.Data[customHTTPErrors]; ok {
delete(config.Data, customHTTPErrors) delete(config.Data, customHTTPErrors)
for _, i := range strings.Split(val, ",") { for _, i := range strings.Split(val, ",") {
@ -138,7 +138,7 @@ func (ngx *Manager) ReadConfig(config *api.ConfigMap) Configuration {
} }
func (ngx *Manager) filterErrors(errCodes []int) []int { func (ngx *Manager) filterErrors(errCodes []int) []int {
var fa []int fa := make([]int, 0)
for _, errCode := range errCodes { for _, errCode := range errCodes {
if errCode > 299 && errCode < 600 { if errCode > 299 && errCode < 600 {
fa = append(fa, errCode) fa = append(fa, errCode)

View file

@ -37,6 +37,11 @@ type StoreToIngressLister struct {
cache.Store cache.Store
} }
// StoreToMapLister makes a Store that lists Secrets.
type StoreToMapLister struct {
cache.Store
}
// taskQueue manages a work queue through an independent worker that // taskQueue manages a work queue through an independent worker that
// invokes the given sync function for every work item inserted. // invokes the given sync function for every work item inserted.
type taskQueue struct { type taskQueue struct {