Remove GenericController and add tests

This commit is contained in:
Manuel de Brito Fontes 2017-11-04 22:18:28 -03:00
parent 1701bfc334
commit 86f39d9deb
39 changed files with 1131 additions and 1325 deletions

64
cmd/nginx/flag_test.go Normal file
View file

@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"os"
"testing"
)
// resetForTesting clears all flag state and sets the usage function as directed.
// After calling resetForTesting, parse errors in flag handling will not
// exit the program.
// Extracted from https://github.com/golang/go/blob/master/src/flag/export_test.go
func resetForTesting(usage func()) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
flag.Usage = usage
}
func TestMandatoryFlag(t *testing.T) {
_, _, err := parseFlags()
if err == nil {
t.Fatalf("expected and error about default backend service")
}
}
func TestDefaults(t *testing.T) {
resetForTesting(func() { t.Fatal("bad parse") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"cmd", "--default-backend-service", "namespace/test", "--http-port", "0", "--https-port", "0"}
showVersion, conf, err := parseFlags()
if err != nil {
t.Fatalf("unexpected error parsing default flags: %v", err)
}
if showVersion {
t.Fatal("expected false but true was returned for flag show-version")
}
if conf == nil {
t.Fatal("expected a configuration but nil returned")
}
}
func TestSetupSSLProxy(t *testing.T) {
// TODO
}

191
cmd/nginx/flags.go Normal file
View file

@ -0,0 +1,191 @@
package main
import (
"flag"
"fmt"
"os"
"time"
"github.com/golang/glog"
"github.com/spf13/pflag"
apiv1 "k8s.io/api/core/v1"
"k8s.io/ingress-nginx/pkg/ingress/controller"
ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/pkg/net"
)
const (
defIngressClass = "nginx"
)
func parseFlags() (bool, *controller.Configuration, error) {
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
apiserverHost = flags.String("apiserver-host", "", "The address of the Kubernetes Apiserver "+
"to connect to in the format of protocol://address:port, e.g., "+
"http://localhost:8080. If not specified, the assumption is that the binary runs inside a "+
"Kubernetes cluster and local discovery is attempted.")
kubeConfigFile = flags.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
defaultSvc = flags.String("default-backend-service", "",
`Service used to serve a 404 page for the default backend. Takes the form
namespace/name. The controller uses the first node port of this Service for
the default backend.`)
ingressClass = flags.String("ingress-class", "",
`Name of the ingress class to route through this controller.`)
configMap = flags.String("configmap", "",
`Name of the ConfigMap that contains the custom configuration to use`)
publishSvc = flags.String("publish-service", "",
`Service fronting the ingress controllers. Takes the form
namespace/name. The controller will set the endpoint records on the
ingress objects to reflect those on the service.`)
tcpConfigMapName = flags.String("tcp-services-configmap", "",
`Name of the ConfigMap that contains the definition of the TCP services to expose.
The key in the map indicates the external port to be used. The value is the name of the
service with the format namespace/serviceName and the port of the service could be a
number of the name of the port.
The ports 80 and 443 are not allowed as external ports. This ports are reserved for the backend`)
udpConfigMapName = flags.String("udp-services-configmap", "",
`Name of the ConfigMap that contains the definition of the UDP services to expose.
The key in the map indicates the external port to be used. The value is the name of the
service with the format namespace/serviceName and the port of the service could be a
number of the name of the port.`)
resyncPeriod = flags.Duration("sync-period", 600*time.Second,
`Relist and confirm cloud resources this often. Default is 10 minutes`)
watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll,
`Namespace to watch for Ingress. Default is to watch all namespaces`)
profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`)
defSSLCertificate = flags.String("default-ssl-certificate", "", `Name of the secret
that contains a SSL certificate to be used as default for a HTTPS catch-all server`)
defHealthzURL = flags.String("health-check-path", "/healthz", `Defines
the URL to be used as health check inside in the default server in NGINX.`)
updateStatus = flags.Bool("update-status", true, `Indicates if the
ingress controller should update the Ingress status IP/hostname. Default is true`)
electionID = flags.String("election-id", "ingress-controller-leader", `Election id to use for status update.`)
forceIsolation = flags.Bool("force-namespace-isolation", false,
`Force namespace isolation. This flag is required to avoid the reference of secrets or
configmaps located in a different namespace than the specified in the flag --watch-namespace.`)
disableNodeList = flags.Bool("disable-node-list", false,
`Disable querying nodes. If --force-namespace-isolation is true, this should also be set.`)
updateStatusOnShutdown = flags.Bool("update-status-on-shutdown", true, `Indicates if the
ingress controller should update the Ingress status IP/hostname when the controller
is being stopped. Default is true`)
sortBackends = flags.Bool("sort-backends", false,
`Defines if backends and it's endpoints should be sorted`)
useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false,
`Defines if the nodes IP address to be returned in the ingress status should be the internal instead of the external IP address`)
showVersion = flags.Bool("version", false,
`Shows release information about the NGINX Ingress controller`)
enableSSLPassthrough = flags.Bool("enable-ssl-passthrough", false, `Enable SSL passthrough feature. Default is disabled`)
httpPort = flags.Int("http-port", 80, `Indicates the port to use for HTTP traffic`)
httpsPort = flags.Int("https-port", 443, `Indicates the port to use for HTTPS traffic`)
statusPort = flags.Int("status-port", 18080, `Indicates the TCP port to use for exposing the nginx status page`)
sslProxyPort = flags.Int("ssl-passtrough-proxy-port", 442, `Default port to use internally for SSL when SSL Passthgough is enabled`)
defServerPort = flags.Int("default-server-port", 8181, `Default port to use for exposing the default server (catch all)`)
healthzPort = flags.Int("healthz-port", 10254, "port for healthz endpoint.")
)
flag.Set("logtostderr", "true")
flags.AddGoFlagSet(flag.CommandLine)
flags.Parse(os.Args)
flag.Set("logtostderr", "true")
// Workaround for this issue:
// https://github.com/kubernetes/kubernetes/issues/17162
flag.CommandLine.Parse([]string{})
if *showVersion {
return true, nil, nil
}
if *defaultSvc == "" {
return false, nil, fmt.Errorf("Please specify --default-backend-service")
}
if *ingressClass != "" {
glog.Infof("Watching for ingress class: %s", *ingressClass)
if *ingressClass != defIngressClass {
glog.Warningf("only Ingress with class \"%v\" will be processed by this ingress controller", *ingressClass)
}
}
// check port collisions
if !ing_net.IsPortAvailable(*httpPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --http-port", *httpPort)
}
if !ing_net.IsPortAvailable(*httpsPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --https-port", *httpsPort)
}
if !ing_net.IsPortAvailable(*statusPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --status-port", *statusPort)
}
if !ing_net.IsPortAvailable(*defServerPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --default-server-port", *defServerPort)
}
if *enableSSLPassthrough && !ing_net.IsPortAvailable(*sslProxyPort) {
return false, nil, fmt.Errorf("Port %v is already in use. Please check the flag --ssl-passtrough-proxy-port", *sslProxyPort)
}
config := &controller.Configuration{
APIServerHost: *apiserverHost,
KubeConfigFile: *kubeConfigFile,
UpdateStatus: *updateStatus,
ElectionID: *electionID,
EnableProfiling: *profiling,
EnableSSLPassthrough: *enableSSLPassthrough,
ResyncPeriod: *resyncPeriod,
DefaultService: *defaultSvc,
IngressClass: *ingressClass,
Namespace: *watchNamespace,
ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName,
UDPConfigMapName: *udpConfigMapName,
DefaultSSLCertificate: *defSSLCertificate,
DefaultHealthzURL: *defHealthzURL,
PublishService: *publishSvc,
ForceNamespaceIsolation: *forceIsolation,
DisableNodeList: *disableNodeList,
UpdateStatusOnShutdown: *updateStatusOnShutdown,
SortBackends: *sortBackends,
UseNodeInternalIP: *useNodeInternalIP,
ListenPorts: &ngx_config.ListenPorts{
Default: *defServerPort,
Health: *healthzPort,
HTTP: *httpPort,
HTTPS: *httpsPort,
SSLProxy: *sslProxyPort,
Status: *statusPort,
},
}
return false, config, nil
}

View file

@ -17,29 +17,125 @@ limitations under the License.
package main
import (
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
"k8s.io/ingress-nginx/pkg/nginx/controller"
proxyproto "github.com/armon/go-proxyproto"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/ingress/controller"
"k8s.io/ingress-nginx/pkg/k8s"
"k8s.io/ingress-nginx/pkg/net/ssl"
"k8s.io/ingress-nginx/version"
)
func main() {
// start a new nginx controller
ngx := controller.NewNGINXController()
fmt.Println(version.String())
showVersion, conf, err := parseFlags()
if showVersion {
os.Exit(0)
}
if err != nil {
glog.Fatal(err)
}
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.KubeConfigFile)
if err != nil {
handleFatalInitError(err)
}
ns, name, err := k8s.ParseNameNS(conf.DefaultService)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", conf.DefaultService, err)
}
_, err = kubeClient.Core().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
}
glog.Fatalf("no service with name %v found: %v", conf.DefaultService, err)
}
glog.Infof("validated %v as the default backend", conf.DefaultService)
if conf.PublishService != "" {
ns, name, err := k8s.ParseNameNS(conf.PublishService)
if err != nil {
glog.Fatalf("invalid service format: %v", err)
}
svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
glog.Fatalf("unexpected error getting information about service %v: %v", conf.PublishService, err)
}
if len(svc.Status.LoadBalancer.Ingress) == 0 {
if len(svc.Spec.ExternalIPs) > 0 {
glog.Infof("service %v validated as assigned with externalIP", conf.PublishService)
} else {
// We could poll here, but we instead just exit and rely on k8s to restart us
glog.Fatalf("service %s does not (yet) have ingress points", conf.PublishService)
}
} else {
glog.Infof("service %v validated as source of Ingress status", conf.PublishService)
}
}
if conf.Namespace != "" {
_, err = kubeClient.CoreV1().Namespaces().Get(conf.Namespace, metav1.GetOptions{})
if err != nil {
glog.Fatalf("no watchNamespace with name %v found: %v", conf.Namespace, err)
}
}
if conf.ResyncPeriod.Seconds() < 10 {
glog.Fatalf("resync period (%vs) is too low", conf.ResyncPeriod.Seconds())
}
// create directory that will contains the SSL Certificates
err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655)
if err != nil {
glog.Errorf("Failed to mkdir SSL directory: %v", err)
}
// create the default SSL certificate (dummy)
sha, pem := createDefaultSSLCertificate()
conf.FakeCertificatePath = pem
conf.FakeCertificateSHA = sha
conf.Client = kubeClient
conf.DefaultIngressClass = defIngressClass
ngx := controller.NewNGINXController(conf)
if conf.EnableSSLPassthrough {
setupSSLProxy(conf.ListenPorts.HTTPS, conf.ListenPorts.SSLProxy, ngx)
}
go handleSigterm(ngx)
// start the controller
mux := http.NewServeMux()
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)
ngx.Start()
// wait
glog.Infof("shutting down Ingress controller...")
for {
glog.Infof("Handled quit, awaiting pod deletion")
time.Sleep(30 * time.Second)
}
}
func handleSigterm(ngx *controller.NGINXController) {
@ -54,6 +150,176 @@ func handleSigterm(ngx *controller.NGINXController) {
exitCode = 1
}
glog.Infof("Handled quit, awaiting pod deletion")
time.Sleep(10 * time.Second)
glog.Infof("Exiting with %v", exitCode)
os.Exit(exitCode)
}
func setupSSLProxy(sslPort, proxyPort int, n *controller.NGINXController) {
glog.Info("starting TLS proxy for SSL passthrough")
n.Proxy = &controller.TCPProxy{
Default: &controller.TCPServer{
Hostname: "localhost",
IP: "127.0.0.1",
Port: proxyPort,
ProxyProtocol: true,
},
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", sslPort))
if err != nil {
glog.Fatalf("%v", err)
}
proxyList := &proxyproto.Listener{Listener: listener}
// start goroutine that accepts tcp connections in port 443
go func() {
for {
var conn net.Conn
var err error
if n.IsProxyProtocolEnabled {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.Proxy.Handle(conn)
}
}()
}
// createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty
// the function assumes that it is running inside a Kubernetes cluster and attempts to
// discover the Apiserver. Otherwise, it connects to the Apiserver specified.
//
// apiserverHost param is in the format of protocol://address:port/pathPrefix, e.g.http://localhost:8001.
// kubeConfig location of kubeconfig file
func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) {
cfg, err := buildConfigFromFlags(apiserverHost, kubeConfig)
if err != nil {
return nil, err
}
cfg.QPS = defaultQPS
cfg.Burst = defaultBurst
cfg.ContentType = "application/vnd.kubernetes.protobuf"
glog.Infof("Creating API client for %s", cfg.Host)
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
v, err := client.Discovery().ServerVersion()
if err != nil {
return nil, err
}
glog.Infof("Running in Kubernetes Cluster version v%v.%v (%v) - git (%v) commit %v - platform %v",
v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform)
return client, nil
}
const (
// High enough QPS to fit all expected use cases. QPS=0 is not set here, because
// client code is overriding it.
defaultQPS = 1e6
// High enough Burst to fit all expected use cases. Burst=0 is not set here, because
// client code is overriding it.
defaultBurst = 1e6
fakeCertificate = "default-fake-certificate"
)
// buildConfigFromFlags builds REST config based on master URL and kubeconfig path.
// If both of them are empty then in cluster config is used.
func buildConfigFromFlags(masterURL, kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath == "" && masterURL == "" {
kubeconfig, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return kubeconfig, nil
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&clientcmd.ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: masterURL,
},
}).ClientConfig()
}
/**
* Handles fatal init error that prevents server from doing any work. Prints verbose error
* message and quits the server.
*/
func handleFatalInitError(err error) {
glog.Fatalf("Error while initializing connection to Kubernetes apiserver. "+
"This most likely means that the cluster is misconfigured (e.g., it has "+
"invalid apiserver certificates or service accounts configuration). Reason: %s\n"+
"Refer to the troubleshooting guide for more information: "+
"https://github.com/kubernetes/ingress-nginx/blob/master/docs/troubleshooting.md", err)
}
func registerHandlers(enableProfiling bool, port int, ic *controller.NGINXController, mux *http.ServeMux) {
// expose health check endpoint (/healthz)
healthz.InstallHandler(mux,
healthz.PingHealthz,
ic,
)
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(version.String())
w.Write(b)
})
mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
if err != nil {
glog.Errorf("unexpected error: %v", err)
}
})
if enableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
server := &http.Server{
Addr: fmt.Sprintf(":%v", port),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}
func createDefaultSSLCertificate() (string, string) {
defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})
if err != nil {
glog.Fatalf("Error generating self signed certificate: %v", err)
}
return c.PemSHA, c.PemFileName
}

View file

@ -14,28 +14,40 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
package file
import "testing"
import (
"io/ioutil"
"testing"
)
func TestDiff(t *testing.T) {
func TestSHA1(t *testing.T) {
tests := []struct {
a []byte
b []byte
empty bool
content []byte
sha string
}{
{[]byte(""), []byte(""), true},
{[]byte("a"), []byte("a"), true},
{[]byte("a"), []byte("b"), false},
{[]byte(""), "da39a3ee5e6b4b0d3255bfef95601890afd80709"},
{[]byte("hello world"), "2aae6c35c94fcfb415dbe95f408b9ce91ee846ed"},
}
for _, test := range tests {
b, err := diff(test.a, test.b)
f, err := ioutil.TempFile("", "sha-test")
if err != nil {
t.Fatalf("unexpected error returned: %v", err)
t.Fatal(err)
}
if len(b) == 0 && !test.empty {
t.Fatalf("expected empty but returned %s", b)
f.Write(test.content)
f.Sync()
sha := SHA1(f.Name())
f.Close()
if sha != test.sha {
t.Fatalf("expected %v but returned %s", test.sha, sha)
}
}
sha := SHA1("")
if sha != ""{
t.Fatalf("expected an empty sha but returned %s", sha)
}
}

View file

@ -77,7 +77,7 @@ func TestIngressCorsConfig(t *testing.T) {
t.Errorf("expected a Config type")
}
if nginxCors.CorsEnabled != true {
if !nginxCors.CorsEnabled {
t.Errorf("expected cors enabled but returned %v", nginxCors.CorsEnabled)
}

View file

@ -36,7 +36,7 @@ import (
// syncSecret keeps in sync Secrets used by Ingress rules with the files on
// disk to allow copy of the content of the secret to disk to be used
// by external processes.
func (ic *GenericController) syncSecret(key string) {
func (ic *NGINXController) syncSecret(key string) {
glog.V(3).Infof("starting syncing of secret %v", key)
cert, err := ic.getPemCertificate(key)
@ -70,7 +70,7 @@ func (ic *GenericController) syncSecret(key string) {
// getPemCertificate receives a secret, and creates a ingress.SSLCert as return.
// It parses the secret and verifies if it's a keypair, or a 'ca.crt' secret only.
func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
func (ic *NGINXController) getPemCertificate(secretName string) (*ingress.SSLCert, error) {
secret, err := ic.listers.Secret.GetByName(secretName)
if err != nil {
return nil, fmt.Errorf("error retrieving secret %v: %v", secretName, err)
@ -127,7 +127,7 @@ func (ic *GenericController) getPemCertificate(secretName string) (*ingress.SSLC
// checkMissingSecrets verify if one or more ingress rules contains a reference
// to a secret that is not present in the local secret store.
// In this case we call syncSecret.
func (ic *GenericController) checkMissingSecrets() {
func (ic *NGINXController) checkMissingSecrets() {
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)

View file

@ -103,8 +103,8 @@ func buildControllerForBackendSSL() cache_client.Controller {
return cache_client.New(cfg)
}
func buildGenericControllerForBackendSSL() *GenericController {
gc := &GenericController{
func buildGenericControllerForBackendSSL() *NGINXController {
gc := &NGINXController{
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
cfg: &Configuration{
Client: buildSimpleClientSetForBackendSSL(),

View file

@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"github.com/golang/glog"
"github.com/ncabatoff/process-exporter/proc"
)
// Name returns the healthcheck name
func (n NGINXController) Name() string {
return "Ingress Controller"
}
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n NGINXController) Check(_ *http.Request) error {
res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", n.cfg.ListenPorts.Status, ngxHealthPath))
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("ingress controller is not healthy")
}
// check the nginx master process is running
fs, err := proc.NewFS("/proc")
if err != nil {
glog.Errorf("%v", err)
return err
}
f, err := ioutil.ReadFile("/run/nginx.pid")
if err != nil {
glog.Errorf("%v", err)
return err
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return err
}
_, err = fs.NewProc(pid)
if err != nil {
glog.Errorf("%v", err)
return err
}
return nil
}

View file

@ -24,7 +24,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -35,12 +34,7 @@ import (
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/ingress-nginx/pkg/ingress"
@ -48,11 +42,10 @@ import (
"k8s.io/ingress-nginx/pkg/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/pkg/ingress/annotations/parser"
"k8s.io/ingress-nginx/pkg/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config"
"k8s.io/ingress-nginx/pkg/ingress/defaults"
"k8s.io/ingress-nginx/pkg/ingress/resolver"
"k8s.io/ingress-nginx/pkg/ingress/status"
"k8s.io/ingress-nginx/pkg/k8s"
"k8s.io/ingress-nginx/pkg/net/ssl"
"k8s.io/ingress-nginx/pkg/task"
)
@ -60,63 +53,32 @@ const (
defUpstreamName = "upstream-default-backend"
defServerName = "_"
rootLocation = "/"
fakeCertificate = "default-fake-certificate"
)
var (
// list of ports that cannot be used by TCP or UDP services
reservedPorts = []string{"80", "443", "8181", "18080"}
fakeCertificatePath = ""
fakeCertificateSHA = ""
cloner = conversion.NewCloner()
cloner *conversion.Cloner
)
// GenericController holds the boilerplate code required to build an Ingress controlller.
type GenericController struct {
cfg *Configuration
listers *ingress.StoreLister
cacheController *cacheController
annotations annotationExtractor
recorder record.EventRecorder
syncQueue *task.Queue
syncStatus status.Sync
// local store of SSL certificates
// (only certificates used in ingress)
sslCertTracker *sslCertTracker
syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
stopCh chan struct{}
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
forceReload int32
func init() {
cloner := conversion.NewCloner()
cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs)
}
// Configuration contains all the settings required by an Ingress controller
type Configuration struct {
Client clientset.Interface
APIServerHost string
KubeConfigFile string
Client clientset.Interface
ResyncPeriod time.Duration
ResyncPeriod time.Duration
ConfigMapName string
DefaultService string
IngressClass string
Namespace string
ConfigMapName string
ForceNamespaceIsolation bool
DisableNodeList bool
@ -124,15 +86,14 @@ type Configuration struct {
// optional
TCPConfigMapName string
// optional
UDPConfigMapName string
DefaultSSLCertificate string
UDPConfigMapName string
DefaultHealthzURL string
DefaultIngressClass string
DefaultSSLCertificate string
// optional
PublishService string
// Backend is the particular implementation to be used.
// (for instance NGINX)
Backend ingress.Controller
UpdateStatus bool
UseNodeInternalIP bool
@ -140,74 +101,25 @@ type Configuration struct {
UpdateStatusOnShutdown bool
SortBackends bool
}
// newIngressController creates an Ingress controller
func newIngressController(config *Configuration) *GenericController {
ListenPorts *ngx_config.ListenPorts
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
EnableSSLPassthrough bool
ic := GenericController{
cfg: config,
stopLock: &sync.Mutex{},
stopCh: make(chan struct{}),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "ingress-controller",
}),
sslCertTracker: newSSLCertTracker(),
}
EnableProfiling bool
ic.syncQueue = task.NewTaskQueue(ic.syncIngress)
ic.listers, ic.cacheController = ic.createListers(config.DisableNodeList)
if config.UpdateStatus {
ic.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: ic.cfg.PublishService,
IngressLister: ic.listers.Ingress,
ElectionID: config.ElectionID,
IngressClass: config.IngressClass,
DefaultIngressClass: config.DefaultIngressClass,
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
CustomIngressStatus: ic.cfg.Backend.UpdateIngressStatus,
UseNodeInternalIP: ic.cfg.UseNodeInternalIP,
})
} else {
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
}
ic.annotations = newAnnotationExtractor(ic)
ic.cfg.Backend.SetListers(ic.listers)
cloner.RegisterDeepCopyFunc(ingress.GetGeneratedDeepCopyFuncs)
return &ic
}
// Info returns information about the backend
func (ic GenericController) Info() *ingress.BackendInfo {
return ic.cfg.Backend.Info()
}
// IngressClass returns information about the backend
func (ic GenericController) IngressClass() string {
return ic.cfg.IngressClass
FakeCertificatePath string
FakeCertificateSHA string
}
// GetDefaultBackend returns the default backend
func (ic GenericController) GetDefaultBackend() defaults.Backend {
return ic.cfg.Backend.BackendDefaults()
func (n NGINXController) GetDefaultBackend() defaults.Backend {
return n.backendDefaults
}
// GetPublishService returns the configured service used to set ingress status
func (ic GenericController) GetPublishService() *apiv1.Service {
s, err := ic.listers.Service.GetByName(ic.cfg.PublishService)
func (n NGINXController) GetPublishService() *apiv1.Service {
s, err := n.listers.Service.GetByName(n.cfg.PublishService)
if err != nil {
return nil
}
@ -215,42 +127,37 @@ func (ic GenericController) GetPublishService() *apiv1.Service {
return s
}
// GetRecorder returns the event recorder
func (ic GenericController) GetRecorder() record.EventRecorder {
return ic.recorder
}
// GetSecret searches for a secret in the local secrets Store
func (ic GenericController) GetSecret(name string) (*apiv1.Secret, error) {
return ic.listers.Secret.GetByName(name)
func (n NGINXController) GetSecret(name string) (*apiv1.Secret, error) {
return n.listers.Secret.GetByName(name)
}
// GetService searches for a service in the local secrets Store
func (ic GenericController) GetService(name string) (*apiv1.Service, error) {
return ic.listers.Service.GetByName(name)
func (n NGINXController) GetService(name string) (*apiv1.Service, error) {
return n.listers.Service.GetByName(name)
}
// sync collects all the pieces required to assemble the configuration file and
// then sends the content to the backend (OnUpdate) receiving the populated
// template as response reloading the backend if is required.
func (ic *GenericController) syncIngress(item interface{}) error {
ic.syncRateLimiter.Accept()
func (n *NGINXController) syncIngress(item interface{}) error {
n.syncRateLimiter.Accept()
if ic.syncQueue.IsShuttingDown() {
if n.syncQueue.IsShuttingDown() {
return nil
}
if element, ok := item.(task.Element); ok {
if name, ok := element.Key.(string); ok {
if obj, exists, _ := ic.listers.Ingress.GetByKey(name); exists {
if obj, exists, _ := n.listers.Ingress.GetByKey(name); exists {
ing := obj.(*extensions.Ingress)
ic.readSecrets(ing)
n.readSecrets(ing)
}
}
}
// Sort ingress rules using the ResourceVersion field
ings := ic.listers.Ingress.List()
ings := n.listers.Ingress.List()
sort.SliceStable(ings, func(i, j int) bool {
ir := ings[i].(*extensions.Ingress).ResourceVersion
jr := ings[j].(*extensions.Ingress).ResourceVersion
@ -261,14 +168,14 @@ func (ic *GenericController) syncIngress(item interface{}) error {
var ingresses []*extensions.Ingress
for _, ingIf := range ings {
ing := ingIf.(*extensions.Ingress)
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) {
continue
}
ingresses = append(ingresses, ing)
}
upstreams, servers := ic.getBackendServers(ingresses)
upstreams, servers := n.getBackendServers(ingresses)
var passUpstreams []*ingress.SSLPassthroughBackend
for _, server := range servers {
@ -294,19 +201,19 @@ func (ic *GenericController) syncIngress(item interface{}) error {
pcfg := ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
}
if !ic.isForceReload() && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) {
if !n.isForceReload() && n.runningConfig != nil && n.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("skipping backend reload (no changes detected)")
return nil
}
glog.Infof("backend reload required")
err := ic.cfg.Backend.OnUpdate(pcfg)
err := n.OnUpdate(pcfg)
if err != nil {
incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
@ -317,13 +224,13 @@ func (ic *GenericController) syncIngress(item interface{}) error {
incReloadCount()
setSSLExpireTime(servers)
ic.runningConfig = &pcfg
ic.SetForceReload(false)
n.runningConfig = &pcfg
n.SetForceReload(false)
return nil
}
func (ic *GenericController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
glog.V(3).Infof("obtaining information about stream services of type %v located in configmap %v", proto, configmapName)
if configmapName == "" {
// no configmap configured
@ -336,7 +243,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
return []ingress.L4Service{}
}
configmap, err := ic.listers.ConfigMap.GetByName(configmapName)
configmap, err := n.listers.ConfigMap.GetByName(configmapName)
if err != nil {
glog.Errorf("unexpected error reading configmap %v: %v", configmapName, err)
return []ingress.L4Service{}
@ -386,7 +293,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
continue
}
svcObj, svcExists, err := ic.listers.Service.GetByKey(nsName)
svcObj, svcExists, err := n.listers.Service.GetByKey(nsName)
if err != nil {
glog.Warningf("error getting service %v: %v", nsName, err)
continue
@ -406,7 +313,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = ic.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
break
}
}
@ -417,7 +324,7 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = ic.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Upstream{})
break
}
}
@ -450,29 +357,29 @@ func (ic *GenericController) getStreamServices(configmapName string, proto apiv1
// getDefaultUpstream returns an upstream associated with the
// default backend service. In case of error retrieving information
// configure the upstream to return http code 503.
func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
upstream := &ingress.Backend{
Name: defUpstreamName,
}
svcKey := ic.cfg.DefaultService
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
svcKey := n.cfg.DefaultService
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
if err != nil {
glog.Warningf("unexpected error searching the default backend %v: %v", ic.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint())
glog.Warningf("unexpected error searching the default backend %v: %v", n.cfg.DefaultService, err)
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
if !svcExists {
glog.Warningf("service %v does not exist", svcKey)
upstream.Endpoints = append(upstream.Endpoints, ic.cfg.Backend.DefaultEndpoint())
upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint())
return upstream
}
svc := svcObj.(*apiv1.Service)
endps := ic.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{})
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
endps = []ingress.Endpoint{ic.cfg.Backend.DefaultEndpoint()}
endps = []ingress.Endpoint{n.DefaultEndpoint()}
}
upstream.Service = svc
@ -482,14 +389,14 @@ func (ic *GenericController) getDefaultUpstream() *ingress.Backend {
// getBackendServers returns a list of Upstream and Server to be used by the backend
// An upstream can be used in multiple servers if the namespace, service name and port are the same
func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress) ([]*ingress.Backend, []*ingress.Server) {
du := ic.getDefaultUpstream()
upstreams := ic.createUpstreams(ingresses, du)
servers := ic.createServers(ingresses, upstreams, du)
func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]*ingress.Backend, []*ingress.Server) {
du := n.getDefaultUpstream()
upstreams := n.createUpstreams(ingresses, du)
servers := n.createServers(ingresses, upstreams, du)
for _, ing := range ingresses {
affinity := ic.annotations.SessionAffinity(ing)
anns := ic.annotations.Extract(ing)
affinity := n.annotations.SessionAffinity(ing)
anns := n.annotations.Extract(ing)
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -508,7 +415,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
}
if server.CertificateAuth.CAFileName == "" {
ca := ic.annotations.CertificateAuth(ing)
ca := n.annotations.CertificateAuth(ing)
if ca != nil {
server.CertificateAuth = *ca
// It is possible that no CAFileName is found in the secret
@ -609,7 +516,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
// check if the location contains endpoints and a custom default backend
if location.DefaultBackend != nil {
sp := location.DefaultBackend.Spec.Ports[0]
endps := ic.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{})
endps := n.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Upstream{})
if len(endps) > 0 {
glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)",
server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
@ -656,7 +563,7 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
aUpstreams = append(aUpstreams, upstream)
}
if ic.cfg.SortBackends {
if n.cfg.SortBackends {
sort.SliceStable(aUpstreams, func(a, b int) bool {
return aUpstreams[a].Name < aUpstreams[b].Name
})
@ -678,17 +585,17 @@ func (ic *GenericController) getBackendServers(ingresses []*extensions.Ingress)
}
// GetAuthCertificate is used by the auth-tls annotations to get a cert from a secret
func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := ic.sslCertTracker.Get(name); !exists {
ic.syncSecret(name)
func (n NGINXController) GetAuthCertificate(name string) (*resolver.AuthSSLCert, error) {
if _, exists := n.sslCertTracker.Get(name); !exists {
n.syncSecret(name)
}
_, err := ic.listers.Secret.GetByName(name)
_, err := n.listers.Secret.GetByName(name)
if err != nil {
return &resolver.AuthSSLCert{}, fmt.Errorf("unexpected error: %v", err)
}
bc, exists := ic.sslCertTracker.Get(name)
bc, exists := n.sslCertTracker.Get(name)
if !exists {
return &resolver.AuthSSLCert{}, fmt.Errorf("secret %v does not exist", name)
}
@ -702,15 +609,15 @@ func (ic GenericController) GetAuthCertificate(name string) (*resolver.AuthSSLCe
// createUpstreams creates the NGINX upstreams for each service referenced in
// Ingress rules. The servers inside the upstream are endpoints.
func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
upstreams := make(map[string]*ingress.Backend)
upstreams[defUpstreamName] = du
for _, ing := range data {
secUpstream := ic.annotations.SecureUpstream(ing)
hz := ic.annotations.HealthCheck(ing)
serviceUpstream := ic.annotations.ServiceUpstream(ing)
upstreamHashBy := ic.annotations.UpstreamHashBy(ing)
secUpstream := n.annotations.SecureUpstream(ing)
hz := n.annotations.HealthCheck(ing)
serviceUpstream := n.annotations.ServiceUpstream(ing)
upstreamHashBy := n.annotations.UpstreamHashBy(ing)
var defBackend string
if ing.Spec.Backend != nil {
@ -726,7 +633,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
endpoint, err := n.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
@ -735,7 +642,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
}
if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
@ -780,7 +687,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend)
endpoint, err := n.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
@ -789,7 +696,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
}
if len(upstreams[name].Endpoints) == 0 {
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
@ -797,7 +704,7 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
upstreams[name].Endpoints = endp
}
s, err := ic.listers.Service.GetByName(svcKey)
s, err := n.listers.Service.GetByName(svcKey)
if err != nil {
glog.Warningf("error obtaining service: %v", err)
continue
@ -811,8 +718,8 @@ func (ic *GenericController) createUpstreams(data []*extensions.Ingress, du *ing
return upstreams
}
func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := ic.listers.Service.GetByKey(svcKey)
func (n *NGINXController) getServiceClusterEndpoint(svcKey string, backend *extensions.IngressBackend) (endpoint ingress.Endpoint, err error) {
svcObj, svcExists, err := n.listers.Service.GetByKey(svcKey)
if !svcExists {
return endpoint, fmt.Errorf("service %v does not exist", svcKey)
@ -848,9 +755,9 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
svc, err := ic.listers.Service.GetByName(svcKey)
svc, err := n.listers.Service.GetByName(svcKey)
var upstreams []ingress.Endpoint
if err != nil {
@ -864,12 +771,12 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := ic.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
}
if ic.cfg.SortBackends {
if n.cfg.SortBackends {
sort.SliceStable(endps, func(i, j int) bool {
iName := endps[i].Address
jName := endps[j].Address
@ -898,7 +805,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
Port: int32(externalPort),
TargetPort: intstr.FromString(backendPort),
}
endps := ic.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
return upstreams, nil
@ -908,7 +815,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
return upstreams, nil
}
if !ic.cfg.SortBackends {
if !n.cfg.SortBackends {
rand.Seed(time.Now().UnixNano())
for i := range upstreams {
j := rand.Intn(i + 1)
@ -923,7 +830,7 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
// FDQN referenced by ingress rules and the common name field in the referenced
// SSL certificates. Each server is configured with location / using a default
// backend specified by the user or the one inside the ingress spec.
func (ic *GenericController) createServers(data []*extensions.Ingress,
func (n *NGINXController) createServers(data []*extensions.Ingress,
upstreams map[string]*ingress.Backend,
du *ingress.Backend) map[string]*ingress.Server {
@ -932,7 +839,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
// remove the alias to avoid conflicts.
aliases := make(map[string]string, len(data))
bdef := ic.GetDefaultBackend()
bdef := n.GetDefaultBackend()
ngxProxy := proxy.Configuration{
BodySize: bdef.ProxyBodySize,
ConnectTimeout: bdef.ProxyConnectTimeout,
@ -946,12 +853,12 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
}
// generated on Start() with createDefaultSSLCertificate()
defaultPemFileName := fakeCertificatePath
defaultPemSHA := fakeCertificateSHA
defaultPemFileName := n.cfg.FakeCertificatePath
defaultPemSHA := n.cfg.FakeCertificateSHA
// Tries to fetch the default Certificate from nginx configuration.
// If it does not exists, use the ones generated on Start()
defaultCertificate, err := ic.getPemCertificate(ic.cfg.DefaultSSLCertificate)
defaultCertificate, err := n.getPemCertificate(n.cfg.DefaultSSLCertificate)
if err == nil {
defaultPemFileName = defaultCertificate.PemFileName
defaultPemSHA = defaultCertificate.PemSHA
@ -976,7 +883,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
for _, ing := range data {
// check if ssl passthrough is configured
sslpt := ic.annotations.SSLPassthrough(ing)
sslpt := n.annotations.SSLPassthrough(ing)
// default upstream server
un := du.Name
@ -1028,8 +935,8 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
// configure default location, alias, and SSL
for _, ing := range data {
// setup server-alias based on annotations
aliasAnnotation := ic.annotations.Alias(ing)
srvsnippet := ic.annotations.ServerSnippet(ing)
aliasAnnotation := n.annotations.Alias(ing)
srvsnippet := n.annotations.ServerSnippet(ing)
for _, rule := range ing.Spec.Rules {
host := rule.Host
@ -1095,7 +1002,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tlsSecretName)
bc, exists := ic.sslCertTracker.Get(key)
bc, exists := n.sslCertTracker.Get(key)
if !exists {
glog.Warningf("ssl certificate \"%v\" does not exist in local store", key)
continue
@ -1130,7 +1037,7 @@ func (ic *GenericController) createServers(data []*extensions.Ingress,
}
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (ic *GenericController) getEndpoints(
func (n *NGINXController) getEndpoints(
s *apiv1.Service,
servicePort *apiv1.ServicePort,
proto apiv1.Protocol,
@ -1171,7 +1078,7 @@ func (ic *GenericController) getEndpoints(
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := ic.listers.Endpoint.GetServiceEndpoints(s)
ep, err := n.listers.Endpoint.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
@ -1221,99 +1128,32 @@ func (ic *GenericController) getEndpoints(
}
// readSecrets extracts information about secrets from an Ingress rule
func (ic *GenericController) readSecrets(ing *extensions.Ingress) {
func (n *NGINXController) readSecrets(ing *extensions.Ingress) {
for _, tls := range ing.Spec.TLS {
if tls.SecretName == "" {
continue
}
key := fmt.Sprintf("%v/%v", ing.Namespace, tls.SecretName)
ic.syncSecret(key)
n.syncSecret(key)
}
key, _ := parser.GetStringAnnotation("ingress.kubernetes.io/auth-tls-secret", ing)
if key == "" {
return
}
ic.syncSecret(key)
n.syncSecret(key)
}
// Stop stops the loadbalancer controller.
func (ic GenericController) Stop() error {
ic.stopLock.Lock()
defer ic.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !ic.syncQueue.IsShuttingDown() {
glog.Infof("shutting down controller queues")
close(ic.stopCh)
go ic.syncQueue.Shutdown()
if ic.syncStatus != nil {
ic.syncStatus.Shutdown()
}
return nil
}
return fmt.Errorf("shutdown already in progress")
func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}
// Start starts the Ingress controller.
func (ic *GenericController) Start() {
glog.Infof("starting Ingress controller")
ic.cacheController.Run(ic.stopCh)
createDefaultSSLCertificate()
time.Sleep(5 * time.Second)
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range ic.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
ic.readSecrets(ing)
}
go ic.syncQueue.Run(time.Second, ic.stopCh)
if ic.syncStatus != nil {
go ic.syncStatus.Run(ic.stopCh)
}
go wait.Until(ic.checkMissingSecrets, 30*time.Second, ic.stopCh)
// force initial sync
ic.syncQueue.Enqueue(&extensions.Ingress{})
<-ic.stopCh
}
func (ic *GenericController) isForceReload() bool {
return atomic.LoadInt32(&ic.forceReload) != 0
}
func (ic *GenericController) SetForceReload(shouldReload bool) {
func (n *NGINXController) SetForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&ic.forceReload, 1)
ic.syncQueue.Enqueue(&extensions.Ingress{})
atomic.StoreInt32(&n.forceReload, 1)
n.syncQueue.Enqueue(&extensions.Ingress{})
} else {
atomic.StoreInt32(&ic.forceReload, 0)
atomic.StoreInt32(&n.forceReload, 0)
}
}
func createDefaultSSLCertificate() {
defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})
if err != nil {
glog.Fatalf("Error generating self signed certificate: %v", err)
}
fakeCertificateSHA = c.PemSHA
fakeCertificatePath = c.PemFileName
}

View file

@ -1,335 +0,0 @@
package controller
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/http/pprof"
"os"
"strings"
"syscall"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/k8s"
)
// NewIngressController returns a configured Ingress controller
func NewIngressController(backend ingress.Controller) *GenericController {
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
apiserverHost = flags.String("apiserver-host", "", "The address of the Kubernetes Apiserver "+
"to connect to in the format of protocol://address:port, e.g., "+
"http://localhost:8080. If not specified, the assumption is that the binary runs inside a "+
"Kubernetes cluster and local discovery is attempted.")
kubeConfigFile = flags.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
defaultSvc = flags.String("default-backend-service", "",
`Service used to serve a 404 page for the default backend. Takes the form
namespace/name. The controller uses the first node port of this Service for
the default backend.`)
ingressClass = flags.String("ingress-class", "",
`Name of the ingress class to route through this controller.`)
configMap = flags.String("configmap", "",
`Name of the ConfigMap that contains the custom configuration to use`)
publishSvc = flags.String("publish-service", "",
`Service fronting the ingress controllers. Takes the form
namespace/name. The controller will set the endpoint records on the
ingress objects to reflect those on the service.`)
tcpConfigMapName = flags.String("tcp-services-configmap", "",
`Name of the ConfigMap that contains the definition of the TCP services to expose.
The key in the map indicates the external port to be used. The value is the name of the
service with the format namespace/serviceName and the port of the service could be a
number of the name of the port.
The ports 80 and 443 are not allowed as external ports. This ports are reserved for the backend`)
udpConfigMapName = flags.String("udp-services-configmap", "",
`Name of the ConfigMap that contains the definition of the UDP services to expose.
The key in the map indicates the external port to be used. The value is the name of the
service with the format namespace/serviceName and the port of the service could be a
number of the name of the port.`)
resyncPeriod = flags.Duration("sync-period", 600*time.Second,
`Relist and confirm cloud resources this often. Default is 10 minutes`)
watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll,
`Namespace to watch for Ingress. Default is to watch all namespaces`)
healthzPort = flags.Int("healthz-port", 10254, "port for healthz endpoint.")
profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`)
defSSLCertificate = flags.String("default-ssl-certificate", "", `Name of the secret
that contains a SSL certificate to be used as default for a HTTPS catch-all server`)
defHealthzURL = flags.String("health-check-path", "/healthz", `Defines
the URL to be used as health check inside in the default server in NGINX.`)
updateStatus = flags.Bool("update-status", true, `Indicates if the
ingress controller should update the Ingress status IP/hostname. Default is true`)
electionID = flags.String("election-id", "ingress-controller-leader", `Election id to use for status update.`)
forceIsolation = flags.Bool("force-namespace-isolation", false,
`Force namespace isolation. This flag is required to avoid the reference of secrets or
configmaps located in a different namespace than the specified in the flag --watch-namespace.`)
disableNodeList = flags.Bool("disable-node-list", false,
`Disable querying nodes. If --force-namespace-isolation is true, this should also be set.`)
updateStatusOnShutdown = flags.Bool("update-status-on-shutdown", true, `Indicates if the
ingress controller should update the Ingress status IP/hostname when the controller
is being stopped. Default is true`)
sortBackends = flags.Bool("sort-backends", false,
`Defines if backends and it's endpoints should be sorted`)
useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false,
`Defines if the nodes IP address to be returned in the ingress status should be the internal instead of the external IP address`)
showVersion = flags.Bool("version", false,
`Shows release information about the NGINX Ingress controller`)
)
flags.AddGoFlagSet(flag.CommandLine)
backend.ConfigureFlags(flags)
flags.Parse(os.Args)
// Workaround for this issue:
// https://github.com/kubernetes/kubernetes/issues/17162
flag.CommandLine.Parse([]string{})
if *showVersion {
fmt.Println(backend.Info().String())
os.Exit(0)
}
backend.OverrideFlags(flags)
flag.Set("logtostderr", "true")
glog.Info(backend.Info())
if *ingressClass != "" {
glog.Infof("Watching for ingress class: %s", *ingressClass)
}
if *defaultSvc == "" {
glog.Fatalf("Please specify --default-backend-service")
}
kubeClient, err := createApiserverClient(*apiserverHost, *kubeConfigFile)
if err != nil {
handleFatalInitError(err)
}
ns, name, err := k8s.ParseNameNS(*defaultSvc)
if err != nil {
glog.Fatalf("invalid format for service %v: %v", *defaultSvc, err)
}
_, err = kubeClient.Core().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
if strings.Contains(err.Error(), "cannot get services in the namespace") {
glog.Fatalf("✖ It seems the cluster it is running with Authorization enabled (like RBAC) and there is no permissions for the ingress controller. Please check the configuration")
}
glog.Fatalf("no service with name %v found: %v", *defaultSvc, err)
}
glog.Infof("validated %v as the default backend", *defaultSvc)
if *publishSvc != "" {
ns, name, err := k8s.ParseNameNS(*publishSvc)
if err != nil {
glog.Fatalf("invalid service format: %v", err)
}
svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
glog.Fatalf("unexpected error getting information about service %v: %v", *publishSvc, err)
}
if len(svc.Status.LoadBalancer.Ingress) == 0 {
if len(svc.Spec.ExternalIPs) > 0 {
glog.Infof("service %v validated as assigned with externalIP", *publishSvc)
} else {
// We could poll here, but we instead just exit and rely on k8s to restart us
glog.Fatalf("service %s does not (yet) have ingress points", *publishSvc)
}
} else {
glog.Infof("service %v validated as source of Ingress status", *publishSvc)
}
}
if *watchNamespace != "" {
_, err = kubeClient.CoreV1().Namespaces().Get(*watchNamespace, metav1.GetOptions{})
if err != nil {
glog.Fatalf("no watchNamespace with name %v found: %v", *watchNamespace, err)
}
}
if resyncPeriod.Seconds() < 10 {
glog.Fatalf("resync period (%vs) is too low", resyncPeriod.Seconds())
}
err = os.MkdirAll(ingress.DefaultSSLDirectory, 0655)
if err != nil {
glog.Errorf("Failed to mkdir SSL directory: %v", err)
}
config := &Configuration{
UpdateStatus: *updateStatus,
ElectionID: *electionID,
Client: kubeClient,
ResyncPeriod: *resyncPeriod,
DefaultService: *defaultSvc,
IngressClass: *ingressClass,
DefaultIngressClass: backend.DefaultIngressClass(),
Namespace: *watchNamespace,
ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName,
UDPConfigMapName: *udpConfigMapName,
DefaultSSLCertificate: *defSSLCertificate,
DefaultHealthzURL: *defHealthzURL,
PublishService: *publishSvc,
Backend: backend,
ForceNamespaceIsolation: *forceIsolation,
DisableNodeList: *disableNodeList,
UpdateStatusOnShutdown: *updateStatusOnShutdown,
SortBackends: *sortBackends,
UseNodeInternalIP: *useNodeInternalIP,
}
ic := newIngressController(config)
go registerHandlers(*profiling, *healthzPort, ic)
return ic
}
func registerHandlers(enableProfiling bool, port int, ic *GenericController) {
mux := http.NewServeMux()
// expose health check endpoint (/healthz)
healthz.InstallHandler(mux,
healthz.PingHealthz,
ic.cfg.Backend,
)
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(ic.Info())
w.Write(b)
})
mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
if err != nil {
glog.Errorf("unexpected error: %v", err)
}
})
if enableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
server := &http.Server{
Addr: fmt.Sprintf(":%v", port),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}
const (
// High enough QPS to fit all expected use cases. QPS=0 is not set here, because
// client code is overriding it.
defaultQPS = 1e6
// High enough Burst to fit all expected use cases. Burst=0 is not set here, because
// client code is overriding it.
defaultBurst = 1e6
)
// buildConfigFromFlags builds REST config based on master URL and kubeconfig path.
// If both of them are empty then in cluster config is used.
func buildConfigFromFlags(masterURL, kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath == "" && masterURL == "" {
kubeconfig, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return kubeconfig, nil
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&clientcmd.ConfigOverrides{
ClusterInfo: clientcmdapi.Cluster{
Server: masterURL,
},
}).ClientConfig()
}
// createApiserverClient creates new Kubernetes Apiserver client. When kubeconfig or apiserverHost param is empty
// the function assumes that it is running inside a Kubernetes cluster and attempts to
// discover the Apiserver. Otherwise, it connects to the Apiserver specified.
//
// apiserverHost param is in the format of protocol://address:port/pathPrefix, e.g.http://localhost:8001.
// kubeConfig location of kubeconfig file
func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) {
cfg, err := buildConfigFromFlags(apiserverHost, kubeConfig)
if err != nil {
return nil, err
}
cfg.QPS = defaultQPS
cfg.Burst = defaultBurst
cfg.ContentType = "application/vnd.kubernetes.protobuf"
glog.Infof("Creating API client for %s", cfg.Host)
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
v, err := client.Discovery().ServerVersion()
if err != nil {
return nil, err
}
glog.Infof("Running in Kubernetes Cluster version v%v.%v (%v) - git (%v) commit %v - platform %v",
v.Major, v.Minor, v.GitVersion, v.GitTreeState, v.GitCommit, v.Platform)
return client, nil
}
/**
* Handles fatal init error that prevents server from doing any work. Prints verbose error
* message and quits the server.
*/
func handleFatalInitError(err error) {
glog.Fatalf("Error while initializing connection to Kubernetes apiserver. "+
"This most likely means that the cluster is misconfigured (e.g., it has "+
"invalid apiserver certificates or service accounts configuration). Reason: %s\n"+
"Refer to the troubleshooting guide for more information: "+
"https://github.com/kubernetes/ingress-nginx/blob/master/docs/troubleshooting.md", err)
}

View file

@ -64,20 +64,21 @@ func (c *cacheController) Run(stopCh chan struct{}) {
}
}
func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.StoreLister, *cacheController) {
func (n *NGINXController) createListers(disableNodeLister bool, stopCh chan struct{}) *ingress.StoreLister {
// from here to the end of the method all the code is just boilerplate
// required to watch Ingress, Secrets, ConfigMaps and Endoints.
// This is used to detect new content, updates or removals and act accordingly
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !class.IsValid(addIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
if !class.IsValid(addIng, n.cfg.IngressClass, defIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, addIng)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", addIng.Name, class.IngressKey, a)
return
}
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
ic.syncQueue.Enqueue(obj)
n.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng, ok := obj.(*extensions.Ingress)
@ -94,29 +95,29 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto
return
}
}
if !class.IsValid(delIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass) {
if !class.IsValid(delIng, n.cfg.IngressClass, defIngressClass) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", delIng.Name, class.IngressKey)
return
}
ic.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
ic.syncQueue.Enqueue(obj)
n.recorder.Eventf(delIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", delIng.Namespace, delIng.Name))
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
validCur := class.IsValid(curIng, ic.cfg.IngressClass, ic.cfg.DefaultIngressClass)
validOld := class.IsValid(oldIng, n.cfg.IngressClass, defIngressClass)
validCur := class.IsValid(curIng, n.cfg.IngressClass, defIngressClass)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
ic.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
n.recorder.Eventf(curIng, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
ic.syncQueue.Enqueue(cur)
n.syncQueue.Enqueue(cur)
},
}
@ -125,7 +126,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto
if !reflect.DeepEqual(old, cur) {
sec := cur.(*apiv1.Secret)
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.syncSecret(key)
n.syncSecret(key)
}
},
DeleteFunc: func(obj interface{}) {
@ -144,23 +145,23 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto
}
}
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
ic.sslCertTracker.DeleteAll(key)
ic.syncQueue.Enqueue(key)
n.sslCertTracker.DeleteAll(key)
n.syncQueue.Enqueue(key)
},
}
eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
n.syncQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
ic.syncQueue.Enqueue(obj)
n.syncQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
oep := old.(*apiv1.Endpoints)
ocur := cur.(*apiv1.Endpoints)
if !reflect.DeepEqual(ocur.Subsets, oep.Subsets) {
ic.syncQueue.Enqueue(cur)
n.syncQueue.Enqueue(cur)
}
},
}
@ -169,33 +170,33 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto
AddFunc: func(obj interface{}) {
upCmap := obj.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.SetForceReload(true)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
upCmap := cur.(*apiv1.ConfigMap)
mapKey := fmt.Sprintf("%s/%s", upCmap.Namespace, upCmap.Name)
if mapKey == ic.cfg.ConfigMapName {
if mapKey == n.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey)
ic.cfg.Backend.SetConfig(upCmap)
ic.SetForceReload(true)
n.SetConfig(upCmap)
n.SetForceReload(true)
}
// updates to configuration configmaps can trigger an update
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {
ic.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
ic.syncQueue.Enqueue(cur)
if mapKey == n.cfg.ConfigMapName || mapKey == n.cfg.TCPConfigMapName || mapKey == n.cfg.UDPConfigMapName {
n.recorder.Eventf(upCmap, apiv1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", mapKey))
n.syncQueue.Enqueue(cur)
}
}
},
}
watchNs := apiv1.NamespaceAll
if ic.cfg.ForceNamespaceIsolation && ic.cfg.Namespace != apiv1.NamespaceAll {
watchNs = ic.cfg.Namespace
if n.cfg.ForceNamespaceIsolation && n.cfg.Namespace != apiv1.NamespaceAll {
watchNs = n.cfg.Namespace
}
lister := &ingress.StoreLister{}
@ -203,34 +204,36 @@ func (ic *GenericController) createListers(disableNodeLister bool) (*ingress.Sto
controller := &cacheController{}
lister.Ingress.Store, controller.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)
cache.NewListWatchFromClient(n.cfg.Client.ExtensionsV1beta1().RESTClient(), "ingresses", n.cfg.Namespace, fields.Everything()),
&extensions.Ingress{}, n.cfg.ResyncPeriod, ingEventHandler)
lister.Endpoint.Store, controller.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "endpoints", n.cfg.Namespace, fields.Everything()),
&apiv1.Endpoints{}, n.cfg.ResyncPeriod, eventHandler)
lister.Secret.Store, controller.Secret = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "secrets", watchNs, fields.Everything()),
&apiv1.Secret{}, n.cfg.ResyncPeriod, secrEventHandler)
lister.ConfigMap.Store, controller.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "configmaps", watchNs, fields.Everything()),
&apiv1.ConfigMap{}, n.cfg.ResyncPeriod, mapEventHandler)
lister.Service.Store, controller.Service = cache.NewInformer(
cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "services", n.cfg.Namespace, fields.Everything()),
&apiv1.Service{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
var nodeListerWatcher cache.ListerWatcher
if disableNodeLister {
nodeListerWatcher = fcache.NewFakeControllerSource()
} else {
nodeListerWatcher = cache.NewListWatchFromClient(ic.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
nodeListerWatcher = cache.NewListWatchFromClient(n.cfg.Client.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
}
lister.Node.Store, controller.Node = cache.NewInformer(
nodeListerWatcher,
&apiv1.Node{}, ic.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
&apiv1.Node{}, n.cfg.ResyncPeriod, cache.ResourceEventHandlerFuncs{})
return lister, controller
controller.Run(n.stopCh)
return lister
}

View file

@ -34,7 +34,6 @@ func init() {
prometheus.MustRegister(reloadOperation)
prometheus.MustRegister(reloadOperationErrors)
prometheus.MustRegister(sslExpireTime)
}
var (
@ -74,11 +73,9 @@ func incReloadErrorCount() {
}
func setSSLExpireTime(servers []*ingress.Server) {
for _, s := range servers {
if s.Hostname != defServerName {
sslExpireTime.WithLabelValues(s.Hostname).Set(float64(s.SSLExpireTime.Unix()))
}
}
}

View file

@ -23,31 +23,36 @@ import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/glog"
"github.com/mitchellh/go-ps"
"github.com/spf13/pflag"
proxyproto "github.com/armon/go-proxyproto"
"github.com/ncabatoff/process-exporter/proc"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/ingress/controller"
"k8s.io/ingress-nginx/pkg/ingress/annotations/class"
"k8s.io/ingress-nginx/pkg/ingress/annotations/parser"
ngx_config "k8s.io/ingress-nginx/pkg/ingress/controller/config"
"k8s.io/ingress-nginx/pkg/ingress/controller/process"
ngx_template "k8s.io/ingress-nginx/pkg/ingress/controller/template"
"k8s.io/ingress-nginx/pkg/ingress/defaults"
"k8s.io/ingress-nginx/pkg/ingress/status"
ing_net "k8s.io/ingress-nginx/pkg/net"
"k8s.io/ingress-nginx/pkg/net/dns"
"k8s.io/ingress-nginx/pkg/net/ssl"
"k8s.io/ingress-nginx/pkg/nginx/config"
ngx_template "k8s.io/ingress-nginx/pkg/nginx/template"
"k8s.io/ingress-nginx/version"
"k8s.io/ingress-nginx/pkg/task"
)
type statusModule string
@ -57,8 +62,6 @@ const (
defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"
defUpstreamName = "upstream-default-backend"
)
var (
@ -71,26 +74,66 @@ var (
// NewNGINXController creates a new NGINX Ingress controller.
// If the environment variable NGINX_BINARY exists it will be used
// as source for nginx commands
func NewNGINXController() *NGINXController {
func NewNGINXController(config *Configuration) *NGINXController {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = nginxBinary
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
h, err := dns.GetSystemNameServers()
if err != nil {
glog.Warningf("unexpected error reading system nameservers: %v", err)
}
n := &NGINXController{
backendDefaults: ngx_config.NewDefault().Backend,
binary: ngx,
configmap: &apiv1.ConfigMap{},
isIPV6Enabled: isIPv6Enabled(),
configmap: &apiv1.ConfigMap{},
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
ports: &config.ListenPorts{},
backendDefaults: config.NewDefault().Backend,
cfg: config,
sslCertTracker: newSSLCertTracker(),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
stopLock: &sync.Mutex{},
}
n.stats = newStatsCollector(config.Namespace, config.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.listers = n.createListers(config.DisableNodeList, n.stopCh)
if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: n.cfg.PublishService,
IngressLister: n.listers.Ingress,
ElectionID: config.ElectionID,
IngressClass: config.IngressClass,
DefaultIngressClass: config.DefaultIngressClass,
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
UseNodeInternalIP: n.cfg.UseNodeInternalIP,
})
} else {
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
}
n.annotations = newAnnotationExtractor(n)
var onChange func()
onChange = func() {
template, err := ngx_template.NewTemplate(tmplPath, onChange)
@ -107,7 +150,7 @@ Error loading new template : %v
n.t.Close()
n.t = template
glog.Info("new NGINX template loaded")
n.controller.SetForceReload(true)
n.SetForceReload(true)
}
ngxTpl, err := ngx_template.NewTemplate(tmplPath, onChange)
@ -122,8 +165,40 @@ Error loading new template : %v
// NGINXController ...
type NGINXController struct {
controller *controller.GenericController
t *ngx_template.Template
cfg *Configuration
listers *ingress.StoreLister
annotations annotationExtractor
recorder record.EventRecorder
syncQueue *task.Queue
syncStatus status.Sync
// local store of SSL certificates
// (only certificates used in ingress)
sslCertTracker *sslCertTracker
syncRateLimiter flowcontrol.RateLimiter
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock *sync.Mutex
stopCh chan struct{}
// ngxErrCh channel used to detect errors with the nginx processes
ngxErrCh chan error
// runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration
forceReload int32
t *ngx_template.Template
configmap *apiv1.ConfigMap
@ -132,8 +207,6 @@ type NGINXController struct {
binary string
resolver []net.IP
cmdArgs []string
stats *statsCollector
statusModule statusModule
@ -141,25 +214,42 @@ type NGINXController struct {
isIPV6Enabled bool
// returns true if proxy protocol es enabled
isProxyProtocolEnabled bool
IsProxyProtocolEnabled bool
isSSLPassthroughEnabled bool
isShuttingDown bool
proxy *proxy
ports *config.ListenPorts
Proxy *TCPProxy
backendDefaults defaults.Backend
}
// Start start a new NGINX master process running in foreground.
func (n *NGINXController) Start() {
n.isShuttingDown = false
glog.Infof("starting Ingress controller")
n.controller = controller.NewIngressController(n)
go n.controller.Start()
// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
for _, obj := range n.listers.Ingress.List() {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing, n.cfg.IngressClass, n.cfg.DefaultIngressClass) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
continue
}
n.readSecrets(ing)
}
go n.syncQueue.Run(time.Second, n.stopCh)
if n.syncStatus != nil {
go n.syncStatus.Run(n.stopCh)
}
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
done := make(chan error, 1)
cmd := exec.Command(n.binary, "-c", cfgPath)
@ -172,68 +262,55 @@ func (n *NGINXController) Start() {
}
glog.Info("starting NGINX process...")
n.start(cmd, done)
n.start(cmd)
// force initial sync
n.syncQueue.Enqueue(&extensions.Ingress{})
// if the nginx master process dies the workers continue to process requests,
// passing checks but in case of updates in ingress no updates will be
// reflected in the nginx configuration which can lead to confusion and report
// issues because of this behavior.
// To avoid this issue we restart nginx in case of errors.
for {
err := <-done
if n.isShuttingDown {
break
}
if exitError, ok := err.(*exec.ExitError); ok {
waitStatus := exitError.Sys().(syscall.WaitStatus)
glog.Warningf(`
-------------------------------------------------------------------------------
NGINX master process died (%v): %v
-------------------------------------------------------------------------------
`, waitStatus.ExitStatus(), err)
}
cmd.Process.Release()
cmd = exec.Command(n.binary, "-c", cfgPath)
// we wait until the workers are killed
for {
conn, err := net.DialTimeout("tcp", "127.0.0.1:80", 1*time.Second)
if err != nil {
select {
case err := <-done:
if n.isShuttingDown {
break
}
conn.Close()
// kill nginx worker processes
fs, err := proc.NewFS("/proc")
procs, _ := fs.FS.AllProcs()
for _, p := range procs {
pn, err := p.Comm()
if err != nil {
glog.Errorf("unexpected error obtaining process information: %v", err)
continue
}
if pn == "nginx" {
osp, err := os.FindProcess(p.PID)
if err != nil {
glog.Errorf("unexpected error obtaining process information: %v", err)
continue
}
osp.Signal(syscall.SIGQUIT)
}
// if the nginx master process dies the workers continue to process requests,
// passing checks but in case of updates in ingress no updates will be
// reflected in the nginx configuration which can lead to confusion and report
// issues because of this behavior.
// To avoid this issue we restart nginx in case of errors.
if process.IsRespawnIfRequired(err) {
process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP)
// release command resources
cmd.Process.Release()
cmd = exec.Command(n.binary, "-c", cfgPath)
// start a new nginx master process if the controller is not being stopped
n.start(cmd)
}
time.Sleep(100 * time.Millisecond)
case <-n.stopCh:
break
}
// restart a new nginx master process if the controller
// is not being stopped
n.start(cmd, done)
}
}
// Stop gracefully stops the NGINX master process.
func (n *NGINXController) Stop() error {
n.isShuttingDown = true
n.controller.Stop()
n.stopLock.Lock()
defer n.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if n.syncQueue.IsShuttingDown() {
return fmt.Errorf("shutdown already in progress")
}
glog.Infof("shutting down controller queues")
close(n.stopCh)
go n.syncQueue.Shutdown()
if n.syncStatus != nil {
n.syncStatus.Shutdown()
}
// Send stop signal to Nginx
glog.Info("stopping NGINX process...")
@ -246,192 +323,42 @@ func (n *NGINXController) Stop() error {
}
// Wait for the Nginx process disappear
waitForNginxShutdown()
glog.Info("NGINX process has stopped")
timer := time.NewTicker(time.Second * 1)
for t := range timer.C {
glog.V(3).Infof("tick at", t)
if !process.IsNginxRunning() {
glog.Info("NGINX process has stopped")
timer.Stop()
break
}
}
return nil
}
func (n *NGINXController) start(cmd *exec.Cmd, done chan error) {
func (n *NGINXController) start(cmd *exec.Cmd) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
glog.Fatalf("nginx error: %v", err)
done <- err
n.ngxErrCh <- err
return
}
n.cmdArgs = cmd.Args
go func() {
done <- cmd.Wait()
n.ngxErrCh <- cmd.Wait()
}()
}
// BackendDefaults returns the nginx defaults
func (n NGINXController) BackendDefaults() defaults.Backend {
return n.backendDefaults
}
// printDiff returns the difference between the running configuration
// and the new one
func (n NGINXController) printDiff(data []byte) {
if !glog.V(2) {
return
}
in, err := os.Open(cfgPath)
if err != nil {
return
}
src, err := ioutil.ReadAll(in)
in.Close()
if err != nil {
return
}
if !bytes.Equal(src, data) {
tmpfile, err := ioutil.TempFile("", "nginx-cfg-diff")
if err != nil {
glog.Errorf("error creating temporal file: %s", err)
return
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), data, 0644)
if err != nil {
return
}
diffOutput, err := diff(src, data)
if err != nil {
glog.Errorf("error computing diff: %s", err)
return
}
glog.Infof("NGINX configuration diff\n")
glog.Infof("%v", string(diffOutput))
os.Remove(tmpfile.Name())
}
}
// Info return build information
func (n NGINXController) Info() *ingress.BackendInfo {
return &ingress.BackendInfo{
Name: "NGINX",
Release: version.RELEASE,
Build: version.COMMIT,
Repository: version.REPO,
}
}
// DefaultEndpoint returns the default endpoint to be use as default server that returns 404.
func (n NGINXController) DefaultEndpoint() ingress.Endpoint {
return ingress.Endpoint{
Address: "127.0.0.1",
Port: fmt.Sprintf("%v", n.ports.Default),
Port: fmt.Sprintf("%v", n.cfg.ListenPorts.Default),
Target: &apiv1.ObjectReference{},
}
}
// ConfigureFlags allow to configure more flags before the parsing of
// command line arguments
func (n *NGINXController) ConfigureFlags(flags *pflag.FlagSet) {
flags.BoolVar(&n.isSSLPassthroughEnabled, "enable-ssl-passthrough", false, `Enable SSL passthrough feature. Default is disabled`)
flags.IntVar(&n.ports.HTTP, "http-port", 80, `Indicates the port to use for HTTP traffic`)
flags.IntVar(&n.ports.HTTPS, "https-port", 443, `Indicates the port to use for HTTPS traffic`)
flags.IntVar(&n.ports.Status, "status-port", 18080, `Indicates the TCP port to use for exposing the nginx status page`)
flags.IntVar(&n.ports.SSLProxy, "ssl-passtrough-proxy-port", 442, `Default port to use internally for SSL when SSL Passthgough is enabled`)
flags.IntVar(&n.ports.Default, "default-server-port", 8181, `Default port to use for exposing the default server (catch all)`)
}
// OverrideFlags customize NGINX controller flags
func (n *NGINXController) OverrideFlags(flags *pflag.FlagSet) {
// we check port collisions
if !isPortAvailable(n.ports.HTTP) {
glog.Fatalf("Port %v is already in use. Please check the flag --http-port", n.ports.HTTP)
}
if !isPortAvailable(n.ports.HTTPS) {
glog.Fatalf("Port %v is already in use. Please check the flag --https-port", n.ports.HTTPS)
}
if !isPortAvailable(n.ports.Status) {
glog.Fatalf("Port %v is already in use. Please check the flag --status-port", n.ports.Status)
}
if !isPortAvailable(n.ports.Default) {
glog.Fatalf("Port %v is already in use. Please check the flag --default-server-port", n.ports.Default)
}
ic, _ := flags.GetString("ingress-class")
wc, _ := flags.GetString("watch-namespace")
if ic == "" {
ic = defIngressClass
}
if ic != defIngressClass {
glog.Warningf("only Ingress with class %v will be processed by this ingress controller", ic)
}
flags.Set("ingress-class", ic)
h, _ := flags.GetInt("healthz-port")
n.ports.Health = h
n.stats = newStatsCollector(wc, ic, n.binary, n.ports.Status)
if n.isSSLPassthroughEnabled {
if !isPortAvailable(n.ports.SSLProxy) {
glog.Fatalf("Port %v is already in use. Please check the flag --ssl-passtrough-proxy-port", n.ports.SSLProxy)
}
glog.Info("starting TLS proxy for SSL passthrough")
n.proxy = &proxy{
Default: &server{
Hostname: "localhost",
IP: "127.0.0.1",
Port: n.ports.SSLProxy,
ProxyProtocol: true,
},
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", n.ports.HTTPS))
if err != nil {
glog.Fatalf("%v", err)
}
proxyList := &proxyproto.Listener{Listener: listener}
// start goroutine that accepts tcp connections in port 443
go func() {
for {
var conn net.Conn
var err error
if n.isProxyProtocolEnabled {
// we need to wrap the listener in order to decode
// proxy protocol before handling the connection
conn, err = proxyList.Accept()
} else {
conn, err = listener.Accept()
}
if err != nil {
glog.Warningf("unexpected error accepting tcp connection: %v", err)
continue
}
glog.V(3).Infof("remote address %s to local %s", conn.RemoteAddr(), conn.LocalAddr())
go n.proxy.Handle(conn)
}
}()
}
}
// DefaultIngressClass just return the default ingress class
func (n NGINXController) DefaultIngressClass() string {
return defIngressClass
}
// testTemplate checks if the NGINX configuration inside the byte array is valid
// running the command "nginx -t" using a temporal file.
func (n NGINXController) testTemplate(cfg []byte) error {
@ -466,7 +393,7 @@ Error: %v
// SetConfig sets the configured configmap
func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
n.configmap = cmap
n.isProxyProtocolEnabled = false
n.IsProxyProtocolEnabled = false
m := map[string]string{}
if cmap != nil {
@ -477,7 +404,7 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
if ok {
b, err := strconv.ParseBool(val)
if err == nil {
n.isProxyProtocolEnabled = b
n.IsProxyProtocolEnabled = b
}
}
@ -494,16 +421,6 @@ func (n *NGINXController) SetConfig(cmap *apiv1.ConfigMap) {
n.backendDefaults = c.Backend
}
// SetListers sets the configured store listers in the generic ingress controller
func (n *NGINXController) SetListers(lister *ingress.StoreLister) {
n.storeLister = lister
}
// UpdateIngressStatus custom Ingress status update
func (n *NGINXController) UpdateIngressStatus(*extensions.Ingress) []apiv1.LoadBalancerIngress {
return nil
}
// OnUpdate is called by syncQueue in https://github.com/kubernetes/ingress-nginx/blob/master/pkg/ingress/controller/controller.go#L426
// periodically to keep the configuration in sync.
//
@ -516,7 +433,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := ngx_template.ReadConfig(n.configmap.Data)
cfg.Resolver = n.resolver
servers := []*server{}
servers := []*TCPServer{}
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
@ -541,7 +458,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
//TODO: Allow PassthroughBackends to specify they support proxy-protocol
servers = append(servers, &server{
servers = append(servers, &TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
Port: port,
@ -550,7 +467,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
}
if n.isSSLPassthroughEnabled {
n.proxy.ServerList = servers
n.Proxy.ServerList = servers
}
// we need to check if the status module configuration changed
@ -671,7 +588,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg.SSLDHParam = sslDHParam
tc := config.TemplateConfig{
tc := ngx_config.TemplateConfig{
ProxySetHeaders: setHeaders,
AddHeaders: addHeaders,
MaxOpenFiles: maxOpenFiles,
@ -687,8 +604,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
RedirectServers: redirectServers,
IsSSLPassthroughEnabled: n.isSSLPassthroughEnabled,
ListenPorts: n.ports,
PublishService: n.controller.GetPublishService(),
ListenPorts: n.cfg.ListenPorts,
PublishService: n.GetPublishService(),
}
content, err := n.t.Write(tc)
@ -702,7 +619,34 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return err
}
n.printDiff(content)
if glog.V(2) {
src, _ := ioutil.ReadFile(cfgPath)
if !bytes.Equal(src, content) {
tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")
if err != nil {
return err
}
defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), content, 0644)
if err != nil {
return err
}
diffOutput, err := exec.Command("diff", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
if err != nil {
return err
}
glog.Infof("NGINX configuration diff\n")
glog.Infof("%v\n", string(diffOutput))
// Do not use defer to remove the temporal file.
// This is helpful when there is an error in the
// temporal configuration (we can manually inspect the file).
// Only remove the file when no error occured.
os.Remove(tmpfile.Name())
}
}
err = ioutil.WriteFile(cfgPath, content, 0644)
if err != nil {
@ -727,46 +671,6 @@ func nginxHashBucketSize(longestString int) int {
return nextPowerOf2(rawSize)
}
// Name returns the healthcheck name
func (n NGINXController) Name() string {
return "Ingress Controller"
}
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n NGINXController) Check(_ *http.Request) error {
res, err := http.Get(fmt.Sprintf("http://0.0.0.0:%v%v", n.ports.Status, ngxHealthPath))
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("ingress controller is not healthy")
}
// check the nginx master process is running
fs, err := proc.NewFS("/proc")
if err != nil {
glog.Errorf("%v", err)
return err
}
f, err := ioutil.ReadFile("/run/nginx.pid")
if err != nil {
glog.Errorf("%v", err)
return err
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return err
}
_, err = fs.NewProc(int(pid))
if err != nil {
glog.Errorf("%v", err)
return err
}
return nil
}
// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
// https://play.golang.org/p/TVSyCcdxUh
func nextPowerOf2(v int) int {
@ -780,32 +684,3 @@ func nextPowerOf2(v int) int {
return v
}
func isIPv6Enabled() bool {
cmd := exec.Command("test", "-f", "/proc/net/if_inet6")
return cmd.Run() == nil
}
// isNginxRunning returns true if a process with the name 'nginx' is found
func isNginxProcessPresent() bool {
processes, _ := ps.Processes()
for _, p := range processes {
if p.Executable() == "nginx" {
return true
}
}
return false
}
func waitForNginxShutdown() {
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
for {
select {
case <-timer.C:
if !isNginxProcessPresent() {
return
}
}
}
}

View file

@ -0,0 +1,71 @@
package process
import (
"fmt"
"net"
"os"
"os/exec"
"syscall"
"time"
"github.com/golang/glog"
ps "github.com/mitchellh/go-ps"
"github.com/ncabatoff/process-exporter/proc"
)
func IsRespawnIfRequired(err error) bool {
exitError, ok := err.(*exec.ExitError)
if !ok {
return false
}
waitStatus := exitError.Sys().(syscall.WaitStatus)
glog.Warningf(`
-------------------------------------------------------------------------------
NGINX master process died (%v): %v
-------------------------------------------------------------------------------
`, waitStatus.ExitStatus(), err)
return true
}
func WaitUntilPortIsAvailable(port int) {
// we wait until the workers are killed
for {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%v", port), 1*time.Second)
if err != nil {
break
}
conn.Close()
// kill nginx worker processes
fs, err := proc.NewFS("/proc")
procs, _ := fs.FS.AllProcs()
for _, p := range procs {
pn, err := p.Comm()
if err != nil {
glog.Errorf("unexpected error obtaining process information: %v", err)
continue
}
if pn == "nginx" {
osp, err := os.FindProcess(p.PID)
if err != nil {
glog.Errorf("unexpected error obtaining process information: %v", err)
continue
}
osp.Signal(syscall.SIGQUIT)
}
}
time.Sleep(100 * time.Millisecond)
}
}
// IsNginxRunning returns true if a process with the name 'nginx' is found
func IsNginxRunning() bool {
processes, _ := ps.Processes()
for _, p := range processes {
if p.Executable() == "nginx" {
return true
}
}
return false
}

View file

@ -20,7 +20,7 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/pkg/nginx/metric/collector"
"k8s.io/ingress-nginx/pkg/ingress/controller/metric/collector"
)
const (

View file

@ -10,19 +10,19 @@ import (
"github.com/paultag/sniff/parser"
)
type server struct {
type TCPServer struct {
Hostname string
IP string
Port int
ProxyProtocol bool
}
type proxy struct {
ServerList []*server
Default *server
type TCPProxy struct {
ServerList []*TCPServer
Default *TCPServer
}
func (p *proxy) Get(host string) *server {
func (p *TCPProxy) Get(host string) *TCPServer {
if p.ServerList == nil {
return p.Default
}
@ -36,7 +36,7 @@ func (p *proxy) Get(host string) *server {
return p.Default
}
func (p *proxy) Handle(conn net.Conn) {
func (p *TCPProxy) Handle(conn net.Conn) {
defer conn.Close()
data := make([]byte, 4096)

View file

@ -26,8 +26,8 @@ import (
"github.com/mitchellh/mapstructure"
"k8s.io/ingress-nginx/pkg/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/pkg/net"
"k8s.io/ingress-nginx/pkg/nginx/config"
)
const (
@ -41,11 +41,9 @@ const (
// ReadConfig obtains the configuration defined by the user merged with the defaults.
func ReadConfig(src map[string]string) config.Configuration {
conf := map[string]string{}
if src != nil {
// we need to copy the configmap data because the content is altered
for k, v := range src {
conf[k] = v
}
// we need to copy the configmap data because the content is altered
for k, v := range src {
conf[k] = v
}
errors := make([]int, 0)

View file

@ -21,7 +21,7 @@ import (
"github.com/kylelemons/godebug/pretty"
"k8s.io/ingress-nginx/pkg/nginx/config"
"k8s.io/ingress-nginx/pkg/ingress/controller/config"
)
func TestFilterErrors(t *testing.T) {

View file

@ -37,8 +37,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/ingress/annotations/ratelimit"
"k8s.io/ingress-nginx/pkg/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/pkg/net"
"k8s.io/ingress-nginx/pkg/nginx/config"
"k8s.io/ingress-nginx/pkg/watch"
)

View file

@ -29,7 +29,7 @@ import (
"k8s.io/ingress-nginx/pkg/ingress"
"k8s.io/ingress-nginx/pkg/ingress/annotations/authreq"
"k8s.io/ingress-nginx/pkg/ingress/annotations/rewrite"
"k8s.io/ingress-nginx/pkg/nginx/config"
"k8s.io/ingress-nginx/pkg/ingress/controller/config"
)
var (
@ -158,7 +158,7 @@ func TestBuildAuthResponseHeaders(t *testing.T) {
func TestTemplateWithData(t *testing.T) {
pwd, _ := os.Getwd()
f, err := os.Open(path.Join(pwd, "../../../test/data/config.json"))
f, err := os.Open(path.Join(pwd, "../../../../test/data/config.json"))
if err != nil {
t.Errorf("unexpected error reading json file: %v", err)
}
@ -174,7 +174,7 @@ func TestTemplateWithData(t *testing.T) {
if dat.ListenPorts == nil {
dat.ListenPorts = &config.ListenPorts{}
}
tf, err := os.Open(path.Join(pwd, "../../../rootfs/etc/nginx/template/nginx.tmpl"))
tf, err := os.Open(path.Join(pwd, "../../../../rootfs/etc/nginx/template/nginx.tmpl"))
if err != nil {
t.Errorf("unexpected error reading json file: %v", err)
}
@ -193,7 +193,7 @@ func TestTemplateWithData(t *testing.T) {
func BenchmarkTemplateWithData(b *testing.B) {
pwd, _ := os.Getwd()
f, err := os.Open(path.Join(pwd, "../../../test/data/config.json"))
f, err := os.Open(path.Join(pwd, "../../../../test/data/config.json"))
if err != nil {
b.Errorf("unexpected error reading json file: %v", err)
}
@ -207,7 +207,7 @@ func BenchmarkTemplateWithData(b *testing.B) {
b.Errorf("unexpected error unmarshalling json: %v", err)
}
tf, err := os.Open(path.Join(pwd, "../../rootfs/etc/nginx/template/nginx.tmpl"))
tf, err := os.Open(path.Join(pwd, "../../../rootfs/etc/nginx/template/nginx.tmpl"))
if err != nil {
b.Errorf("unexpected error reading json file: %v", err)
}

View file

@ -17,11 +17,6 @@ limitations under the License.
package controller
import (
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
"syscall"
"github.com/golang/glog"
@ -54,34 +49,3 @@ func sysctlFSFileMax() int {
}
return int(rLimit.Max)
}
func diff(b1, b2 []byte) ([]byte, error) {
f1, err := ioutil.TempFile("", "a")
if err != nil {
return nil, err
}
defer f1.Close()
defer os.Remove(f1.Name())
f2, err := ioutil.TempFile("", "b")
if err != nil {
return nil, err
}
defer f2.Close()
defer os.Remove(f2.Name())
f1.Write(b1)
f2.Write(b2)
out, _ := exec.Command("diff", "-u", f1.Name(), f2.Name()).CombinedOutput()
return out, nil
}
func isPortAvailable(p int) bool {
ln, err := net.Listen("tcp", fmt.Sprintf(":%v", p))
if err != nil {
return false
}
ln.Close()
return true
}

View file

@ -1,71 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ingress
// This package contains the interface is required to implement to build an Ingress controller
// A dummy implementation could be
//
// func main() {
// dc := newDummyController()
// controller.NewIngressController(dc)
// glog.Infof("shutting down Ingress controller...")
// }
//
//where newDummyController returns and implementation of the Controller interface:
//
// func newDummyController() ingress.Controller {
// return &DummyController{}
// }
//
// type DummyController struct {
// }
//
// func (dc DummyController) Reload(data []byte) ([]byte, error) {
// err := ioutil.WriteFile("/arbitrary-path", data, 0644)
// if err != nil {
// return nil, err
// }
//
// return exec.Command("some command", "--reload").CombinedOutput()
// }
//
// func (dc DummyController) Test(file string) *exec.Cmd {
// return exec.Command("some command", "--config-file", file)
// }
//
// func (dc DummyController) OnUpdate(*api.ConfigMap, Configuration) ([]byte, error) {
// return []byte(`<string containing a configuration file>`)
// }
//
// func (dc DummyController) BackendDefaults() defaults.Backend {
// return ingress.NewStandardDefaults()
// }
//
// func (n DummyController) Name() string {
// return "dummy Controller"
// }
//
// func (n DummyController) Check(_ *http.Request) error {
// return nil
// }
//
// func (dc DummyController) Info() *BackendInfo {
// Name: "dummy",
// Release: "0.0.0",
// Build: "git-00000000",
// Repository: "git://foo.bar.com",
// }

View file

@ -72,9 +72,6 @@ type Config struct {
DefaultIngressClass string
IngressClass string
// CustomIngressStatus allows to set custom values in Ingress status
CustomIngressStatus func(*extensions.Ingress) []apiv1.LoadBalancerIngress
}
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check
@ -252,9 +249,8 @@ func (s *statusSync) runningAddresses() ([]string, error) {
addrs = append(addrs, ip.IP)
}
}
for _, ip := range svc.Spec.ExternalIPs {
addrs = append(addrs, ip)
}
addrs = append(addrs, svc.Spec.ExternalIPs...)
return addrs, nil
}
@ -307,8 +303,6 @@ func sliceToStatus(endpoints []string) []apiv1.LoadBalancerIngress {
}
// updateStatus changes the status information of Ingress rules
// If the backend function CustomIngressStatus returns a value different
// of nil then it uses the returned value or the newIngressPoint values
func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
ings := s.IngressLister.List()
@ -324,7 +318,7 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
continue
}
batch.Queue(runUpdate(ing, newIngressPoint, s.Client, s.CustomIngressStatus))
batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
}
batch.QueueComplete()
@ -332,18 +326,13 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
}
func runUpdate(ing *extensions.Ingress, status []apiv1.LoadBalancerIngress,
client clientset.Interface,
statusFunc func(*extensions.Ingress) []apiv1.LoadBalancerIngress) pool.WorkFunc {
client clientset.Interface) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
return nil, nil
}
addrs := status
ca := statusFunc(ing)
if ca != nil {
addrs = ca
}
sort.SliceStable(addrs, lessLoadBalancerIngress(addrs))
curIPs := ing.Status.LoadBalancer.Ingress

View file

@ -249,9 +249,6 @@ func buildStatusSync() statusSync {
Client: buildSimpleClientSet(),
PublishService: apiv1.NamespaceDefault + "/" + "foo",
IngressLister: buildIngressListener(),
CustomIngressStatus: func(*extensions.Ingress) []apiv1.LoadBalancerIngress {
return nil
},
},
}
}
@ -267,9 +264,6 @@ func TestStatusActions(t *testing.T) {
DefaultIngressClass: "nginx",
IngressClass: "",
UpdateStatusOnShutdown: true,
CustomIngressStatus: func(*extensions.Ingress) []apiv1.LoadBalancerIngress {
return nil
},
}
// create object
fkSync := NewStatusSyncer(c)

View file

@ -17,15 +17,11 @@ limitations under the License.
package ingress
import (
"fmt"
"time"
"github.com/spf13/pflag"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/ingress-nginx/pkg/ingress/annotations/auth"
"k8s.io/ingress-nginx/pkg/ingress/annotations/authreq"
@ -36,7 +32,6 @@ import (
"k8s.io/ingress-nginx/pkg/ingress/annotations/ratelimit"
"k8s.io/ingress-nginx/pkg/ingress/annotations/redirect"
"k8s.io/ingress-nginx/pkg/ingress/annotations/rewrite"
"k8s.io/ingress-nginx/pkg/ingress/defaults"
"k8s.io/ingress-nginx/pkg/ingress/resolver"
"k8s.io/ingress-nginx/pkg/ingress/store"
)
@ -49,63 +44,6 @@ var (
DefaultSSLDirectory = "/ingress-controller/ssl"
)
// Controller holds the methods to handle an Ingress backend
// TODO (#18): Make sure this is sufficiently supportive of other backends.
type Controller interface {
// HealthzChecker returns is a named healthz check that returns the ingress
// controller status
healthz.HealthzChecker
// OnUpdate callback invoked from the sync queue https://k8s.io/ingress/core/blob/master/pkg/ingress/controller/controller.go#L387
// when an update occurs. This is executed frequently because Ingress
// controllers watches changes in:
// - Ingresses: main work
// - Secrets: referenced from Ingress rules with TLS configured
// - ConfigMaps: where the controller reads custom configuration
// - Services: referenced from Ingress rules and required to obtain
// information about ports and annotations
// - Endpoints: referenced from Services and what the backend uses
// to route traffic
// Any update to services, endpoints, secrets (only those referenced from Ingress)
// and ingress trigger the execution.
// Notifications of type Add, Update and Delete:
// https://github.com/kubernetes/kubernetes/blob/master/pkg/client/cache/controller.go#L164
//
// Configuration returns the translation from Ingress rules containing
// information about all the upstreams (service endpoints ) "virtual"
// servers (FQDN) and all the locations inside each server. Each
// location contains information about all the annotations were configured
// https://k8s.io/ingress/core/blob/master/pkg/ingress/types.go#L83
// The backend returns an error if was not possible to update the configuration.
//
OnUpdate(Configuration) error
// ConfigMap content of --configmap
SetConfig(*apiv1.ConfigMap)
// SetListers allows the access of store listers present in the generic controller
// This avoid the use of the kubernetes client.
SetListers(*StoreLister)
// BackendDefaults returns the minimum settings required to configure the
// communication to endpoints
BackendDefaults() defaults.Backend
// Info returns information about the ingress controller
Info() *BackendInfo
// ConfigureFlags allow to configure more flags before the parsing of
// command line arguments
ConfigureFlags(*pflag.FlagSet)
// OverrideFlags allow the customization of the flags in the backend
OverrideFlags(*pflag.FlagSet)
// DefaultIngressClass just return the default ingress class
DefaultIngressClass() string
// UpdateIngressStatus custom callback used to update the status in an Ingress rule
// This allows custom implementations
// If the function returns nil the standard functions will be executed.
UpdateIngressStatus(*extensions.Ingress) []apiv1.LoadBalancerIngress
// DefaultEndpoint returns the Endpoint to use as default when the
// referenced service does not exists. This should return the content
// of to the default backend
DefaultEndpoint() Endpoint
}
// StoreLister returns the configured stores for ingresses, services,
// endpoints, secrets and configmaps.
type StoreLister struct {
@ -117,29 +55,6 @@ type StoreLister struct {
ConfigMap store.ConfigMapLister
}
// BackendInfo returns information about the backend.
// This fields contains information that helps to track issues or to
// map the running ingress controller to source code
type BackendInfo struct {
// Name returns the name of the backend implementation
Name string `json:"name"`
// Release returns the running version (semver)
Release string `json:"release"`
// Build returns information about the git commit
Build string `json:"build"`
// Repository return information about the git repository
Repository string `json:"repository"`
}
func (bi BackendInfo) String() string {
return fmt.Sprintf(`
Name: %v
Release: %v
Build: %v
Repository: %v
`, bi.Name, bi.Release, bi.Build, bi.Repository)
}
// Configuration holds the definition of all the parts required to describe all
// ingresses reachable by the ingress controller (using a filter by namespace)
type Configuration struct {
@ -261,9 +176,6 @@ type Server struct {
// Location describes an URI inside a server.
// Also contains additional information about annotations in the Ingress.
//
// Important:
// The implementation of annotations is optional
//
// In some cases when more than one annotations is defined a particular order in the execution
// is required.
// The chain in the execution order of annotations should be:

View file

@ -16,30 +16,6 @@ limitations under the License.
package ingress
// Equal tests for equality between two BackendInfo types
func (bi1 *BackendInfo) Equal(bi2 *BackendInfo) bool {
if bi1 == bi2 {
return true
}
if bi1 == nil || bi2 == nil {
return false
}
if bi1.Name != bi2.Name {
return false
}
if bi1.Release != bi2.Release {
return false
}
if bi1.Build != bi2.Build {
return false
}
if bi1.Repository != bi2.Repository {
return false
}
return true
}
// Equal tests for equality between two Configuration types
func (c1 *Configuration) Equal(c2 *Configuration) bool {
if c1 == c2 {

View file

@ -1,58 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8s
import (
api "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
)
func EnsureSecret(cl kubernetes.Interface, secret *api.Secret) (*api.Secret, error) {
s, err := cl.CoreV1().Secrets(secret.Namespace).Create(secret)
if err != nil {
if k8sErrors.IsAlreadyExists(err) {
return cl.CoreV1().Secrets(secret.Namespace).Update(secret)
}
return nil, err
}
return s, nil
}
func EnsureIngress(cl kubernetes.Interface, ingress *extensions.Ingress) (*extensions.Ingress, error) {
s, err := cl.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
if k8sErrors.IsNotFound(err) {
return cl.ExtensionsV1beta1().Ingresses(ingress.Namespace).Create(ingress)
}
return nil, err
}
return s, nil
}
func EnsureService(cl kubernetes.Interface, service *core.Service) (*core.Service, error) {
s, err := cl.CoreV1().Services(service.Namespace).Update(service)
if err != nil {
if k8sErrors.IsNotFound(err) {
return cl.CoreV1().Services(service.Namespace).Create(service)
}
return nil, err
}
return s, nil
}

View file

@ -16,9 +16,29 @@ limitations under the License.
package net
import _net "net"
import (
"fmt"
_net "net"
"os/exec"
)
// IsIPV6 checks if the input contains a valid IPV6 address
func IsIPV6(ip _net.IP) bool {
return ip.To4() == nil
}
// IsPortAvailable checks if a TCP port is available or not
func IsPortAvailable(p int) bool {
ln, err := _net.Listen("tcp", fmt.Sprintf(":%v", p))
if err != nil {
return false
}
ln.Close()
return true
}
// IsIPv6Enabled checks if IPV6 is enabled or not
func IsIPv6Enabled() bool {
cmd := exec.Command("test", "-f", "/proc/net/if_inet6")
return cmd.Run() == nil
}

View file

@ -41,3 +41,20 @@ func TestIsIPV6(t *testing.T) {
}
}
}
func TestIsPortAvailable(t *testing.T) {
if !IsPortAvailable(0) {
t.Fatal("expected port 0 to be avilable (random port) but returned false")
}
ln, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer ln.Close()
p := ln.Addr().(*net.TCPAddr).Port
if IsPortAvailable(p) {
t.Fatalf("expected port %v to not be available", p)
}
}

View file

@ -16,6 +16,8 @@ limitations under the License.
package version
import "fmt"
var (
// RELEASE returns the release version
RELEASE = "UNKNOWN"
@ -24,3 +26,14 @@ var (
// COMMIT returns the short sha from git
COMMIT = "UNKNOWN"
)
// String returns information about the release.
func String() string {
return fmt.Sprintf(`-------------------------------------------------------------------------------
NGINX Ingress controller
Release: %v
Build: %v
Repository: %v
-------------------------------------------------------------------------------
`, RELEASE, COMMIT, REPO)
}