ingress-nginx-helm/internal/ingress/controller/nginx.go

1198 lines
32 KiB
Go
Raw Normal View History

/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2017-10-06 20:33:32 +00:00
package controller
import (
"bytes"
"encoding/json"
2016-11-29 01:39:17 +00:00
"errors"
"fmt"
"io/ioutil"
2016-11-29 01:39:17 +00:00
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
2019-08-15 18:57:51 +00:00
"reflect"
"strconv"
"strings"
2017-11-05 01:18:28 +00:00
"sync"
2016-11-29 01:39:17 +00:00
"syscall"
"text/template"
2016-11-29 01:39:17 +00:00
"time"
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
2017-09-17 18:42:31 +00:00
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
2017-11-05 01:18:28 +00:00
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
2019-05-18 10:08:05 +00:00
"k8s.io/klog"
adm_controler "k8s.io/ingress-nginx/internal/admission/controller"
"k8s.io/ingress-nginx/internal/file"
2017-11-07 22:02:12 +00:00
"k8s.io/ingress-nginx/internal/ingress"
2019-03-11 15:57:28 +00:00
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
2017-11-07 22:02:12 +00:00
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/controller/process"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
2017-11-07 22:02:12 +00:00
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
2018-07-07 17:46:18 +00:00
"k8s.io/ingress-nginx/internal/ingress/metric"
2017-11-07 22:02:12 +00:00
"k8s.io/ingress-nginx/internal/ingress/status"
2018-11-20 20:29:20 +00:00
"k8s.io/ingress-nginx/internal/k8s"
2017-11-07 22:02:12 +00:00
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/internal/net/ssl"
2019-01-21 14:29:36 +00:00
"k8s.io/ingress-nginx/internal/nginx"
2017-11-07 22:02:12 +00:00
"k8s.io/ingress-nginx/internal/task"
"k8s.io/ingress-nginx/internal/watch"
2016-11-29 01:39:17 +00:00
)
// tempNginxPattern is the name pattern handed to ioutil.TempFile when a
// candidate NGINX configuration is written to disk to be tested.
const tempNginxPattern = "nginx-cfg"
2017-10-06 20:33:32 +00:00
// NewNGINXController creates a new NGINX Ingress controller.
2019-08-13 21:14:55 +00:00
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
2017-11-05 01:18:28 +00:00
eventBroadcaster := record.NewBroadcaster()
2018-12-05 16:27:55 +00:00
eventBroadcaster.StartLogging(klog.Infof)
2017-11-05 01:18:28 +00:00
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
2017-04-11 14:47:49 +00:00
h, err := dns.GetSystemNameServers()
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Error reading system nameservers: %v", err)
2017-04-11 14:47:49 +00:00
}
2017-03-12 15:27:05 +00:00
n := &NGINXController{
2017-11-05 01:18:28 +00:00
isIPV6Enabled: ing_net.IsIPv6Enabled(),
2017-08-29 19:40:03 +00:00
resolver: h,
2017-11-05 01:18:28 +00:00
cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
2017-11-05 01:18:28 +00:00
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
updateCh: channels.NewRingChannel(1024),
ngxErrCh: make(chan error),
2017-11-05 01:18:28 +00:00
stopLock: &sync.Mutex{},
2017-11-06 22:34:30 +00:00
2018-06-13 18:15:45 +00:00
runningConfig: new(ingress.Configuration),
2018-01-23 20:10:02 +00:00
Proxy: &TCPProxy{},
2018-07-07 17:46:18 +00:00
metricCollector: mc,
command: NewNginxCommand(),
}
if n.cfg.ValidationWebhook != "" {
n.validationWebhookServer = &http.Server{
Addr: config.ValidationWebhook,
Handler: adm_controler.NewAdmissionControllerServer(&adm_controler.IngressAdmission{Checker: n}),
TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
}
}
2018-11-20 20:29:20 +00:00
pod, err := k8s.GetPodDetails(config.Client)
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("unexpected error obtaining pod information: %v", err)
2018-11-20 20:29:20 +00:00
}
2019-03-08 00:20:34 +00:00
n.podInfo = pod
2018-11-20 20:29:20 +00:00
n.store = store.New(
config.Namespace,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.DefaultSSLCertificate,
config.ResyncPeriod,
config.Client,
n.updateCh,
pod,
config.DisableCatchAll)
2017-11-07 16:36:51 +00:00
2017-11-05 01:18:28 +00:00
n.syncQueue = task.NewTaskQueue(n.syncIngress)
2019-03-08 00:20:34 +00:00
2017-11-05 01:18:28 +00:00
if config.UpdateStatus {
2019-03-08 00:20:34 +00:00
n.syncStatus = status.NewStatusSyncer(pod, status.Config{
2017-11-05 01:18:28 +00:00
Client: config.Client,
2017-11-06 01:22:49 +00:00
PublishService: config.PublishService,
PublishStatusAddress: config.PublishStatusAddress,
IngressLister: n.store,
2017-11-05 01:18:28 +00:00
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
2017-11-06 01:22:49 +00:00
UseNodeInternalIP: config.UseNodeInternalIP,
2017-11-05 01:18:28 +00:00
})
} else {
2018-12-05 16:27:55 +00:00
klog.Warning("Update of Ingress status is disabled (flag --update-status)")
2017-11-05 01:18:28 +00:00
}
2018-04-25 21:53:49 +00:00
onTemplateChange := func() {
2019-08-13 21:14:55 +00:00
template, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
2018-12-05 16:27:55 +00:00
klog.Errorf(`
-------------------------------------------------------------------------------
2018-06-13 18:15:45 +00:00
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)
return
}
n.t = template
2018-12-05 16:27:55 +00:00
klog.Info("New NGINX configuration template loaded.")
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
2019-08-13 21:14:55 +00:00
ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("Invalid NGINX configuration template: %v", err)
}
n.t = ngxTpl
2016-11-29 01:39:17 +00:00
2019-08-13 21:14:55 +00:00
_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
2018-06-16 20:22:59 +00:00
if err != nil {
2019-08-13 21:14:55 +00:00
klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
2018-06-16 20:22:59 +00:00
}
2018-06-16 20:22:59 +00:00
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
2018-06-16 20:22:59 +00:00
return err
}
if info.IsDir() {
return nil
}
2018-06-16 20:22:59 +00:00
filesToWatch = append(filesToWatch, path)
return nil
})
2018-06-16 20:22:59 +00:00
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("Error creating file watchers: %v", err)
2018-06-16 20:22:59 +00:00
}
2018-06-16 20:22:59 +00:00
for _, f := range filesToWatch {
_, err = watch.NewFileWatcher(f, func() {
2018-12-05 16:27:55 +00:00
klog.Infof("File %v changed. Reloading NGINX", f)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("Error creating file watcher for %v: %v", f, err)
}
}
2017-08-28 16:06:58 +00:00
return n
}
2018-06-13 18:15:45 +00:00
// NGINXController describes a NGINX Ingress controller.
type NGINXController struct {
2019-03-08 00:20:34 +00:00
podInfo *k8s.PodInfo
2017-11-05 01:18:28 +00:00
cfg *Configuration
recorder record.EventRecorder
syncQueue *task.Queue
2019-03-08 00:20:34 +00:00
syncStatus status.Syncer
2017-11-05 01:18:28 +00:00
syncRateLimiter flowcontrol.RateLimiter
2018-06-13 18:15:45 +00:00
// stopLock is used to enforce that only a single call to Stop send at
// a given time. We allow stopping through an HTTP endpoint and
2017-11-05 01:18:28 +00:00
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
stopCh chan struct{}
updateCh *channels.RingChannel
2017-11-05 01:18:28 +00:00
2018-06-13 18:15:45 +00:00
// ngxErrCh is used to detect errors with the NGINX processes
2017-11-05 01:18:28 +00:00
ngxErrCh chan error
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
t ngx_template.TemplateWriter
resolver []net.IP
isIPV6Enabled bool
2017-04-11 14:47:49 +00:00
2017-08-28 16:06:58 +00:00
isShuttingDown bool
2017-11-05 01:18:28 +00:00
Proxy *TCPProxy
2017-08-26 03:46:17 +00:00
store store.Storer
2017-11-06 22:34:30 +00:00
2018-07-07 17:46:18 +00:00
metricCollector metric.Collector
validationWebhookServer *http.Server
command NginxExecTester
}
2018-06-13 18:15:45 +00:00
// Start starts a new NGINX master process running in the foreground.
2017-03-12 15:27:05 +00:00
func (n *NGINXController) Start() {
2018-12-05 16:27:55 +00:00
klog.Info("Starting NGINX Ingress controller")
2017-11-05 01:18:28 +00:00
n.store.Run(n.stopCh)
2019-03-11 15:57:28 +00:00
// we need to use the defined ingress class to allow multiple leaders
// in order to update information about ingress status
electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
if class.IngressClass != "" {
electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
}
2019-03-08 00:20:34 +00:00
setupLeaderElection(&leaderElectionConfig{
Client: n.cfg.Client,
2019-03-11 15:57:28 +00:00
ElectionID: electionID,
2019-03-08 00:20:34 +00:00
OnStartedLeading: func(stopCh chan struct{}) {
if n.syncStatus != nil {
go n.syncStatus.Run(stopCh)
}
2019-03-11 15:57:28 +00:00
n.metricCollector.OnStartedLeading(electionID)
// manually update SSL expiration metrics
// (to not wait for a reload)
n.metricCollector.SetSSLExpireTime(n.runningConfig.Servers)
2019-03-08 00:20:34 +00:00
},
OnStoppedLeading: func() {
2019-03-11 15:57:28 +00:00
n.metricCollector.OnStoppedLeading(electionID)
2019-03-08 00:20:34 +00:00
},
PodName: n.podInfo.Name,
PodNamespace: n.podInfo.Namespace,
})
2017-11-05 01:18:28 +00:00
cmd := n.command.ExecCommand()
2017-08-28 16:06:58 +00:00
2018-06-13 18:15:45 +00:00
// put NGINX in another process group to prevent it
2017-08-28 16:06:58 +00:00
// to receive signals meant for the controller
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
if n.cfg.EnableSSLPassthrough {
n.setupSSLProxy()
}
2018-12-05 16:27:55 +00:00
klog.Info("Starting NGINX process")
2017-11-05 01:18:28 +00:00
n.start(cmd)
2016-11-29 01:39:17 +00:00
2017-11-27 22:22:59 +00:00
go n.syncQueue.Run(time.Second, n.stopCh)
2017-11-05 01:18:28 +00:00
// force initial sync
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
2017-08-28 16:06:58 +00:00
// In case of error the temporal configuration file will
// be available up to five minutes after the error
go func() {
for {
time.Sleep(5 * time.Minute)
err := cleanTempNginxCfg()
if err != nil {
klog.Infof("Unexpected error removing temporal configuration files: %v", err)
}
}
}()
if n.validationWebhookServer != nil {
klog.Infof("Starting validation webhook on %s with keys %s %s", n.validationWebhookServer.Addr, n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath)
go func() {
klog.Error(n.validationWebhookServer.ListenAndServeTLS("", ""))
}()
}
2017-11-05 01:18:28 +00:00
for {
select {
2018-03-03 12:23:06 +00:00
case err := <-n.ngxErrCh:
2017-11-05 01:18:28 +00:00
if n.isShuttingDown {
2016-11-29 01:39:17 +00:00
break
}
2017-11-05 01:18:28 +00:00
// if the nginx master process dies the workers continue to process requests,
// passing checks but in case of updates in ingress no updates will be
// reflected in the nginx configuration which can lead to confusion and report
// issues because of this behavior.
// To avoid this issue we restart nginx in case of errors.
if process.IsRespawnIfRequired(err) {
process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP)
// release command resources
cmd.Process.Release()
// start a new nginx master process if the controller is not being stopped
cmd = n.command.ExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
2017-11-05 01:18:28 +00:00
n.start(cmd)
}
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
2018-12-05 16:27:55 +00:00
klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
2018-12-05 16:27:55 +00:00
klog.Warningf("Unexpected event type received %T", event)
2018-01-18 23:04:40 +00:00
}
2017-11-05 01:18:28 +00:00
case <-n.stopCh:
break
2016-11-29 01:39:17 +00:00
}
}
}
2017-08-28 16:06:58 +00:00
// Stop gracefully stops the NGINX master process.
func (n *NGINXController) Stop() error {
n.isShuttingDown = true
2017-11-05 01:18:28 +00:00
n.stopLock.Lock()
defer n.stopLock.Unlock()
if n.syncQueue.IsShuttingDown() {
return fmt.Errorf("shutdown already in progress")
}
2018-12-05 16:27:55 +00:00
klog.Info("Shutting down controller queues")
2017-11-05 01:18:28 +00:00
close(n.stopCh)
go n.syncQueue.Shutdown()
if n.syncStatus != nil {
n.syncStatus.Shutdown()
}
2017-08-28 16:06:58 +00:00
if n.validationWebhookServer != nil {
klog.Info("Stopping admission controller")
err := n.validationWebhookServer.Close()
if err != nil {
return err
}
}
2018-06-12 13:04:26 +00:00
// send stop signal to NGINX
2018-12-05 16:27:55 +00:00
klog.Info("Stopping NGINX process")
cmd := n.command.ExecCommand("-s", "quit")
2017-08-28 16:06:58 +00:00
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return err
}
2018-06-13 18:15:45 +00:00
// wait for the NGINX process to terminate
2017-11-05 01:18:28 +00:00
timer := time.NewTicker(time.Second * 1)
2017-11-06 22:38:16 +00:00
for range timer.C {
2017-11-05 01:18:28 +00:00
if !process.IsNginxRunning() {
2018-12-05 16:27:55 +00:00
klog.Info("NGINX process has stopped")
2017-11-05 01:18:28 +00:00
timer.Stop()
break
}
}
2017-08-28 16:06:58 +00:00
return nil
}
2017-11-05 01:18:28 +00:00
func (n *NGINXController) start(cmd *exec.Cmd) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("NGINX error: %v", err)
2017-11-05 01:18:28 +00:00
n.ngxErrCh <- err
2016-11-29 01:39:17 +00:00
return
}
2016-11-29 01:39:17 +00:00
go func() {
2017-11-05 01:18:28 +00:00
n.ngxErrCh <- cmd.Wait()
2016-11-29 01:39:17 +00:00
}()
}
// DefaultEndpoint returns the default endpoint to be use as default server that returns 404.
func (n NGINXController) DefaultEndpoint() ingress.Endpoint {
return ingress.Endpoint{
Address: "127.0.0.1",
2017-11-05 01:18:28 +00:00
Port: fmt.Sprintf("%v", n.cfg.ListenPorts.Default),
2017-09-17 18:42:31 +00:00
Target: &apiv1.ObjectReference{},
}
}
// generateTemplate returns the nginx configuration file content
func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {
2018-01-23 20:10:02 +00:00
if n.cfg.EnableSSLPassthrough {
servers := []*TCPServer{}
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
2018-01-23 20:10:02 +00:00
continue
}
2018-01-23 20:10:02 +00:00
port, err := strconv.Atoi(pb.Port.String())
if err != nil {
for _, sp := range svc.Spec.Ports {
if sp.Name == pb.Port.String() {
port = int(sp.Port)
break
}
}
} else {
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(port) {
port = int(sp.Port)
break
}
}
}
2018-01-23 20:10:02 +00:00
2018-06-13 18:15:45 +00:00
// TODO: Allow PassthroughBackends to specify they support proxy-protocol
2018-01-23 20:10:02 +00:00
servers = append(servers, &TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
Port: port,
ProxyProtocol: false,
})
}
2018-01-23 20:10:02 +00:00
n.Proxy.ServerList = servers
}
2018-06-12 13:04:26 +00:00
// NGINX cannot resize the hash tables used to store server names. For
// this reason we check if the current size is correct for the host
// names defined in the Ingress rules and adjust the value if
// necessary.
// https://trac.nginx.org/nginx/ticket/352
// https://trac.nginx.org/nginx/ticket/631
2017-08-16 07:02:30 +00:00
var longestName int
var serverNameBytes int
2017-08-16 07:02:30 +00:00
for _, srv := range ingressCfg.Servers {
if longestName < len(srv.Hostname) {
longestName = len(srv.Hostname)
}
serverNameBytes += len(srv.Hostname)
}
if cfg.ServerNameHashBucketSize == 0 {
2017-08-16 07:02:30 +00:00
nameHashBucketSize := nginxHashBucketSize(longestName)
2018-12-05 16:27:55 +00:00
klog.V(3).Infof("Adjusting ServerNameHashBucketSize variable to %d", nameHashBucketSize)
cfg.ServerNameHashBucketSize = nameHashBucketSize
}
serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
2018-12-05 16:27:55 +00:00
klog.V(3).Infof("Adjusting ServerNameHashMaxSize variable to %d", serverNameHashMaxSize)
cfg.ServerNameHashMaxSize = serverNameHashMaxSize
}
if cfg.MaxWorkerOpenFiles == 0 {
// the limit of open files is per worker process
// and we leave some room to avoid consuming all the FDs available
wp, err := strconv.Atoi(cfg.WorkerProcesses)
klog.V(3).Infof("Number of worker processes: %d", wp)
if err != nil {
wp = 1
}
maxOpenFiles := (rlimitMaxNumFiles() / wp) - 1024
klog.V(3).Infof("Maximum number of open file descriptors: %d", maxOpenFiles)
if maxOpenFiles < 1024 {
// this means the value of RLIMIT_NOFILE is too low.
maxOpenFiles = 1024
}
klog.V(3).Infof("Adjusting MaxWorkerOpenFiles variable to %d", maxOpenFiles)
cfg.MaxWorkerOpenFiles = maxOpenFiles
}
if cfg.MaxWorkerConnections == 0 {
2019-07-08 20:10:38 +00:00
maxWorkerConnections := int(float64(cfg.MaxWorkerOpenFiles * 3.0 / 4))
klog.V(3).Infof("Adjusting MaxWorkerConnections variable to %d", maxWorkerConnections)
cfg.MaxWorkerConnections = maxWorkerConnections
}
setHeaders := map[string]string{}
if cfg.ProxySetHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
} else {
setHeaders = cmap.Data
}
}
2017-05-18 10:21:03 +00:00
addHeaders := map[string]string{}
if cfg.AddHeaders != "" {
cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
2017-05-18 10:21:03 +00:00
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
} else {
addHeaders = cmap.Data
2017-05-18 10:21:03 +00:00
}
}
sslDHParam := ""
if cfg.SSLDHParam != "" {
secretName := cfg.SSLDHParam
secret, err := n.store.GetSecret(secretName)
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Error reading Secret %q from local store: %v", secretName, err)
} else {
nsSecName := strings.Replace(secretName, "/", "-", -1)
dh, ok := secret.Data["dhparam.pem"]
if ok {
2019-08-13 21:14:55 +00:00
pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh)
if err != nil {
klog.Warningf("Error adding or updating dhparam file %v: %v", nsSecName, err)
} else {
sslDHParam = pemFileName
}
}
}
}
cfg.SSLDHParam = sslDHParam
2019-08-13 21:14:55 +00:00
cfg.DefaultSSLCertificate = n.getDefaultSSLCertificate()
2017-11-05 01:18:28 +00:00
tc := ngx_config.TemplateConfig{
2019-08-13 21:14:55 +00:00
ProxySetHeaders: setHeaders,
AddHeaders: addHeaders,
BacklogSize: sysctlSomaxconn(),
Backends: ingressCfg.Backends,
PassthroughBackends: ingressCfg.PassthroughBackends,
Servers: ingressCfg.Servers,
TCPBackends: ingressCfg.TCPEndpoints,
UDPBackends: ingressCfg.UDPEndpoints,
Cfg: cfg,
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
NginxStatusIpv4Whitelist: cfg.NginxStatusIpv4Whitelist,
NginxStatusIpv6Whitelist: cfg.NginxStatusIpv6Whitelist,
RedirectServers: buildRedirects(ingressCfg.Servers),
IsSSLPassthroughEnabled: n.cfg.EnableSSLPassthrough,
ListenPorts: n.cfg.ListenPorts,
PublishService: n.GetPublishService(),
EnableMetrics: n.cfg.EnableMetrics,
2019-01-21 14:29:36 +00:00
HealthzURI: nginx.HealthPath,
PID: nginx.PID,
StatusSocket: nginx.StatusSocket,
StatusPath: nginx.StatusPath,
StreamSocket: nginx.StreamSocket,
}
2018-07-07 17:46:18 +00:00
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
return n.t.Write(tc)
}
// testTemplate checks if the NGINX configuration inside the byte array is
// valid by writing it to a temporary file and testing that file through
// n.command.Test.
//
// The temporary file is removed only when the test succeeds; on failure it
// is left on disk and the returned error carries the test output.
func (n NGINXController) testTemplate(cfg []byte) error {
	if len(cfg) == 0 {
		return fmt.Errorf("invalid NGINX configuration (empty)")
	}

	tmpfile, err := ioutil.TempFile("", tempNginxPattern)
	if err != nil {
		return err
	}
	defer tmpfile.Close()

	tmpPath := tmpfile.Name()
	if err := ioutil.WriteFile(tmpPath, cfg, file.ReadWriteByUser); err != nil {
		return err
	}

	out, testErr := n.command.Test(tmpPath)
	if testErr == nil {
		os.Remove(tmpPath)
		return nil
	}

	// this error is different from the rest because it must be clear why nginx is not working
	return errors.New(fmt.Sprintf(`
-------------------------------------------------------------------------------
Error: %v
%v
-------------------------------------------------------------------------------
`, testErr, string(out)))
}
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
content, err := n.generateTemplate(cfg, ingressCfg)
2017-06-11 19:56:40 +00:00
if err != nil {
return err
}
if cfg.EnableOpentracing {
err := createOpentracingCfg(cfg)
if err != nil {
return err
}
}
2017-06-11 19:56:40 +00:00
err = n.testTemplate(content)
if err != nil {
return err
}
2018-12-05 16:27:55 +00:00
if klog.V(2) {
2017-11-05 01:18:28 +00:00
src, _ := ioutil.ReadFile(cfgPath)
if !bytes.Equal(src, content) {
tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
2018-06-12 12:40:40 +00:00
err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
2017-11-05 01:18:28 +00:00
if err != nil {
return err
}
diffOutput, err := exec.Command("diff", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
if ws.ExitStatus() == 2 {
klog.Warningf("Failed to executing diff command: %v", err)
}
}
}
2017-11-05 01:18:28 +00:00
2018-12-05 16:27:55 +00:00
klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))
2017-11-05 01:18:28 +00:00
2018-06-13 18:15:45 +00:00
// we do not defer the deletion of temp files in order
// to keep them around for inspection in case of error
2017-11-05 01:18:28 +00:00
os.Remove(tmpfile.Name())
}
}
2017-06-11 19:56:40 +00:00
2018-06-12 12:40:40 +00:00
err = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
if err != nil {
2017-06-11 19:56:40 +00:00
return err
}
o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
2017-06-11 19:56:40 +00:00
if err != nil {
2017-06-23 13:55:45 +00:00
return fmt.Errorf("%v\n%v", err, string(o))
}
2017-06-11 19:56:40 +00:00
return nil
}
2018-06-13 18:15:45 +00:00
// nginxHashBucketSize computes the correct NGINX hash_bucket_size for a hash
// with the given longest key.
func nginxHashBucketSize(longestString int) int {
2018-06-13 18:15:45 +00:00
// see https://github.com/kubernetes/ingress-nginxs/issues/623 for an explanation
wordSize := 8 // Assume 64 bit CPU
n := longestString + 2
aligned := (n + wordSize - 1) & ^(wordSize - 1)
rawSize := wordSize + wordSize + aligned
return nextPowerOf2(rawSize)
}
// nextPowerOf2 returns the smallest power of two that is greater than or
// equal to v. Values of v less than or equal to zero yield 0.
//
// The highest set bit of v-1 is smeared into every lower position and the
// result is incremented. The shifts cover the full width of int on the
// current platform, so values above 2^32 are handled on 64-bit systems.
//
// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
// https://play.golang.org/p/TVSyCcdxUh
func nextPowerOf2(v int) int {
	v--
	for shift := uint(1); shift < strconv.IntSize; shift <<= 1 {
		v |= v >> shift
	}
	v++

	return v
}
func (n *NGINXController) setupSSLProxy() {
cfg := n.store.GetBackendConfiguration()
sslPort := n.cfg.ListenPorts.HTTPS
proxyPort := n.cfg.ListenPorts.SSLProxy
2018-12-05 16:27:55 +00:00
klog.Info("Starting TLS proxy for SSL Passthrough")
n.Proxy = &TCPProxy{
Default: &TCPServer{
Hostname: "localhost",
IP: "127.0.0.1",
Port: proxyPort,
ProxyProtocol: true,
},
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort))
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Fatalf("%v", err)
}
proxyList := &proxyproto.Listener{Listener: listener, ProxyHeaderTimeout: cfg.ProxyProtocolHeaderTimeout}
2018-06-13 18:15:45 +00:00
// accept TCP connections on the configured HTTPS port
go func() {
for {
var conn net.Conn
var err error
if n.store.GetBackendConfiguration().UseProxyProtocol {
2018-06-13 18:15:45 +00:00
// wrap the listener in order to decode Proxy
// Protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
2018-12-05 16:27:55 +00:00
klog.Warningf("Error accepting TCP connection: %v", err)
continue
}
2018-12-05 16:27:55 +00:00
klog.V(3).Infof("Handling connection from remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}
// Helper function to clear Certificates from the ingress configuration since they should be ignored when
// checking if the new configuration changes can be applied dynamically if dynamic certificates is on
func clearCertificates(config *ingress.Configuration) {
var clearedServers []*ingress.Server
for _, server := range config.Servers {
copyOfServer := *server
2019-08-13 21:14:55 +00:00
if copyOfServer.SSLCert != nil {
copyOfServer.SSLCert = &ingress.SSLCert{PemFileName: copyOfServer.SSLCert.PemFileName}
}
clearedServers = append(clearedServers, &copyOfServer)
}
config.Servers = clearedServers
}
// clearL4serviceEndpoints replaces the TCP and UDP services of config with
// copies that keep only Port and Backend and carry an empty endpoint list
// and no Service. Endpoints should be ignored when checking if the new
// configuration changes can be applied dynamically.
func clearL4serviceEndpoints(config *ingress.Configuration) {
	// strip returns nil for an empty input, like a plain append loop.
	strip := func(services []ingress.L4Service) []ingress.L4Service {
		var stripped []ingress.L4Service
		for i := range services {
			stripped = append(stripped, ingress.L4Service{
				Port:      services[i].Port,
				Backend:   services[i].Backend,
				Endpoints: []ingress.Endpoint{},
			})
		}
		return stripped
	}

	config.TCPEndpoints = strip(config.TCPEndpoints)
	config.UDPEndpoints = strip(config.UDPEndpoints)
}
2018-06-13 18:15:45 +00:00
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {
copyOfRunningConfig := *n.runningConfig
copyOfPcfg := *pcfg
copyOfRunningConfig.Backends = []*ingress.Backend{}
copyOfPcfg.Backends = []*ingress.Backend{}
clearL4serviceEndpoints(&copyOfRunningConfig)
clearL4serviceEndpoints(&copyOfPcfg)
copyOfRunningConfig.ControllerPodsCount = 0
copyOfPcfg.ControllerPodsCount = 0
2019-08-13 21:14:55 +00:00
clearCertificates(&copyOfRunningConfig)
clearCertificates(&copyOfPcfg)
return copyOfRunningConfig.Equal(&copyOfPcfg)
}
2018-06-13 18:15:45 +00:00
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
2019-08-15 18:57:51 +00:00
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
2019-08-15 18:57:51 +00:00
}
2019-08-15 18:57:51 +00:00
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
2019-08-15 18:57:51 +00:00
if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
}
2019-08-15 18:57:51 +00:00
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
2019-01-21 14:29:36 +00:00
}
2019-08-15 18:57:51 +00:00
return nil
}
func updateStreamConfiguration(TCPEndpoints []ingress.L4Service, UDPEndpoints []ingress.L4Service) error {
2018-11-16 20:33:56 +00:00
streams := make([]ingress.Backend, 0)
2019-08-15 18:57:51 +00:00
for _, ep := range TCPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
2018-11-16 20:33:56 +00:00
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
2019-08-15 18:57:51 +00:00
for _, ep := range UDPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
2018-11-16 20:33:56 +00:00
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
2019-08-15 18:57:51 +00:00
conn, err := net.Dial("unix", nginx.StreamSocket)
2018-11-16 20:52:46 +00:00
if err != nil {
return err
}
2019-08-15 18:57:51 +00:00
defer conn.Close()
2018-11-16 20:52:46 +00:00
2019-08-15 18:57:51 +00:00
buf, err := json.Marshal(streams)
if err != nil {
return err
}
2019-08-15 18:57:51 +00:00
_, err = conn.Write(buf)
if err != nil {
return err
2019-01-21 14:29:36 +00:00
}
2019-08-15 18:57:51 +00:00
_, err = fmt.Fprintf(conn, "\r\n")
2019-08-13 21:14:55 +00:00
if err != nil {
return err
2018-11-16 20:52:46 +00:00
}
return nil
}
2019-08-15 18:57:51 +00:00
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
2018-11-16 20:52:46 +00:00
2019-08-15 18:57:51 +00:00
for i, backend := range rawBackends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
}
luaBackend := &ingress.Backend{
Name: backend.Name,
Port: backend.Port,
SSLPassthrough: backend.SSLPassthrough,
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
NoServer: backend.NoServer,
TrafficShapingPolicy: backend.TrafficShapingPolicy,
AlternativeBackends: backend.AlternativeBackends,
}
var endpoints []ingress.Endpoint
for _, endpoint := range backend.Endpoints {
endpoints = append(endpoints, ingress.Endpoint{
Address: endpoint.Address,
Port: endpoint.Port,
})
}
luaBackend.Endpoints = endpoints
backends[i] = luaBackend
}
2019-08-15 18:57:51 +00:00
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
if err != nil {
return err
}
2019-08-15 18:57:51 +00:00
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
2018-11-16 20:33:56 +00:00
}
return nil
}
// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
2019-08-15 18:57:51 +00:00
func configureCertificates(rawServers []*ingress.Server) error {
2019-08-13 21:14:55 +00:00
servers := make([]*ingress.Server, 0)
2019-08-15 18:57:51 +00:00
for _, server := range rawServers {
2019-08-13 21:14:55 +00:00
if server.SSLCert == nil {
continue
}
servers = append(servers, &ingress.Server{
Hostname: server.Hostname,
2019-08-13 21:14:55 +00:00
SSLCert: &ingress.SSLCert{
PemCertKey: server.SSLCert.PemCertKey,
},
})
if server.Alias != "" && ssl.IsValidHostname(server.Alias, server.SSLCert.CN) {
servers = append(servers, &ingress.Server{
Hostname: server.Alias,
2019-08-13 21:14:55 +00:00
SSLCert: &ingress.SSLCert{
PemCertKey: server.SSLCert.PemCertKey,
},
})
}
}
2019-08-15 18:57:51 +00:00
redirects := buildRedirects(rawServers)
for _, redirect := range redirects {
2019-08-13 21:14:55 +00:00
if redirect.SSLCert == nil {
continue
}
servers = append(servers, &ingress.Server{
Hostname: redirect.From,
2019-08-13 21:14:55 +00:00
SSLCert: &ingress.SSLCert{
PemCertKey: redirect.SSLCert.PemCertKey,
},
})
}
2019-01-21 14:29:36 +00:00
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/servers", "application/json", servers)
if err != nil {
return err
}
2019-01-21 14:29:36 +00:00
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
return nil
}
// zipkinTmpl is the text/template source that createOpentracingCfg renders
// when cfg.ZipkinCollectorHost is set. The placeholders are filled from the
// Zipkin* fields of the configuration passed to the template.
const zipkinTmpl = `{
"service_name": "{{ .ZipkinServiceName }}",
"collector_host": "{{ .ZipkinCollectorHost }}",
"collector_port": {{ .ZipkinCollectorPort }},
"sample_rate": {{ .ZipkinSampleRate }}
}`
// jaegerTmpl is the text/template source that createOpentracingCfg renders
// when cfg.JaegerCollectorHost is set (and no Zipkin collector host is
// configured). The placeholders are filled from the Jaeger* fields of the
// configuration passed to the template.
const jaegerTmpl = `{
"service_name": "{{ .JaegerServiceName }}",
"sampler": {
"type": "{{ .JaegerSamplerType }}",
"param": {{ .JaegerSamplerParam }},
"samplingServerURL": "{{ .JaegerSamplerHost }}:{{ .JaegerSamplerPort }}/sampling"
},
"reporter": {
"localAgentHostPort": "{{ .JaegerCollectorHost }}:{{ .JaegerCollectorPort }}"
}
}`
// datadogTmpl is the text/template source that createOpentracingCfg renders
// when cfg.DatadogCollectorHost is set (and neither a Zipkin nor a Jaeger
// collector host is configured). The placeholders are filled from the Datadog*
// fields of the configuration passed to the template.
const datadogTmpl = `{
"service": "{{ .DatadogServiceName }}",
"agent_host": "{{ .DatadogCollectorHost }}",
"agent_port": {{ .DatadogCollectorPort }},
"operation_name_override": "{{ .DatadogOperationNameOverride }}"
}`
func createOpentracingCfg(cfg ngx_config.Configuration) error {
var tmpl *template.Template
var err error
if cfg.ZipkinCollectorHost != "" {
tmpl, err = template.New("zipkin").Parse(zipkinTmpl)
if err != nil {
return err
}
} else if cfg.JaegerCollectorHost != "" {
tmpl, err = template.New("jaeger").Parse(jaegerTmpl)
if err != nil {
return err
}
} else if cfg.DatadogCollectorHost != "" {
tmpl, err = template.New("datadog").Parse(datadogTmpl)
if err != nil {
return err
}
} else {
tmpl, _ = template.New("empty").Parse("{}")
}
tmplBuf := bytes.NewBuffer(make([]byte, 0))
err = tmpl.Execute(tmplBuf, cfg)
if err != nil {
return err
}
// Expand possible environment variables before writing the configuration to file.
2019-07-08 20:10:38 +00:00
expanded := os.ExpandEnv(tmplBuf.String())
return ioutil.WriteFile("/etc/nginx/opentracing.json", []byte(expanded), file.ReadWriteByUser)
}
// cleanTempNginxCfg deletes stale temporary files from os.TempDir().
//
// A file is removed when its name starts with tempNginxPattern and it was last
// modified more than five minutes ago. Only the top level of the temporary
// directory is inspected; subdirectories are skipped. The first error reported
// while walking the directory or while removing a file is returned.
func cleanTempNginxCfg() error {
	tmpDir := os.TempDir()
	fiveMinutesAgo := time.Now().Add(-5 * time.Minute)

	var files []string

	err := filepath.Walk(tmpDir, func(path string, info os.FileInfo, err error) error {
		// Walk reports failures through err, and info can be nil in that case
		// (for example when the entry could not be stat'ed). Propagate the
		// error instead of dereferencing info.
		if err != nil {
			return err
		}

		// Do not descend into subdirectories of the temporary directory.
		if info.IsDir() && tmpDir != path {
			return filepath.SkipDir
		}

		if strings.HasPrefix(info.Name(), tempNginxPattern) && info.ModTime().Before(fiveMinutesAgo) {
			files = append(files, path)
		}

		return nil
	})
	if err != nil {
		return err
	}

	for _, name := range files {
		if err := os.Remove(name); err != nil {
			return err
		}
	}

	return nil
}
type redirect struct {
From string
To string
2019-08-13 21:14:55 +00:00
SSLCert *ingress.SSLCert
}
func buildRedirects(servers []*ingress.Server) []*redirect {
names := sets.String{}
redirectServers := make([]*redirect, 0)
for _, srv := range servers {
if !srv.RedirectFromToWWW {
continue
}
to := srv.Hostname
var from string
if strings.HasPrefix(to, "www.") {
from = strings.TrimPrefix(to, "www.")
} else {
from = fmt.Sprintf("www.%v", to)
}
if names.Has(to) {
continue
}
klog.V(3).Infof("Creating redirect from %q to %q", from, to)
found := false
for _, esrv := range servers {
if esrv.Hostname == from {
found = true
break
}
}
if found {
klog.Warningf("Already exists an Ingress with %q hostname. Skipping creation of redirection from %q to %q.", from, from, to)
continue
}
r := &redirect{
From: from,
To: to,
}
2019-08-13 21:14:55 +00:00
if srv.SSLCert != nil {
if ssl.IsValidHostname(from, srv.SSLCert.CN) {
r.SSLCert = srv.SSLCert
} else {
klog.Warningf("the server %v has SSL configured but the SSL certificate does not contains a CN for %v. Redirects will not work for HTTPS to HTTPS", from, to)
}
}
redirectServers = append(redirectServers, r)
names.Insert(to)
}
return redirectServers
}