Add helper to verify if the configuration file changed

This commit is contained in:
Manuel de Brito Fontes 2016-03-15 12:31:39 -03:00
parent cad814cbb3
commit 5feb452ce4
8 changed files with 264 additions and 150 deletions

View file

@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/google_containers/nginx-slim:0.3
FROM gcr.io/google_containers/nginx-slim:0.4
RUN apt-get update && apt-get install -y \
diffutils \
--no-install-recommends \
&& rm -rf /var/lib/apt/lists/*
COPY nginx-third-party-lb /
COPY nginx.tmpl /

View file

@ -28,11 +28,11 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
@ -76,7 +76,6 @@ type loadBalancerController struct {
svcLister cache.StoreToServiceLister
configLister StoreToConfigMapLister
endpLister cache.StoreToEndpointsLister
recorder record.EventRecorder
stopCh chan struct{}
nginx *nginx.NginxManager
lbInfo *lbInfo
@ -229,9 +228,7 @@ func (lbc *loadBalancerController) registerHandlers() {
func (lbc *loadBalancerController) sync() {
ings := lbc.ingLister.Store.List()
upstreams, servers, update := lbc.updateNGINX(ings)
if update {
glog.V(2).Infof("syncing NGINX config")
upstreams, servers := lbc.getUpstreamServers(ings)
var kindAnnotations map[string]string
ngxCfgAnn, _ := annotations(kindAnnotations).getNginxConfig()
@ -244,16 +241,15 @@ func (lbc *loadBalancerController) sync() {
tcpServices := getTCPServices(lbc.client, tcpSvcAnn)
lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
}
}
func (lbc *loadBalancerController) updateNGINX(data []interface{}) ([]nginx.Upstream, []nginx.Server, bool) {
func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]nginx.Upstream, []nginx.Server) {
pems := make(map[string]string)
upstreams := make(map[string]nginx.Upstream)
var servers []nginx.Server
servers := make(map[string]nginx.Server)
for _, ingIf := range data {
ing := ingIf.(extensions.Ingress)
ing := ingIf.(*extensions.Ingress)
for _, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil {
@ -261,47 +257,53 @@ func (lbc *loadBalancerController) updateNGINX(data []interface{}) ([]nginx.Upst
}
for _, path := range rule.HTTP.Paths {
name := ing.Namespace + "-" + path.Backend.ServiceName
name := ing.GetNamespace() + "-" + path.Backend.ServiceName
var ups nginx.Upstream
if existent, ok := upstreams[name]; ok {
ups = existent
} else {
ups := nginx.NewUpstreamWithDefaultServer(name)
upstreams[name] = ups
ups = nginx.NewUpstream(name)
}
svcKey := ing.Namespace + "/" + path.Backend.ServiceName
svcKey := ing.GetNamespace() + "/" + path.Backend.ServiceName
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
if err != nil {
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
} else {
if svcExists {
continue
}
if !svcExists {
glog.Warningf("service %v does no exists", svcKey)
continue
}
svc := svcObj.(*api.Service)
if svc.Spec.ClusterIP != "None" && svc.Spec.ClusterIP != "" {
upsServer := nginx.UpstreamServer{Address: svc.Spec.ClusterIP, Port: path.Backend.ServicePort.String()}
ups.Backends = []nginx.UpstreamServer{upsServer}
} else if svc.Spec.ClusterIP == "None" {
endps, err := lbc.endpLister.GetServiceEndpoints(svc)
if err != nil {
glog.Infof("error getting endpoints for service %v from the cache: %v", svc, err)
} else {
upsServers := endpointsToUpstreamServers(endps, path.Backend.ServicePort.IntValue())
if len(upsServers) > 0 {
ups.Backends = upsServers
}
}
for _, servicePort := range svc.Spec.Ports {
if servicePort.Port == path.Backend.ServicePort.IntValue() {
endps := lbc.getEndpoints(svc, servicePort.TargetPort)
if len(endps) == 0 {
glog.Warningf("service %v does no have any active endpoints", svcKey)
}
ups.Backends = append(ups.Backends, endps...)
break
}
}
//upstreams[name] = append(upstreams[name], ups)
upstreams[name] = ups
}
}
for _, rule := range ing.Spec.Rules {
server := nginx.Server{Name: rule.Host}
var server nginx.Server
if existent, ok := servers[rule.Host]; ok {
server = existent
} else {
server = nginx.Server{Name: rule.Host}
}
if pemFile, ok := pems[rule.Host]; ok {
server.SSL = true
@ -313,7 +315,7 @@ func (lbc *loadBalancerController) updateNGINX(data []interface{}) ([]nginx.Upst
for _, path := range rule.HTTP.Paths {
loc := nginx.Location{Path: path.Path}
upsName := ing.GetName() + "-" + path.Backend.ServiceName
upsName := ing.GetNamespace() + "-" + path.Backend.ServiceName
for _, ups := range upstreams {
if upsName == ups.Name {
@ -323,33 +325,63 @@ func (lbc *loadBalancerController) updateNGINX(data []interface{}) ([]nginx.Upst
locations = append(locations, loc)
}
server.Locations = locations
servers = append(servers, server)
server.Locations = append(server.Locations, locations...)
servers[rule.Host] = server
}
}
uValues := make([]nginx.Upstream, 0, len(upstreams))
aUpstreams := make([]nginx.Upstream, 0, len(upstreams))
for _, value := range upstreams {
if len(value.Backends) == 0 {
value.Backends = append(value.Backends, nginx.NewDefaultServer())
}
sort.Sort(nginx.UpstreamServerByAddrPort(value.Backends))
uValues = append(uValues, value)
aUpstreams = append(aUpstreams, value)
}
sort.Sort(nginx.UpstreamByNameServers(uValues))
sort.Sort(nginx.UpstreamByNameServers(aUpstreams))
sort.Sort(nginx.ServerByNamePort(servers))
aServers := make([]nginx.Server, 0, len(servers))
for _, value := range servers {
sort.Sort(nginx.LocationByPath(value.Locations))
aServers = append(aServers, value)
}
sort.Sort(nginx.ServerByName(aServers))
return uValues, servers, true
return aUpstreams, aServers
}
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort intstr.IntOrString) []nginx.UpstreamServer {
ep, err := lbc.endpLister.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return []nginx.UpstreamServer{}
}
func endpointsToUpstreamServers(endps api.Endpoints, servicePort int) []nginx.UpstreamServer {
var upsServers []nginx.UpstreamServer
for _, subset := range endps.Subsets {
for _, port := range subset.Ports {
if port.Port == servicePort {
for _, address := range subset.Addresses {
ups := nginx.UpstreamServer{Address: address.IP, Port: fmt.Sprintf("%v", servicePort)}
upsServers = append(upsServers, ups)
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
var targetPort int
switch servicePort.Type {
case intstr.Int:
if epPort.Port == servicePort.IntValue() {
targetPort = epPort.Port
}
break
case intstr.String:
if epPort.Name == servicePort.StrVal {
targetPort = epPort.Port
}
}
if targetPort == 0 {
continue
}
for _, epAddress := range ss.Addresses {
ups := nginx.UpstreamServer{Address: epAddress.IP, Port: fmt.Sprintf("%v", targetPort)}
upsServers = append(upsServers, ups)
}
}
}
@ -366,23 +398,27 @@ func (lbc *loadBalancerController) Stop() {
// Only try draining the workqueue if we haven't already.
if !lbc.shutdown {
close(lbc.stopCh)
glog.Infof("Shutting down controller queues")
glog.Infof("shutting down controller queues")
lbc.shutdown = true
}
}
// Run starts the loadbalancer controller.
func (lbc *loadBalancerController) Run() {
glog.Infof("Starting NGINX loadbalancer controller")
glog.Infof("starting NGINX loadbalancer controller")
go lbc.nginx.Start()
go lbc.registerHandlers()
go lbc.configController.Run(lbc.stopCh)
go lbc.ingController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
// periodic check for changes in configuration
go wait.Until(lbc.sync, 5*time.Second, wait.NeverStop)
time.Sleep(5 * time.Second)
<-lbc.stopCh
glog.Infof("Shutting down NGINX loadbalancer controller")
glog.Infof("shutting down NGINX loadbalancer controller")
}

View file

@ -1,9 +1,3 @@
{{range $name, $upstream := .upstreams}}
upstream {{$upstream.Name}} {
{{range $server := $upstream.UpstreamServers}}
server {{$server.Address}}:{{$server.Port}};{{end}}
}{{end}}
{{ $cfg := .cfg }}{{ $sslCertificates := .sslCertificates }}{{ $defErrorSvc := .defErrorSvc }}{{ $defBackend := .defBackend }}
daemon off;
@ -133,14 +127,13 @@ http {
{{ end }}
{{ if $defErrorSvc }}
# Custom error pages using
# Custom error pages
proxy_intercept_errors on;
error_page 403 @custom_403;
error_page 404 @custom_404;
error_page 405 @custom_405;
error_page 408 @custom_408;
error_page 413 @custom_413;
error_page 500 @custom_500;
error_page 501 @custom_501;
error_page 502 @custom_502;
error_page 503 @custom_503;
@ -179,12 +172,7 @@ http {
#vhost_traffic_status_filter_by_host on;
location / {
set $upstream_host '';
set $upstream_port '';
access_by_lua_block {
require("ingress").content(ngx)
}
proxy_pass http://$upstream_host:$upstream_port$request_uri;
return 200;
}
{{ if $defErrorSvc }}{{ template "CUSTOM_ERRORS" (dict "cfg" $cfg "defErrorSvc" $defErrorSvc) }}{{ end }}
@ -195,7 +183,6 @@ http {
# TODO: support more than one certificate
server {
listen 443 ssl http2 default_server;
{{ range $sslCert := .sslCertificates }}{{ if $sslCert.Default }}
# default certificate in case no match
ssl_certificate "{{ $sslCert.Cert }}";
@ -210,29 +197,38 @@ http {
}
{{ end }}
{{range $name, $upstream := .upstreams}}
upstream {{$upstream.Name}} {
least_conn;
{{range $server := $upstream.Backends}}server {{$server.Address}}:{{$server.Port}};
{{end}}
}
{{end}}
{{ range $server := .servers }}
server {
listen 80;
{{ if $server.SSL }}
listen 443 ssl http2;
{{ if $server.SSL }}listen 443 ssl http2;
ssl_certificate {{ $server.SSLCertificate }};
ssl_certificate_key {{ $server.SSLCertificateKey }};
{{ end }}
ssl_certificate_key {{ $server.SSLCertificateKey }};{{ end }}
server_name {{ $server.Name }};
{{ if $server.SSL }}
if ($scheme = http) {
return 301 https://$host$request_uri;
}
{{ end }}
{{ range $location := $server.Locations }}
location {{ $location.Path }} {
proxy_set_header Host $host;
proxy_pass http://{{ $location.Upstream.Name }};
}{{ end }}
}{{ end }}
}
{{ end }}
{{ if $defErrorSvc }}{{ template "CUSTOM_ERRORS" (dict "cfg" $cfg "defErrorSvc" $defErrorSvc) }}{{ end }}
}
{{ end }}
# default server, including healthcheck
server {
@ -244,20 +240,6 @@ http {
return 200;
}
# route to get the current Ingress configuration used in ingress.lua
location /config {
content_by_lua_block {
require("ingress").config(ngx)
}
}
# route to post the list of Ingress rules to use.
location /update-ingress {
content_by_lua_block {
require("ingress").update_ingress(ngx)
}
}
location /health-check {
access_log off;
proxy_pass http://127.0.0.1:10249/healthz;
@ -320,12 +302,6 @@ stream {
}
}
location @custom_500 {
content_by_lua_block {
openURL(500, dev_error_url)
}
}
location @custom_501 {
content_by_lua_block {
openURL(501, dev_error_url)

View file

@ -30,7 +30,7 @@ const (
// Start starts a nginx (master process) and waits. If the process ends
// we need to kill the controller process and return the reason.
func (ngx *NginxManager) Start() {
glog.Info("Starting nginx...")
glog.Info("Starting NGINX process...")
cmd := exec.Command("nginx")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@ -43,7 +43,9 @@ func (ngx *NginxManager) Start() {
}
}
// Reload the master process receives the signal to reload configuration, it checks
// CheckAndReload verify if the nginx configuration changed and sends a reload
//
// the master process receives the signal to reload configuration, it checks
// the syntax validity of the new configuration file and tries to apply the
// configuration provided in it. If this is a success, the master process starts
// new worker processes and sends messages to old worker processes, requesting them
@ -58,13 +60,13 @@ func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []Ups
newCfg, err := ngx.writeCfg(cfg, upstreams, servers, servicesL4)
if err != nil {
glog.Errorf("Failed to write new nginx configuration. Avoiding reload: %v", err)
glog.Errorf("failed to write new nginx configuration. Avoiding reload: %v", err)
return
}
if newCfg {
if err := ngx.shellOut("nginx -s reload"); err == nil {
glog.Info("Change in configuration detected. Reloading...")
glog.Info("change in configuration detected. Reloading...")
}
}
}
@ -74,7 +76,7 @@ func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []Ups
func (ngx *NginxManager) shellOut(cmd string) error {
out, err := exec.Command("sh", "-c", cmd).CombinedOutput()
if err != nil {
glog.Errorf("Failed to execute %v: %v", cmd, string(out))
glog.Errorf("failed to execute %v: %v", cmd, string(out))
return err
}

View file

@ -23,6 +23,8 @@ import (
"sync"
"text/template"
"github.com/golang/glog"
"k8s.io/contrib/ingress/controllers/nginx-third-party/ssl"
"k8s.io/kubernetes/pkg/client/record"
@ -237,7 +239,7 @@ type NginxManager struct {
// defaultConfiguration returns the default configuration contained
// in the file default-conf.json
func newDefaultNginxCfg() *nginxConfiguration {
return &nginxConfiguration{
cfg := nginxConfiguration{
BodySize: bodySize,
ErrorLogLevel: errorLevel,
UseHTS: true,
@ -263,6 +265,12 @@ func newDefaultNginxCfg() *nginxConfiguration {
UseGzip: true,
WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
}
if glog.V(5) {
cfg.ErrorLogLevel = "debug"
}
return &cfg
}
// NewManager ...

View file

@ -1,3 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nginx
// NGINXController Updates NGINX configuration, starts and reloads NGINX
@ -20,6 +36,7 @@ type Upstream struct {
Backends []UpstreamServer
}
// UpstreamByNameServers Upstream sorter by name
type UpstreamByNameServers []Upstream
func (c UpstreamByNameServers) Len() int { return len(c) }
@ -34,6 +51,7 @@ type UpstreamServer struct {
Port string
}
// UpstreamServerByAddrPort UpstreamServer sorter by address and port
type UpstreamServerByAddrPort []UpstreamServer
func (c UpstreamServerByAddrPort) Len() int { return len(c) }
@ -59,11 +77,12 @@ type Server struct {
SSLCertificateKey string
}
type ServerByNamePort []Server
// ServerByName Server sorter by name
type ServerByName []Server
func (c ServerByNamePort) Len() int { return len(c) }
func (c ServerByNamePort) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ServerByNamePort) Less(i, j int) bool {
func (c ServerByName) Len() int { return len(c) }
func (c ServerByName) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ServerByName) Less(i, j int) bool {
return c[i].Name < c[j].Name
}
@ -73,20 +92,24 @@ type Location struct {
Upstream Upstream
}
type locByPathUpstream []Location
// LocationByPath Location sorter by path
type LocationByPath []Location
func (c locByPathUpstream) Len() int { return len(c) }
func (c locByPathUpstream) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c locByPathUpstream) Less(i, j int) bool {
func (c LocationByPath) Len() int { return len(c) }
func (c LocationByPath) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c LocationByPath) Less(i, j int) bool {
return c[i].Path < c[j].Path
}
// NewUpstreamWithDefaultServer creates an upstream with the default server.
// proxy_pass to an upstream with the default server returns 502.
// We use it for services that have no endpoints
func NewUpstreamWithDefaultServer(name string) Upstream {
// NewDefaultServer return an UpstreamServer to be use as default server returns 502.
func NewDefaultServer() UpstreamServer {
return UpstreamServer{Address: "127.0.0.1", Port: "8181"}
}
// NewUpstream creates an upstream without servers.
func NewUpstream(name string) Upstream {
return Upstream{
Name: name,
Backends: []UpstreamServer{UpstreamServer{Address: "127.0.0.1", Port: "8181"}},
Backends: []UpstreamServer{},
}
}

View file

@ -17,10 +17,10 @@ limitations under the License.
package nginx
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"text/template"
"github.com/fatih/structs"
@ -61,11 +61,6 @@ func (ngx *NginxManager) loadTemplate() {
}
func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []Upstream, servers []Server, servicesL4 []Service) (bool, error) {
file, err := os.Create(ngx.ConfigFile)
if err != nil {
return false, err
}
fromMap := structs.Map(cfg)
toMap := structs.Map(ngx.defCfg)
curNginxCfg := merge(toMap, fromMap)
@ -86,7 +81,18 @@ func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []Upstream,
conf["defErrorSvc"] = false
}
if glog.V(2) {
buffer := new(bytes.Buffer)
err := ngx.template.Execute(buffer, conf)
if err != nil {
return false, err
}
changed, err := checkChanges(ngx.ConfigFile, buffer)
if err != nil {
return false, err
}
if glog.V(3) {
b, err := json.Marshal(conf)
if err != nil {
fmt.Println("error:", err)
@ -94,10 +100,5 @@ func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []Upstream,
glog.Infof("nginx configuration: %v", string(b))
}
err = ngx.template.Execute(file, conf)
if err != nil {
return false, err
}
return true, nil
return changed, nil
}

View file

@ -17,10 +17,13 @@ limitations under the License.
package nginx
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"reflect"
"strings"
@ -36,7 +39,7 @@ func (ngx *NginxManager) IsHealthy() error {
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("nginx status is unhealthy")
return fmt.Errorf("NGINX is unhealthy")
}
return nil
@ -67,7 +70,7 @@ func getDnsServers() []string {
}
}
glog.V(2).Infof("Nameservers to use: %v", nameservers)
glog.V(3).Infof("nameservers to use: %v", nameservers)
return nameservers
}
@ -81,8 +84,8 @@ func (ngx *NginxManager) ReadConfig(data string) (*nginxConfiguration, error) {
cfg := nginxConfiguration{}
err := json.Unmarshal([]byte(data), &cfg)
if err != nil {
glog.Errorf("Invalid json: %v", err)
return newDefaultNginxCfg(), fmt.Errorf("Invalid custom nginx configuration: %v", err)
glog.Errorf("invalid json: %v", err)
return newDefaultNginxCfg(), fmt.Errorf("invalid custom nginx configuration: %v", err)
}
return &cfg, nil
@ -116,3 +119,63 @@ func toMap(iface interface{}) (map[string]interface{}, bool) {
return map[string]interface{}{}, false
}
func checkChanges(filename string, data *bytes.Buffer) (bool, error) {
in, err := os.Open(filename)
if err != nil {
return false, err
}
src, err := ioutil.ReadAll(in)
in.Close()
if err != nil {
return false, err
}
res := data.Bytes()
if !bytes.Equal(src, res) {
err = ioutil.WriteFile(filename, res, 0644)
if err != nil {
return false, err
}
dData, err := diff(src, res)
if err != nil {
return false, fmt.Errorf("computing diff: %s", err)
}
if glog.V(2) {
glog.Infof("NGINX configuration diff a/%s b/%s\n", filename, filename)
glog.Infof("%v", string(dData))
}
return len(dData) > 0, nil
}
return false, nil
}
func diff(b1, b2 []byte) (data []byte, err error) {
f1, err := ioutil.TempFile("", "")
if err != nil {
return
}
defer os.Remove(f1.Name())
defer f1.Close()
f2, err := ioutil.TempFile("", "")
if err != nil {
return
}
defer os.Remove(f2.Name())
defer f2.Close()
f1.Write(b1)
f2.Write(b2)
data, err = exec.Command("diff", "-u", f1.Name(), f2.Name()).CombinedOutput()
if len(data) > 0 {
err = nil
}
return
}