Remove dns from nginx. Use upstreams for default backend service

Manuel de Brito Fontes 2016-03-19 17:17:58 -03:00
parent 9b142b56f8
commit 28f9cb0b2b
15 changed files with 173 additions and 225 deletions

View file

@@ -9,14 +9,13 @@ This is a nginx Ingress controller that uses [ConfigMap](https://github.com/kube
 - nginx 1.9.x with [lua-nginx-module](https://github.com/openresty/lua-nginx-module)
 - SSL support
 - custom ssl_dhparam (optional). Just mount a secret with a file named `dhparam.pem`.
-- support for TCP services (flag `--tcp-services`)
+- support for TCP services (flag `--tcp-services-configmap`)
 - custom nginx configuration using [ConfigMap](https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/configmap.md)
 - custom error pages. Using the flag `--custom-error-service` is possible to use a custom compatible [404-server](https://github.com/kubernetes/contrib/tree/master/404-server) image [nginx-error-server](https://github.com/aledbf/contrib/tree/nginx-debug-server/Ingress/images/nginx-error-server) that provides an additional `/errors` route that returns custom content for a particular error code. **This is completely optional**
 ## Requirements
 - default backend [404-server](https://github.com/kubernetes/contrib/tree/master/404-server) (or a custom compatible image)
-- DNS must be operational and able to resolve default-http-backend.default.svc.cluster.local
 ## SSL
@@ -28,7 +27,7 @@ Currently Ingress does not support HTTPS. To bypass this the controller will che
 First we need to deploy some application to publish. To keep this simple we will use the [echoheaders app]() that just returns information about the http request as output
 ```
-kubectl run echoheaders --image=gcr.io/google_containers/echoserver:1.0 --replicas=1 --port=8080
+kubectl run echoheaders --image=gcr.io/google_containers/echoserver:1.1 --replicas=1 --port=8080
 ```
 Now we expose the same application in two different services (so we can create different Ingress rules)
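For orientation, the two Services referred to here end up being called `echoheaders-x` and `echoheaders-y` (the TCP example further down uses `default/echoheaders-x`). A minimal sketch of equivalent manifests, assuming the default `run: echoheaders` label applied by `kubectl run` and the 8080 container port used above; the service port of 80 is also an assumption:

```yaml
# Illustrative only: two Services selecting the echoheaders pods created above.
apiVersion: v1
kind: Service
metadata:
  name: echoheaders-x
spec:
  selector:
    run: echoheaders        # label assumed from `kubectl run echoheaders ...`
  ports:
  - port: 80                # assumed service port
    targetPort: 8080        # container port from the command above
---
apiVersion: v1
kind: Service
metadata:
  name: echoheaders-y
spec:
  selector:
    run: echoheaders
  ports:
  - port: 80
    targetPort: 8080
```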
@@ -149,19 +148,6 @@ kubectl delete rc nginx-ingress-3rdpartycfg
 kubectl create -f examples/rc-tcp.yaml
 ```
-Now we add the annotation to the replication controller that indicates with services should be exposed as TCP:
-The annotation key is `nginx-ingress.kubernetes.io/tcpservices`. You can expose more than one service using comma as separator.
-Each service must contain the namespace, service name and port to be use as public port
-```
-kubectl annotate rc nginx-ingress-3rdpartycfg "nginx-ingress.kubernetes.io/tcpservices=default/echoheaders-x:9000"
-```
-*Note:* the only reason to remove and create a new rc is that we cannot open new ports dynamically once the pod is running.
-Once we run the `kubectl annotate` command nginx will reload.
 Now we can test the new service:
 ```
 $ (sleep 1; echo "GET / HTTP/1.1"; echo "Host: 172.17.4.99:9000"; echo;echo;sleep 2) | telnet 172.17.4.99 9000

View file

@@ -55,13 +55,7 @@ const (
 	// 2. Exposing the service ports as node ports on a pod.
 	// 3. Adding firewall rules so these ports can ingress traffic.
-	// Comma separated list of tcp/https
-	// namespace/serviceName:portToExport pairings. This assumes you've opened up the right
-	// hostPorts for each service that serves ingress traffic. Te value of portToExport indicates the
-	// port to listen inside nginx, not the port of the service.
-	lbTCPServices = "tcpservices"
-	k8sAnnotationPrefix = "nginx-ingress.kubernetes.io"
+	defUpstreamName = "upstream-default-backend"
 )

 // loadBalancerController watches the kubernetes api and adds/removes services
@@ -79,6 +73,9 @@ type loadBalancerController struct {
 	stopCh chan struct{}
 	nginx *nginx.NginxManager
 	lbInfo *lbInfo
+	nxgConfigMap string
+	tcpConfigMap string
 	// stopLock is used to enforce only a single call to Stop is active.
 	// Needed because we allow stopping through an http endpoint and
 	// allowing concurrent stoppers leads to stack traces.
@@ -86,25 +83,16 @@ type loadBalancerController struct {
 	shutdown bool
 }

-type annotations map[string]string
-
-func (a annotations) getNginxConfig() (string, bool) {
-	val, ok := a[fmt.Sprintf("%v/%v", k8sAnnotationPrefix, lbConfigName)]
-	return val, ok
-}
-
-func (a annotations) getTCPServices() (string, bool) {
-	val, ok := a[fmt.Sprintf("%v/%v", k8sAnnotationPrefix, lbTCPServices)]
-	return val, ok
-}

 // newLoadBalancerController creates a controller for nginx loadbalancer
-func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc, customErrorSvc nginx.Service, namespace string, lbInfo *lbInfo) (*loadBalancerController, error) {
+func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Duration, defaultSvc nginx.Service,
+	namespace, nxgConfigMapName, tcpConfigMapName string, lbInfo *lbInfo) (*loadBalancerController, error) {
 	lbc := loadBalancerController{
 		client: kubeClient,
 		stopCh: make(chan struct{}),
 		lbInfo: lbInfo,
-		nginx: nginx.NewManager(kubeClient, defaultSvc, customErrorSvc),
+		nginx: nginx.NewManager(kubeClient, defaultSvc),
+		nxgConfigMap: nxgConfigMapName,
+		tcpConfigMap: tcpConfigMapName,
 	}

 	lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
@@ -207,6 +195,14 @@ func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOption
 	}
 }

+func (lbc *loadBalancerController) getConfigMap(name string) (api.ConfigMap, error) {
+	return lbc.client.ConfigMaps(lbc.lbInfo.PodNamespace).Get(name)
+}
+
+func (lbc *loadBalancerController) getTCPConfigMap(name string) (api.ConfigMap, error) {
+	return lbc.client.ConfigMaps(lbc.lbInfo.PodNamespace).Get(name)
+}
+
 func (lbc *loadBalancerController) registerHandlers() {
 	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 		if err := lbc.nginx.IsHealthy(); err != nil {
@@ -230,15 +226,14 @@ func (lbc *loadBalancerController) sync() {
 	ings := lbc.ingLister.Store.List()
 	upstreams, servers := lbc.getUpstreamServers(ings)

-	var kindAnnotations map[string]string
-	ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig()
-	tcpSvcAnn, _ := annotations(kindAnnotations).getTCPServices()
-	ngxConfig, err := lbc.nginx.ReadConfig(ngxCfgAnn)
+	cfg, err := lbc.getConfigMap(lbc.nxgConfigMap)
+	ngxConfig, err := lbc.nginx.ReadConfig("")
 	if err != nil {
 		glog.Warningf("%v", err)
 	}

-	tcpServices := getTCPServices(lbc.client, tcpSvcAnn)
+	tcpServices := lbc.getTCPServices()
 	lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
 }
@@ -246,6 +241,8 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
 	upstreams := lbc.createUpstreams(data)
 	servers := lbc.createServers(data)

+	//TODO: add default backend upstream
+
 	for _, ingIf := range data {
 		ing := ingIf.(*extensions.Ingress)
@@ -326,6 +323,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
 func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[string]*nginx.Upstream {
 	upstreams := make(map[string]*nginx.Upstream)
+	upstreams[defUpstreamName] = nginx.NewUpstream(defUpstreamName)

 	for _, ingIf := range data {
 		ing := ingIf.(*extensions.Ingress)

View file

@@ -9,7 +9,7 @@ spec:
         name: nginx-ingress-lb
     spec:
       containers:
-      - image: gcr.io/google_containers/nginx-third-party::0.3
+      - image: gcr.io/google_containers/nginx-third-party::0.4
         name: nginx-ingress-lb
         imagePullPolicy: Always
         livenessProbe:

View file

@@ -15,7 +15,7 @@ spec:
         name: nginx-ingress-lb
     spec:
      containers:
-      - image: gcr.io/google_containers/nginx-third-party:0.3
+      - image: gcr.io/google_containers/nginx-third-party:0.4
        name: nginx-ingress-lb
        imagePullPolicy: Always
        livenessProbe:

View file

@@ -28,7 +28,7 @@ spec:
        secret:
          secretName: dhparam-example
      containers:
-      - image: gcr.io/google_containers/nginx-third-party:0.3
+      - image: gcr.io/google_containers/nginx-third-party:0.4
        name: nginx-ingress-lb
        imagePullPolicy: Always
        livenessProbe:

View file

@@ -23,7 +23,7 @@ spec:
        secret:
          secretName: secret-echoheaders-1
      containers:
-      - image: gcr.io/google_containers/nginx-third-party:0.3
+      - image: gcr.io/google_containers/nginx-third-party:0.4
        name: nginx-ingress-lb
        imagePullPolicy: Always
        livenessProbe:

View file

@@ -15,7 +15,7 @@ spec:
        name: nginx-ingress-lb
    spec:
      containers:
-      - image: gcr.io/google_containers/nginx-third-party:0.3
+      - image: gcr.io/google_containers/nginx-third-party:0.4
        name: nginx-ingress-lb
        imagePullPolicy: Always
        livenessProbe:

View file

@@ -1,13 +1,14 @@
 http = require "resty.http"
+def_backend = "http://upstream-default-backend"

-function openURL(status, page)
+function openURL(status)
   local httpc = http.new()

-  local res, err = httpc:request_uri(page, {
+  local res, err = httpc:request_uri(def_backend, {
     path = "/",
     method = "GET",
     headers = {
-      ["Content-Type"] = ngx.var.httpReturnType or "text/html",
+      ["Content-Type"] = ngx.var.httpAccept or "html",
     }
   })
@@ -17,7 +18,6 @@ function openURL(status, page)
   end

   ngx.status = tonumber(status)
-  ngx.header["Content-Type"] = ngx.var.httpReturnType or "text/plain"
   if ngx.var.http_cookie then
     ngx.header["Cookie"] = ngx.var.http_cookie
   end

View file

@@ -41,10 +41,12 @@ var (
 	namespace/name. The controller uses the first node port of this Service for
 	the default backend.`)

-	customErrorSvc = flags.String("custom-error-service", "",
-		`Service used that will receive the errors from nginx and serve a custom error page.
-		Takes the form namespace/name. The controller uses the first node port of this Service
-		for the backend.`)
+	tcpConfigMapName = flags.String("tcp-services-configmap", "",
+		`Name of the ConfigMap that containes the definition of the TCP services to expose.
+		The key in the map indicates the external port to be used. The value is the name of the
+		service with the format namespace/serviceName and the port of the service could be a number of the
+		name of the port.
+		The ports 80 and 443 are not allowed as external ports. This ports are reserved for nginx`)

 	resyncPeriod = flags.Duration("sync-period", 30*time.Second,
 		`Relist and confirm cloud resources this often.`)
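For orientation, a hedged sketch of the ConfigMap this new flag expects, based only on the help text above and on `getTCPConfigMap` (which looks the map up in the controller pod's own namespace). The ConfigMap name, the service names, and the exact `namespace/serviceName:port` value syntax are assumptions for illustration, not taken from this commit:

```yaml
# Hypothetical ConfigMap for --tcp-services-configmap (illustrative only).
# Each key is the external port nginx should listen on (80 and 443 are rejected);
# each value references an existing service, assumed here as namespace/serviceName:port.
apiVersion: v1
kind: ConfigMap
metadata:
  name: tcp-services            # assumed name; must live in the controller pod's namespace
data:
  "9000": default/echoheaders-x:80
  "9001": default/echoheaders-y:80
```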
@@ -69,13 +71,13 @@ func main() {
 	}

 	lbInfo, _ := getLBDetails(kubeClient)
-	defSvc, err := getService(kubeClient, *defaultSvc)
-	if err != nil {
-		glog.Fatalf("no default backend service found: %v", err)
-	}
-	defError, _ := getService(kubeClient, *customErrorSvc)
-	lbc, err := newLoadBalancerController(kubeClient, *resyncPeriod, defSvc, defError, *watchNamespace, lbInfo)
+	err = isValidService(kubeClient, *defaultSvc)
+	if err != nil {
+		glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
+	}
+	lbc, err := newLoadBalancerController(kubeClient, *resyncPeriod, *defaultSvc, *watchNamespace, *nxgConfigMap, *tcpConfigMapName, lbInfo)
 	if err != nil {
 		glog.Fatalf("%v", err)
 	}

View file

@@ -1,4 +1,4 @@
-{{ $cfg := .cfg }}{{ $defErrorSvc := .defErrorSvc }}{{ $defBackend := .defBackend }}
+{{ $cfg := .cfg }}
 daemon off;
 worker_processes {{ $cfg.WorkerProcesses }};
@ -14,17 +14,9 @@ events {
http { http {
#vhost_traffic_status_zone shared:vhost_traffic_status:10m; #vhost_traffic_status_zone shared:vhost_traffic_status:10m;
# lus sectrion to return proper error codes when custom pages are used
lua_package_path '.?.lua;./etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/lua-resty-http/lib/?.lua;'; lua_package_path '.?.lua;./etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/lua-resty-http/lib/?.lua;';
init_by_lua_block {
init_by_lua_block {
def_backend = "http://{{ $defBackend.ServiceName }}.{{ $defBackend.Namespace }}.svc.cluster.local:{{ $defBackend.ServicePort }}"
{{ if $defErrorSvc }}{{/* only if exists a custom error service */}}
dev_error_url = "http://{{ $defErrorSvc.ServiceName }}.{{ $defErrorSvc.Namespace }}.svc.cluster.local:{{ $defErrorSvc.ServicePort }}"
{{ else }}
dev_error_url = def_backend
{{ end }}
require("error_page") require("error_page")
} }
@@ -58,7 +50,7 @@ http {
     {{ end }}

     log_format upstreaminfo '{{ if $cfg.UseProxyProtocol }}$proxy_protocol_addr{{ else }}$remote_addr{{ end }} - '
-        '$remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" '
+        '[$proxy_add_x_forwarded_for] - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" '
         '$request_length $request_time $upstream_addr $upstream_response_length $upstream_response_time $upstream_status';

     access_log /var/log/nginx/access.log upstreaminfo;
@@ -126,7 +118,6 @@ http {
     ssl_dhparam {{ .sslDHParam }};
     {{ end }}

-    {{ if $defErrorSvc }}
     # Custom error pages
     proxy_intercept_errors on;
     error_page 403 @custom_403;
@@ -138,7 +129,6 @@ http {
     error_page 502 @custom_502;
     error_page 503 @custom_503;
     error_page 504 @custom_504;
-    {{ end }}

     # Reverse Proxy configuration
     # pass original Host header
@@ -152,30 +142,29 @@ http {
     proxy_set_header X-Forwarded-Port $http_x_forwarded_port;
     proxy_set_header X-Forwarded-Proto $scheme;

     proxy_connect_timeout {{ .cfg.ProxyConnectTimeout }}s;
     proxy_send_timeout {{ .cfg.ProxySendTimeout }}s;
     proxy_read_timeout {{ .cfg.ProxyReadTimeout }}s;

     proxy_buffering off;
     proxy_http_version 1.1;

     # Allow websocket connections
     proxy_set_header Upgrade $http_upgrade;
     proxy_set_header Connection $connection_upgrade;

     # In case of errors try the next upstream server before returning an error
-    proxy_next_upstream error timeout http_502 http_503 http_504;
+    proxy_next_upstream error timeout http_501 http_502 http_503 http_504;

     server {
         listen 80 default_server{{ if $cfg.UseProxyProtocol }} proxy_protocol{{ end }};
-        #vhost_traffic_status_filter_by_host on;

         location / {
             return 200;
         }

-        {{ if $defErrorSvc }}{{ template "CUSTOM_ERRORS" (dict "cfg" $cfg "defErrorSvc" $defErrorSvc) }}{{ end }}
+        {{ template "CUSTOM_ERRORS" $cfg }}
     }

     {{range $name, $upstream := .upstreams}}
@@ -206,7 +195,7 @@ http {
         }
         {{ end }}

-        {{ if $defErrorSvc }}{{ template "CUSTOM_ERRORS" (dict "cfg" $cfg "defErrorSvc" $defErrorSvc) }}{{ end }}
+        {{ template "CUSTOM_ERRORS" $cfg }}
     }
     {{ end }}
@@ -233,9 +222,9 @@ http {
         }

         location / {
-            proxy_pass http://{{ $defBackend.ServiceName }}.{{ $defBackend.Namespace }}.svc.cluster.local:{{ $defBackend.ServicePort }};
+            proxy_pass http://upstream-default-backend;
         }

-        {{ if $defErrorSvc }}{{ template "CUSTOM_ERRORS" (dict "cfg" $cfg "defErrorSvc" $defErrorSvc) }}{{ end }}
+        {{ template "CUSTOM_ERRORS" $cfg }}
     }

     # default server for services without endpoints
@@ -244,7 +233,7 @@ http {
         location / {
             content_by_lua_block {
-                openURL(503, dev_error_url)
+                openURL(503)
             }
         }
     }
@@ -252,12 +241,20 @@ http {
 # TCP services
 stream {
-{{range $tcpSvc := .tcpServices }}
+{{ range $name, $upstream := .tcpUpstreams }}
+    upstream tcp-{{ $upstream.Name }} {
+        least_conn;
+        {{ range $server := $upstream.Backends }}server {{ $server.Address }}:{{ $server.Port }};
+        {{ end }}
+    }
+{{ end }}
+{{ range $tcpSvc := .tcpServices }}
     server {
         listen {{ $tcpSvc.ExposedPort }};
         proxy_connect_timeout {{ $cfg.ProxyConnectTimeout }}s;
         proxy_timeout {{ $cfg.ProxyReadTimeout }}s;
-        proxy_pass {{ $tcpSvc.ServiceName }}.{{ $tcpSvc.Namespace }}.svc.cluster.local:{{ $tcpSvc.ServicePort }};
+        proxy_pass {{ $tcpSvc.Namespace }}-{{ $tcpSvc.ServiceName }}:{{ $tcpSvc.ServicePort }};
     }
 {{ end }}
 }
@@ -266,55 +263,55 @@ stream {
 {{ define "CUSTOM_ERRORS" }}
         location @custom_403 {
             content_by_lua_block {
-                openURL(403, dev_error_url)
+                openURL(403)
             }
         }
         location @custom_404 {
             content_by_lua_block {
-                openURL(404, dev_error_url)
+                openURL(404)
             }
         }
         location @custom_405 {
             content_by_lua_block {
-                openURL(405, dev_error_url)
+                openURL(405)
             }
         }
         location @custom_408 {
             content_by_lua_block {
-                openURL(408, dev_error_url)
+                openURL(408)
             }
         }
         location @custom_413 {
             content_by_lua_block {
-                openURL(413, dev_error_url)
+                openURL(413)
             }
         }
         location @custom_501 {
             content_by_lua_block {
-                openURL(501, dev_error_url)
+                openURL(501)
             }
         }
         location @custom_502 {
             content_by_lua_block {
-                openURL(502, dev_error_url)
+                openURL(502)
             }
         }
         location @custom_503 {
             content_by_lua_block {
-                openURL(503, dev_error_url)
+                openURL(503)
             }
         }
         location @custom_504 {
             content_by_lua_block {
-                openURL(504, dev_error_url)
+                openURL(504)
             }
         }
 {{ end }}

View file

@@ -54,7 +54,7 @@ func (ngx *NginxManager) Start() {
 // shut down, stop accepting new connections and continue to service current requests
 // until all such requests are serviced. After that, the old worker processes exit.
 // http://nginx.org/en/docs/beginners_guide.html#control
-func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, servicesL4 []Service) {
+func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, servicesL4 []*Upstream) {
 	ngx.reloadLock.Lock()
 	defer ngx.reloadLock.Unlock()

View file

@@ -212,9 +212,7 @@ type Service struct {
 // NginxManager ...
 type NginxManager struct {
-	defBackend Service
 	defCfg *nginxConfiguration
-	defError Service
 	defResolver string

 	// path to the configuration file to be used by nginx
@@ -272,12 +270,10 @@ func newDefaultNginxCfg() *nginxConfiguration {
 }

 // NewManager ...
-func NewManager(kubeClient *client.Client, defaultSvc, customErrorSvc Service) *NginxManager {
+func NewManager(kubeClient *client.Client) *NginxManager {
 	ngx := &NginxManager{
 		ConfigFile: "/etc/nginx/nginx.conf",
-		defBackend: defaultSvc,
 		defCfg: newDefaultNginxCfg(),
-		defError: customErrorSvc,
 		defResolver: strings.Join(getDnsServers(), " "),
 		reloadLock: &sync.Mutex{},
 	}

View file

@@ -19,12 +19,12 @@ package nginx
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"text/template"

 	"github.com/fatih/structs"
 	"github.com/golang/glog"
+	"github.com/imdario/mergo"
 )

 var funcMap = template.FuncMap{
@@ -36,20 +36,6 @@ var funcMap = template.FuncMap{
 		return true
 	},
-	"dict": func(values ...interface{}) (map[string]interface{}, error) {
-		if len(values)%2 != 0 {
-			return nil, errors.New("invalid dict call")
-		}
-		dict := make(map[string]interface{}, len(values)/2)
-		for i := 0; i < len(values); i += 2 {
-			key, ok := values[i].(string)
-			if !ok {
-				return nil, errors.New("dict keys must be strings")
-			}
-			dict[key] = values[i+1]
-		}
-		return dict, nil
-	},
 }

 func (ngx *NginxManager) loadTemplate() {
@@ -57,26 +43,19 @@ func (ngx *NginxManager) loadTemplate() {
 	ngx.template = tmpl
 }

-func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, servicesL4 []Service) (bool, error) {
+func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, tcpUpstreams []*Upstream) (bool, error) {
 	fromMap := structs.Map(cfg)
 	toMap := structs.Map(ngx.defCfg)
-	curNginxCfg := merge(toMap, fromMap)
+	curNginxCfg := mergo.MergeWithOverwrite(toMap, fromMap)

 	conf := make(map[string]interface{})
 	conf["upstreams"] = upstreams
 	conf["servers"] = servers
-	conf["tcpServices"] = servicesL4
-	conf["defBackend"] = ngx.defBackend
+	conf["tcpUpstreams"] = tcpUpstreams
 	conf["defResolver"] = ngx.defResolver
 	conf["sslDHParam"] = ngx.sslDHParam
 	conf["cfg"] = curNginxCfg
-	if ngx.defError.ServiceName != "" {
-		conf["defErrorSvc"] = ngx.defError
-	} else {
-		conf["defErrorSvc"] = false
-	}

 	buffer := new(bytes.Buffer)
 	err := ngx.template.Execute(buffer, conf)
 	if err != nil {
@@ -93,7 +72,7 @@ func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []*Upstream
 		if err != nil {
 			fmt.Println("error:", err)
 		}
-		glog.Infof("nginx configuration: %v", string(b))
+		glog.Infof("NGINX configuration: %v", string(b))
 	}

 	return changed, nil

View file

@@ -24,10 +24,12 @@ import (
 	"net/http"
 	"os"
 	"os/exec"
-	"reflect"
 	"strings"

 	"github.com/golang/glog"
+	"github.com/imdario/mergo"
+	"k8s.io/kubernetes/pkg/api"
 )

 // IsHealthy checks if nginx is running
@@ -76,48 +78,22 @@ func getDnsServers() []string {
 // ReadConfig obtains the configuration defined by the user or returns the default if it does not
 // exists or if is not a well formed json object
-func (ngx *NginxManager) ReadConfig(data string) (*nginxConfiguration, error) {
-	if data == "" {
+func (ngx *NginxManager) ReadConfig(config *api.ConfigMap) (*nginxConfiguration, error) {
+	if len(config.Data) == 0 {
 		return newDefaultNginxCfg(), nil
 	}

-	cfg := nginxConfiguration{}
-	err := json.Unmarshal([]byte(data), &cfg)
-	if err != nil {
-		glog.Errorf("invalid json: %v", err)
-		return newDefaultNginxCfg(), fmt.Errorf("invalid custom nginx configuration: %v", err)
-	}
-	return &cfg, nil
-}
-
-func merge(dst, src map[string]interface{}) map[string]interface{} {
-	for key, srcVal := range src {
-		if dstVal, ok := dst[key]; ok {
-			srcMap, srcMapOk := toMap(srcVal)
-			dstMap, dstMapOk := toMap(dstVal)
-			if srcMapOk && dstMapOk {
-				srcVal = merge(dstMap, srcMap)
-			}
-		}
-		dst[key] = srcVal
-	}
-	return dst
-}
-
-func toMap(iface interface{}) (map[string]interface{}, bool) {
-	value := reflect.ValueOf(iface)
-	if value.Kind() == reflect.Map {
-		m := map[string]interface{}{}
-		for _, k := range value.MapKeys() {
-			m[k.String()] = value.MapIndex(k).Interface()
-		}
-		return m, true
-	}
-	return map[string]interface{}{}, false
-}
+	cfg := newDefaultNginxCfg()
+
+	data, err := json.Marshal(config.Data)
+	if err != nil {
+		err = mergo.Merge(cfg, data)
+		if err != nil {
+			return cfg, nil
+		}
+	}
+
+	return cfg, nil
+}

 func (ngx *NginxManager) needsReload(data *bytes.Buffer) (bool, error) {
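With this change ReadConfig takes the ConfigMap object itself and merges its `Data` over the defaults from `newDefaultNginxCfg()` via mergo. Purely as an illustration, such a ConfigMap might look like the sketch below; the key names are assumptions (they would have to line up with the JSON field names of `nginxConfiguration`, which this diff does not show), and only overridden settings need to be present:

```yaml
# Hypothetical ConfigMap with nginx setting overrides (illustrative only).
# Key names are guesses; they must correspond to nginxConfiguration fields,
# whose JSON tags are not visible in this diff.
apiVersion: v1
kind: ConfigMap
metadata:
  name: nginx-settings          # assumed name, referenced by the controller's nxgConfigMap value
data:
  proxy-connect-timeout: "15"
  proxy-read-timeout: "120"
  use-proxy-protocol: "false"
```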

View file

@@ -102,79 +102,93 @@ func getLBDetails(kubeClient *unversioned.Client) (rc *lbInfo, err error) {
 	}
 }

-func getService(kubeClient *unversioned.Client, name string) (nginx.Service, error) {
+func isValidService(kubeClient *unversioned.Client, name string) error {
 	if name == "" {
-		return nginx.Service{}, fmt.Errorf("Empty string is not a valid service name")
+		return fmt.Errorf("Empty string is not a valid service name")
 	}

 	parts := strings.Split(name, "/")
 	if len(parts) != 2 {
-		glog.Fatalf("Please check the service format (namespace/name) in service %v", name)
+		return fmt.Errorf("Invalid name format (namespace/name) in service '%v'", name)
 	}

-	defaultPort, err := getServicePorts(kubeClient, parts[0], parts[1])
-	if err != nil {
-		return nginx.Service{}, fmt.Errorf("Error obtaining service %v: %v", name, err)
-	}
-
-	return nginx.Service{
-		ServiceName: parts[1],
-		ServicePort: defaultPort[0], //TODO: which port?
-		Namespace: parts[0],
-	}, nil
+	_, err = kubeClient.Services(parts[0]).Get(parts[1])
+	return err
 }

-// getServicePorts returns the ports defined in a service spec
-func getServicePorts(kubeClient *unversioned.Client, ns, name string) (ports []string, err error) {
-	var svc *api.Service
-	glog.Infof("Checking service %v/%v", ns, name)
-	svc, err = kubeClient.Services(ns).Get(name)
-	if err != nil {
-		return
-	}
-	for _, p := range svc.Spec.Ports {
-		if p.Port != 0 {
-			ports = append(ports, strconv.Itoa(p.Port))
-			break
-		}
-	}
-	glog.Infof("Ports for %v/%v : %v", ns, name, ports)
-	return
-}
-
-func getTCPServices(kubeClient *unversioned.Client, tcpServices string) []nginx.Service {
-	svcs := []nginx.Service{}
-	for _, svc := range strings.Split(tcpServices, ",") {
-		if svc == "" {
-			continue
-		}
-		namePort := strings.Split(svc, ":")
-		if len(namePort) == 2 {
-			tcpSvc, err := getService(kubeClient, namePort[0])
-			if err != nil {
-				glog.Errorf("%s", err)
-				continue
-			}
-			// the exposed TCP service cannot use 80 or 443 as ports
-			if namePort[1] == httpPort || namePort[1] == httpsPort {
-				glog.Errorf("The TCP service %v cannot use ports 80 or 443 (it creates a conflict with nginx)", svc)
-				continue
-			}
-			tcpSvc.ExposedPort = namePort[1]
-			svcs = append(svcs, tcpSvc)
-		} else {
-			glog.Errorf("TCP services should take the form namespace/name:port not %v from %v", namePort, svc)
-		}
-	}
-	return svcs
-}
+// func getService(kubeClient *unversioned.Client, name string) (nginx.Service, error) {
+// 	if name == "" {
+// 		return nginx.Service{}, fmt.Errorf("Empty string is not a valid service name")
+// 	}
+// 	parts := strings.Split(name, "/")
+// 	if len(parts) != 2 {
+// 		glog.Fatalf("Please check the service format (namespace/name) in service %v", name)
+// 	}
+// 	defaultPort, err := getServicePorts(kubeClient, parts[0], parts[1])
+// 	if err != nil {
+// 		return nginx.Service{}, fmt.Errorf("Error obtaining service %v: %v", name, err)
+// 	}
+// 	return nginx.Service{
+// 		ServiceName: parts[1],
+// 		ServicePort: defaultPort[0], //TODO: which port?
+// 		Namespace: parts[0],
+// 	}, nil
+// }
+
+// // getServicePorts returns the ports defined in a service spec
+// func getServicePorts(kubeClient *unversioned.Client, ns, name string) (ports []string, err error) {
+// 	var svc *api.Service
+// 	glog.Infof("Checking service %v/%v", ns, name)
+// 	svc, err = kubeClient.Services(ns).Get(name)
+// 	if err != nil {
+// 		return
+// 	}
+// 	for _, p := range svc.Spec.Ports {
+// 		if p.Port != 0 {
+// 			ports = append(ports, strconv.Itoa(p.Port))
+// 			break
+// 		}
+// 	}
+// 	glog.Infof("Ports for %v/%v : %v", ns, name, ports)
+// 	return
+// }
+
+// func getTCPServices(kubeClient *unversioned.Client, tcpServices string) []nginx.Service {
+// 	svcs := []nginx.Service{}
+// 	for _, svc := range strings.Split(tcpServices, ",") {
+// 		if svc == "" {
+// 			continue
+// 		}
+// 		namePort := strings.Split(svc, ":")
+// 		if len(namePort) == 2 {
+// 			tcpSvc, err := getService(kubeClient, namePort[0])
+// 			if err != nil {
+// 				glog.Errorf("%s", err)
+// 				continue
+// 			}
+// 			// the exposed TCP service cannot use 80 or 443 as ports
+// 			if namePort[1] == httpPort || namePort[1] == httpsPort {
+// 				glog.Errorf("The TCP service %v cannot use ports 80 or 443 (it creates a conflict with nginx)", svc)
+// 				continue
+// 			}
+// 			tcpSvc.ExposedPort = namePort[1]
+// 			svcs = append(svcs, tcpSvc)
+// 		} else {
+// 			glog.Errorf("TCP services should take the form namespace/name:port not %v from %v", namePort, svc)
+// 		}
+// 	}
+// 	return svcs
+// }

 func isHostValid(host string, cns []string) bool {
 	for _, cn := range cns {