Add support for default backend in Ingress rule

This commit is contained in:
Manuel de Brito Fontes 2016-10-03 16:29:05 -03:00
parent 190c9a277a
commit a0776997c3
3 changed files with 176 additions and 96 deletions

View file

@ -648,6 +648,9 @@ func (lbc *loadBalancerController) getStreamServices(data map[string]string, pro
return svcs return svcs
} }
// getDefaultUpstream returns an NGINX upstream associated with the
// default backend service. In case of error retrieving information
// configure the upstream to return http code 503.
func (lbc *loadBalancerController) getDefaultUpstream() *ingress.Upstream { func (lbc *loadBalancerController) getDefaultUpstream() *ingress.Upstream {
upstream := &ingress.Upstream{ upstream := &ingress.Upstream{
Name: defUpstreamName, Name: defUpstreamName,
@ -671,7 +674,7 @@ func (lbc *loadBalancerController) getDefaultUpstream() *ingress.Upstream {
endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP, &healthcheck.Upstream{}) endps := lbc.getEndpoints(svc, svc.Spec.Ports[0].TargetPort, api.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) == 0 { if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey) glog.Warningf("service %v does not have any active endpoints", svcKey)
upstream.Backends = append(upstream.Backends, nginx.NewDefaultServer()) endps = []ingress.UpstreamServer{nginx.NewDefaultServer()}
} else { } else {
upstream.Backends = append(upstream.Backends, endps...) upstream.Backends = append(upstream.Backends, endps...)
} }
@ -679,18 +682,15 @@ func (lbc *loadBalancerController) getDefaultUpstream() *ingress.Upstream {
return upstream return upstream
} }
// getUpstreamServers returns a list of Upstream and Server to be used in NGINX.
// An upstream can be used in multiple servers if the namespace, service name and port are the same
func (lbc *loadBalancerController) getUpstreamServers(ngxCfg config.Configuration, data []interface{}) ([]*ingress.Upstream, []*ingress.Server) { func (lbc *loadBalancerController) getUpstreamServers(ngxCfg config.Configuration, data []interface{}) ([]*ingress.Upstream, []*ingress.Server) {
upstreams := lbc.createUpstreams(ngxCfg, data) upstreams := lbc.createUpstreams(ngxCfg, data)
servers := lbc.createServers(data) servers := lbc.createServers(data, upstreams)
for _, ingIf := range data { for _, ingIf := range data {
ing := ingIf.(*extensions.Ingress) ing := ingIf.(*extensions.Ingress)
for _, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil {
continue
}
nginxAuth, err := auth.ParseAnnotations(lbc.client, ing, auth.DefAuthDirectory) nginxAuth, err := auth.ParseAnnotations(lbc.client, ing, auth.DefAuthDirectory)
glog.V(3).Infof("nginx auth %v", nginxAuth) glog.V(3).Infof("nginx auth %v", nginxAuth)
if err != nil { if err != nil {
@ -730,6 +730,7 @@ func (lbc *loadBalancerController) getUpstreamServers(ngxCfg config.Configuratio
glog.V(3).Infof("error reading auth request annotation in Ingress %v/%v: %v", ing.GetNamespace(), ing.GetName(), err) glog.V(3).Infof("error reading auth request annotation in Ingress %v/%v: %v", ing.GetNamespace(), ing.GetName(), err)
} }
for _, rule := range ing.Spec.Rules {
host := rule.Host host := rule.Host
if host == "" { if host == "" {
host = defServerName host = defServerName
@ -739,20 +740,40 @@ func (lbc *loadBalancerController) getUpstreamServers(ngxCfg config.Configuratio
server = servers[defServerName] server = servers[defServerName]
} }
if rule.HTTP == nil && host != defServerName {
// no rules, host is not default server.
// check if Ingress rules contains Backend and replace default backend
defBackend := fmt.Sprintf("default-backend-%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
ups := upstreams[defBackend]
for _, loc := range server.Locations {
loc.Upstream = *ups
}
continue
}
for _, path := range rule.HTTP.Paths { for _, path := range rule.HTTP.Paths {
upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.String()) upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.String())
ups := upstreams[upsName] ups := upstreams[upsName]
// we need to check if the upstream contains the default backend
if isDefaultUpstream(ups) && ing.Spec.Backend != nil {
defBackend := fmt.Sprintf("default-backend-%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
if defUps, ok := upstreams[defBackend]; ok {
ups = defUps
}
}
nginxPath := path.Path nginxPath := path.Path
// if there's no path defined we assume / // if there's no path defined we assume /
// in NGINX / == /*
if nginxPath == "" { if nginxPath == "" {
lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING", lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING",
"Ingress rule '%v/%v' contains no path definition. Assuming /", ing.GetNamespace(), ing.GetName()) "Ingress rule '%v/%v' contains no path definition. Assuming /",
ing.GetNamespace(), ing.GetName())
nginxPath = rootLocation nginxPath = rootLocation
} }
// Validate that there is no another previuous // Validate that there is no previous rule for the same host and path.
// rule for the same host and path.
addLoc := true addLoc := true
for _, loc := range server.Locations { for _, loc := range server.Locations {
if loc.Path == rootLocation && nginxPath == rootLocation && loc.IsDefBackend { if loc.Path == rootLocation && nginxPath == rootLocation && loc.IsDefBackend {
@ -762,6 +783,8 @@ func (lbc *loadBalancerController) getUpstreamServers(ngxCfg config.Configuratio
loc.Redirect = *locRew loc.Redirect = *locRew
loc.SecureUpstream = secUpstream loc.SecureUpstream = secUpstream
loc.Whitelist = *wl loc.Whitelist = *wl
loc.IsDefBackend = false
loc.Upstream = *ups
loc.EnableCORS = eCORS loc.EnableCORS = eCORS
loc.ExternalAuthURL = ra loc.ExternalAuthURL = ra
@ -829,6 +852,20 @@ func (lbc *loadBalancerController) createUpstreams(ngxCfg config.Configuration,
hz := healthcheck.ParseAnnotations(ngxCfg, ing) hz := healthcheck.ParseAnnotations(ngxCfg, ing)
var defBackend string
if ing.Spec.Backend != nil {
defBackend = fmt.Sprintf("default-backend-%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
glog.V(3).Infof("creating upstream %v", defBackend)
upstreams[defBackend] = nginx.NewUpstream(defBackend)
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
endps, err := lbc.getSvcEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Backends = append(upstreams[defBackend].Backends, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
}
}
for _, rule := range ing.Spec.Rules { for _, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil { if rule.IngressRuleValue.HTTP == nil {
continue continue
@ -844,33 +881,12 @@ func (lbc *loadBalancerController) createUpstreams(ngxCfg config.Configuration,
upstreams[name] = nginx.NewUpstream(name) upstreams[name] = nginx.NewUpstream(name)
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey) endp, err := lbc.getSvcEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil { if err != nil {
glog.Infof("error getting service %v from the cache: %v", svcKey, err) glog.Warningf("error obtaining service endpoints: %v", err)
continue continue
} }
upstreams[name].Backends = endp
if !svcExists {
glog.Warningf("service %v does not exists", svcKey)
continue
}
svc := svcObj.(*api.Service)
glog.V(3).Infof("obtaining port information for service %v", svcKey)
bp := path.Backend.ServicePort.String()
for _, servicePort := range svc.Spec.Ports {
// targetPort could be a string, use the name or the port (int)
if strconv.Itoa(int(servicePort.Port)) == bp || servicePort.TargetPort.String() == bp || servicePort.Name == bp {
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP, hz)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
}
upstreams[name].Backends = append(upstreams[name].Backends, endps...)
break
}
}
} }
} }
} }
@ -878,7 +894,42 @@ func (lbc *loadBalancerController) createUpstreams(ngxCfg config.Configuration,
return upstreams return upstreams
} }
func (lbc *loadBalancerController) createServers(data []interface{}) map[string]*ingress.Server { func (lbc *loadBalancerController) getSvcEndpoints(svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.UpstreamServer, error) {
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
var upstreams []ingress.UpstreamServer
if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
}
if !svcExists {
err = fmt.Errorf("service %v does not exists", svcKey)
return upstreams, err
}
svc := svcObj.(*api.Service)
glog.V(3).Infof("obtaining port information for service %v", svcKey)
for _, servicePort := range svc.Spec.Ports {
// targetPort could be a string, use the name or the port (int)
if strconv.Itoa(int(servicePort.Port)) == backendPort ||
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := lbc.getEndpoints(svc, servicePort.TargetPort, api.ProtocolTCP, hz)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
}
upstreams = append(upstreams, endps...)
break
}
}
return upstreams, nil
}
func (lbc *loadBalancerController) createServers(data []interface{}, upstreams map[string]*ingress.Upstream) map[string]*ingress.Server {
servers := make(map[string]*ingress.Server) servers := make(map[string]*ingress.Server)
pems := lbc.getPemsFromIngress(data) pems := lbc.getPemsFromIngress(data)
@ -921,13 +972,28 @@ func (lbc *loadBalancerController) createServers(data []interface{}) map[string]
host = defServerName host = defServerName
} }
if _, ok := servers[host]; !ok { if _, ok := servers[host]; ok {
glog.V(3).Infof("rule %v/%v uses a host already defined. Skipping server creation", ing.GetNamespace(), ing.GetName())
} else {
locs := []*ingress.Location{} locs := []*ingress.Location{}
locs = append(locs, &ingress.Location{ loc := &ingress.Location{
Path: rootLocation, Path: rootLocation,
IsDefBackend: true, IsDefBackend: true,
Upstream: *lbc.getDefaultUpstream(), Upstream: *lbc.getDefaultUpstream(),
}) }
if ing.Spec.Backend != nil {
defUpstream := fmt.Sprintf("default-backend-%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
if backendUpstream, ok := upstreams[defUpstream]; ok {
if host == "" || host == defServerName {
lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING", "error: rules with Spec.Backend are allowed with hostnames")
} else {
loc.Upstream = *backendUpstream
}
}
}
locs = append(locs, loc)
servers[host] = &ingress.Server{Name: host, Locations: locs} servers[host] = &ingress.Server{Name: host, Locations: locs}
} }
@ -1012,7 +1078,11 @@ func (lbc *loadBalancerController) secrReferenced(namespace string, name string)
} }
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination. // getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString, proto api.Protocol, hz *healthcheck.Upstream) []ingress.UpstreamServer { func (lbc *loadBalancerController) getEndpoints(
s *api.Service,
servicePort intstr.IntOrString,
proto api.Protocol,
hz *healthcheck.Upstream) []ingress.UpstreamServer {
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String()) glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := lbc.endpLister.GetServiceEndpoints(s) ep, err := lbc.endpLister.GetServiceEndpoints(s)
if err != nil { if err != nil {
@ -1163,3 +1233,12 @@ func (lbc *loadBalancerController) Run() {
<-lbc.stopCh <-lbc.stopCh
} }
func isDefaultUpstream(ups *ingress.Upstream) bool {
if ups == nil || len(ups.Backends) == 0 {
return false
}
return ups.Backends[0].Address == "127.0.0.1" &&
ups.Backends[0].Port == "8181"
}

View file

@ -184,7 +184,7 @@ http {
server { server {
server_name {{ $server.Name }}; server_name {{ $server.Name }};
listen 80{{ if $cfg.useProxyProtocol }} proxy_protocol{{ end }}; listen 80{{ if $cfg.useProxyProtocol }} proxy_protocol{{ end }};
{{- if $server.SSL }}listen 443 {{ if $cfg.useProxyProtocol }}proxy_protocol{{ end }} ssl {{ if $cfg.enableSpdy }}spdy{{ end }} {{ if $cfg.useHttp2 }}http2{{ end }}; {{ if $server.SSL }}listen 443 {{ if $cfg.useProxyProtocol }}proxy_protocol{{ end }} ssl {{ if $cfg.enableSpdy }}spdy{{ end }} {{ if $cfg.useHttp2 }}http2{{ end }};
{{/* comment PEM sha is required to detect changes in the generated configuration and force a reload */}} {{/* comment PEM sha is required to detect changes in the generated configuration and force a reload */}}
# PEM sha: {{ $server.SSLPemChecksum }} # PEM sha: {{ $server.SSLPemChecksum }}
ssl_certificate {{ $server.SSLCertificate }}; ssl_certificate {{ $server.SSLCertificate }};
@ -195,7 +195,7 @@ http {
more_set_headers "Strict-Transport-Security: max-age={{ $cfg.hstsMaxAge }}{{ if $cfg.hstsIncludeSubdomains }}; includeSubDomains{{ end }}; preload"; more_set_headers "Strict-Transport-Security: max-age={{ $cfg.hstsMaxAge }}{{ if $cfg.hstsIncludeSubdomains }}; includeSubDomains{{ end }}; preload";
{{- end }} {{- end }}
{{- if $cfg.enableVtsStatus }}vhost_traffic_status_filter_by_set_key $geoip_country_code country::$server_name;{{ end -}} {{ if $cfg.enableVtsStatus }}vhost_traffic_status_filter_by_set_key $geoip_country_code country::$server_name;{{ end -}}
{{- range $location := $server.Locations }} {{- range $location := $server.Locations }}
{{ $path := buildLocation $location }} {{ $path := buildLocation $location }}
@ -216,42 +216,42 @@ http {
} }
{{ end }} {{ end }}
location {{ $path }} { location {{ $path }} {
{{ if gt (len $location.Whitelist.CIDR) 0 }} {{- if gt (len $location.Whitelist.CIDR) 0 }}
{{- range $ip := $location.Whitelist.CIDR }} {{- range $ip := $location.Whitelist.CIDR }}
allow {{ $ip }};{{ end }} allow {{ $ip }};{{ end }}
deny all; deny all;
{{ end -}} {{ end -}}
{{ if not (empty $authPath) }} {{- if not (empty $authPath) }}
# this location requires authentication # this location requires authentication
auth_request {{ $authPath }}; auth_request {{ $authPath }};
{{ end }} {{ end -}}
{{ if (and $server.SSL $location.Redirect.SSLRedirect) -}} {{- if (and $server.SSL $location.Redirect.SSLRedirect) }}
# enforce ssl on server side # enforce ssl on server side
if ($scheme = http) { if ($scheme = http) {
return 301 https://$host$request_uri; return 301 https://$host$request_uri;
} }
{{- end }} {{ end -}}
{{/* if the location contains a rate limit annotation, create one */}} {{/* if the location contains a rate limit annotation, create one */}}
{{ $limits := buildRateLimit $location }} {{ $limits := buildRateLimit $location }}
{{- range $limit := $limits }} {{- range $limit := $limits }}
{{ $limit }}{{ end }} {{ $limit }}{{ end }}
{{ if $location.Auth.Secured }} {{- if $location.Auth.Secured }}
{{ if eq $location.Auth.Type "basic" }} {{- if eq $location.Auth.Type "basic" }}
auth_basic "{{ $location.Auth.Realm }}"; auth_basic "{{ $location.Auth.Realm }}";
auth_basic_user_file {{ $location.Auth.File }}; auth_basic_user_file {{ $location.Auth.File }};
{{ else }} {{ else }}
#TODO: add nginx-http-auth-digest module #TODO: add nginx-http-auth-digest module
auth_digest "{{ $location.Auth.Realm }}"; auth_digest "{{ $location.Auth.Realm }}";
auth_digest_user_file {{ $location.Auth.File }}; auth_digest_user_file {{ $location.Auth.File }};
{{ end }} {{ end -}}
proxy_set_header Authorization ""; proxy_set_header Authorization "";
{{- end }} {{ end -}}
{{ if $location.EnableCORS }} {{- if $location.EnableCORS }}
{{ template "CORS" }} {{ template "CORS" }}
{{ end }} {{ end -}}
proxy_set_header Host $host; proxy_set_header Host $host;
@ -282,16 +282,16 @@ http {
proxy_http_version 1.1; proxy_http_version 1.1;
{{/* rewrite only works if the content is not compressed */}} {{/* rewrite only works if the content is not compressed */}}
{{ if $location.Redirect.AddBaseURL -}} {{- if $location.Redirect.AddBaseURL }}
proxy_set_header Accept-Encoding ""; proxy_set_header Accept-Encoding "";
{{- end }} {{ end -}}
set $proxy_upstream_name "{{ $location.Upstream.Name }}"; set $proxy_upstream_name "{{ $location.Upstream.Name }}";
{{- buildProxyPass $location }} {{ buildProxyPass $location }}
} }
{{ end }} {{ end }}
{{ if eq $server.Name "_" }} {{- if eq $server.Name "_" }}
# health checks in cloud providers require the use of port 80 # health checks in cloud providers require the use of port 80
location {{ $cfg.HealthzURL }} { location {{ $cfg.HealthzURL }} {
access_log off; access_log off;
@ -307,7 +307,7 @@ http {
access_log off; access_log off;
stub_status on; stub_status on;
} }
{{ end }} {{ end -}}
{{ template "CUSTOM_ERRORS" $cfg }} {{ template "CUSTOM_ERRORS" $cfg }}
} }
{{ end }} {{ end }}

View file

@ -128,10 +128,11 @@ func (ngx Manager) testTemplate(cfg []byte) error {
return err return err
} }
defer tmpfile.Close() defer tmpfile.Close()
defer os.Remove(tmpfile.Name())
ioutil.WriteFile(tmpfile.Name(), cfg, 0644) ioutil.WriteFile(tmpfile.Name(), cfg, 0644)
if err := ngx.shellOut(fmt.Sprintf("nginx -t -c %v", tmpfile.Name())); err != nil { if err := ngx.shellOut(fmt.Sprintf("nginx -t -c %v", tmpfile.Name())); err != nil {
return fmt.Errorf("invalid nginx configuration: %v", err) return fmt.Errorf("invalid nginx configuration: %v", err)
} }
// in case of error do not remove temporal file
defer os.Remove(tmpfile.Name())
return nil return nil
} }