diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go
index ad5979338..979b06bd4 100644
--- a/internal/ingress/controller/controller.go
+++ b/internal/ingress/controller/controller.go
@@ -188,7 +188,11 @@ func (n *NGINXController) syncIngress(interface{}) error {
 		klog.Infof("Backend successfully reloaded.")
 		n.metricCollector.ConfigSuccess(hash, true)
 		n.metricCollector.IncReloadCount()
-		n.metricCollector.SetSSLExpireTime(servers)
+
+		if n.isLeader() {
+			klog.V(2).Infof("Updating ssl expiration metrics.")
+			n.metricCollector.SetSSLExpireTime(servers)
+		}
 	}
 
 	isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go
index 234f77640..0a29f3d08 100644
--- a/internal/ingress/controller/nginx.go
+++ b/internal/ingress/controller/nginx.go
@@ -31,6 +31,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"text/template"
 	"time"
@@ -115,6 +116,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
 	if err != nil {
 		klog.Fatalf("unexpected error obtaining pod information: %v", err)
 	}
+	n.podInfo = pod
 
 	n.store = store.New(
 		config.EnableSSLChainCompletion,
@@ -132,15 +134,13 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
 		config.DisableCatchAll)
 
 	n.syncQueue = task.NewTaskQueue(n.syncIngress)
+
 	if config.UpdateStatus {
-		n.syncStatus = status.NewStatusSyncer(status.Config{
+		n.syncStatus = status.NewStatusSyncer(pod, status.Config{
 			Client:                 config.Client,
 			PublishService:         config.PublishService,
 			PublishStatusAddress:   config.PublishStatusAddress,
 			IngressLister:          n.store,
-			ElectionID:             config.ElectionID,
-			IngressClass:           class.IngressClass,
-			DefaultIngressClass:    class.DefaultClass,
 			UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
 			UseNodeInternalIP:      config.UseNodeInternalIP,
 		})
@@ -215,13 +215,15 @@ Error loading new template: %v
 
 // NGINXController describes a NGINX Ingress controller.
 type NGINXController struct {
+	podInfo *k8s.PodInfo
+
 	cfg *Configuration
 
 	recorder record.EventRecorder
 
 	syncQueue *task.Queue
 
-	syncStatus status.Sync
+	syncStatus status.Syncer
 
 	syncRateLimiter flowcontrol.RateLimiter
@@ -254,6 +256,8 @@ type NGINXController struct {
 	fileSystem filesystem.Filesystem
 
 	metricCollector metric.Collector
+
+	currentLeader uint32
 }
 
 // Start starts a new NGINX master process running in the foreground.
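Note: the gate above reads a leadership flag that election callbacks flip from another goroutine, which is why the PR stores it as a `uint32` accessed through `sync/atomic` rather than guarding it with a mutex. A minimal standalone sketch of that flag pattern (the type and names here are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// leaderFlag mirrors the currentLeader field added to NGINXController:
// election callbacks run on a different goroutine than syncIngress, so
// the flag is read and written atomically instead of under a lock.
type leaderFlag struct {
	val uint32
}

func (f *leaderFlag) set(leader bool) {
	var v uint32
	if leader {
		v = 1
	}
	atomic.StoreUint32(&f.val, v)
}

func (f *leaderFlag) get() bool {
	return atomic.LoadUint32(&f.val) != 0
}

func main() {
	var f leaderFlag
	fmt.Println(f.get()) // false: every replica starts as a follower

	f.set(true) // what the OnStartedLeading callback does
	if f.get() {
		fmt.Println("leader: safe to export per-cluster metrics like SSL expiry")
	}
}
```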
@@ -262,10 +266,35 @@ func (n *NGINXController) Start() {
 
 	n.store.Run(n.stopCh)
 
-	if n.syncStatus != nil {
-		go n.syncStatus.Run()
+	// we need to use the defined ingress class to allow multiple leaders
+	// in order to update information about ingress status
+	electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
+	if class.IngressClass != "" {
+		electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
 	}
 
+	setupLeaderElection(&leaderElectionConfig{
+		Client:     n.cfg.Client,
+		ElectionID: electionID,
+		OnStartedLeading: func(stopCh chan struct{}) {
+			if n.syncStatus != nil {
+				go n.syncStatus.Run(stopCh)
+			}
+
+			n.setLeader(true)
+			n.metricCollector.OnStartedLeading(electionID)
+			// manually update SSL expiration metrics
+			// (to not wait for a reload)
+			n.metricCollector.SetSSLExpireTime(n.runningConfig.Servers)
+		},
+		OnStoppedLeading: func() {
+			n.setLeader(false)
+			n.metricCollector.OnStoppedLeading(electionID)
+		},
+		PodName:      n.podInfo.Name,
+		PodNamespace: n.podInfo.Namespace,
+	})
+
 	cmd := nginxExecCommand()
 
 	// put NGINX in another process group to prevent it
@@ -1099,3 +1128,15 @@ func buildRedirects(servers []*ingress.Server) []*redirect {
 
 	return redirectServers
 }
+
+func (n *NGINXController) setLeader(leader bool) {
+	var i uint32
+	if leader {
+		i = 1
+	}
+	atomic.StoreUint32(&n.currentLeader, i)
+}
+
+func (n *NGINXController) isLeader() bool {
+	return atomic.LoadUint32(&n.currentLeader) != 0
+}
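Note: Start() now derives the lock name from the election ID plus the ingress class, so controllers watching different classes elect independent leaders. A small sketch of that derivation; the base ID and class values below are assumed examples, not values read from the repo:

```go
package main

import "fmt"

// electionID mimics how Start() builds the lock name: the configured
// election ID is suffixed with the ingress class, falling back to the
// default class when no explicit class was set.
func electionID(base, defaultClass, ingressClass string) string {
	if ingressClass != "" {
		return fmt.Sprintf("%v-%v", base, ingressClass)
	}
	return fmt.Sprintf("%v-%v", base, defaultClass)
}

func main() {
	// two controllers with distinct classes use distinct locks
	fmt.Println(electionID("ingress-controller-leader", "nginx", ""))         // ingress-controller-leader-nginx
	fmt.Println(electionID("ingress-controller-leader", "nginx", "internal")) // ingress-controller-leader-internal
}
```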
diff --git a/internal/ingress/controller/status.go b/internal/ingress/controller/status.go
new file mode 100644
index 000000000..f6f562b91
--- /dev/null
+++ b/internal/ingress/controller/status.go
@@ -0,0 +1,123 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"os"
+	"time"
+
+	"k8s.io/klog"
+
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/client-go/tools/record"
+)
+
+type leaderElectionConfig struct {
+	PodName      string
+	PodNamespace string
+
+	Client clientset.Interface
+
+	ElectionID string
+
+	OnStartedLeading func(chan struct{})
+	OnStoppedLeading func()
+}
+
+func setupLeaderElection(config *leaderElectionConfig) {
+	var elector *leaderelection.LeaderElector
+
+	// start a new context
+	ctx := context.Background()
+
+	var cancelContext context.CancelFunc
+
+	var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
+		// allow to cancel the context in case we stop being the leader
+		leaderCtx, cancel := context.WithCancel(ctx)
+		go elector.Run(leaderCtx)
+		return cancel
+	}
+
+	var stopCh chan struct{}
+	callbacks := leaderelection.LeaderCallbacks{
+		OnStartedLeading: func(ctx context.Context) {
+			klog.V(2).Infof("I am the new leader")
+			stopCh = make(chan struct{})
+
+			if config.OnStartedLeading != nil {
+				config.OnStartedLeading(stopCh)
+			}
+		},
+		OnStoppedLeading: func() {
+			klog.V(2).Info("I am not leader anymore")
+			close(stopCh)
+
+			// cancel the context
+			cancelContext()
+
+			cancelContext = newLeaderCtx(ctx)
+
+			if config.OnStoppedLeading != nil {
+				config.OnStoppedLeading()
+			}
+		},
+		OnNewLeader: func(identity string) {
+			klog.Infof("new leader elected: %v", identity)
+		},
+	}
+
+	broadcaster := record.NewBroadcaster()
+	hostname, _ := os.Hostname()
+
+	recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
+		Component: "ingress-leader-elector",
+		Host:      hostname,
+	})
+
+	lock := resourcelock.ConfigMapLock{
+		ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: config.ElectionID},
+		Client:        config.Client.CoreV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity:      config.PodName,
+			EventRecorder: recorder,
+		},
+	}
+
+	ttl := 30 * time.Second
+	var err error
+
+	elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
+		Lock:          &lock,
+		LeaseDuration: ttl,
+		RenewDeadline: ttl / 2,
+		RetryPeriod:   ttl / 4,
+
+		Callbacks: callbacks,
+	})
+	if err != nil {
+		klog.Fatalf("unexpected error starting leader election: %v", err)
+	}
+
+	cancelContext = newLeaderCtx(ctx)
+}
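Note: the interesting wrinkle in setupLeaderElection is that OnStoppedLeading cancels the elector's context and immediately starts a fresh Run, so a deposed pod rejoins the election instead of dropping out permanently. A self-contained sketch of that cancel-and-restart pattern, with runElection standing in for elector.Run:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runElection stands in for elector.Run: it blocks until its context
// is cancelled.
func runElection(ctx context.Context, attempt int) {
	fmt.Printf("election attempt %d running\n", attempt)
	<-ctx.Done()
	fmt.Printf("election attempt %d stopped\n", attempt)
}

func main() {
	ctx := context.Background()

	// newLeaderCtx mirrors the helper in the diff: wrap the parent
	// context so the current run can be cancelled independently.
	newLeaderCtx := func(attempt int) context.CancelFunc {
		leaderCtx, cancel := context.WithCancel(ctx)
		go runElection(leaderCtx, attempt)
		return cancel
	}

	cancel := newLeaderCtx(1)

	// simulate losing the lease: cancel the old run, start a new one,
	// exactly what OnStoppedLeading does in setupLeaderElection
	time.Sleep(10 * time.Millisecond)
	cancel()
	cancel = newLeaderCtx(2)

	time.Sleep(10 * time.Millisecond)
	cancel()
	time.Sleep(10 * time.Millisecond)
}
```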
diff --git a/internal/ingress/metric/collectors/controller.go b/internal/ingress/metric/collectors/controller.go
index c24caaa45..3df99bafd 100644
--- a/internal/ingress/metric/collectors/controller.go
+++ b/internal/ingress/metric/collectors/controller.go
@@ -46,6 +46,8 @@ type Controller struct {
 	constLabels prometheus.Labels
 	labels      prometheus.Labels
+
+	leaderElection *prometheus.GaugeVec
 }
 
 // NewController creates a new prometheus collector for the
@@ -112,6 +114,15 @@ func NewController(pod, namespace, class string) *Controller {
 			},
 			sslLabelHost,
 		),
+		leaderElection: prometheus.NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace:   PrometheusNamespace,
+				Name:        "leader_election_status",
+				Help:        "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader. 'name' is the string used to identify the lease",
+				ConstLabels: constLabels,
+			},
+			[]string{"name"},
+		),
 	}
 
 	return cm
@@ -127,6 +138,16 @@ func (cm *Controller) IncReloadErrorCount() {
 	cm.reloadOperationErrors.With(cm.constLabels).Inc()
 }
 
+// OnStartedLeading indicates the pod was elected as the leader
+func (cm *Controller) OnStartedLeading(electionID string) {
+	cm.leaderElection.WithLabelValues(electionID).Set(1.0)
+}
+
+// OnStoppedLeading indicates the pod stopped being the leader
+func (cm *Controller) OnStoppedLeading(electionID string) {
+	cm.leaderElection.WithLabelValues(electionID).Set(0)
+}
+
 // ConfigSuccess set a boolean flag according to the output of the controller configuration reload
 func (cm *Controller) ConfigSuccess(hash uint64, success bool) {
 	if success {
@@ -150,6 +171,7 @@ func (cm Controller) Describe(ch chan<- *prometheus.Desc) {
 	cm.reloadOperation.Describe(ch)
 	cm.reloadOperationErrors.Describe(ch)
 	cm.sslExpireTime.Describe(ch)
+	cm.leaderElection.Describe(ch)
 }
 
 // Collect implements the prometheus.Collector interface.
@@ -160,6 +182,7 @@ func (cm Controller) Collect(ch chan<- prometheus.Metric) {
 	cm.reloadOperation.Collect(ch)
 	cm.reloadOperationErrors.Collect(ch)
 	cm.sslExpireTime.Collect(ch)
+	cm.leaderElection.Collect(ch)
 }
 
 // SetSSLExpireTime sets the expiration time of SSL Certificates
@@ -179,13 +202,21 @@ func (cm *Controller) SetSSLExpireTime(servers []*ingress.Server) {
 
 // RemoveMetrics removes metrics for hostnames not available anymore
 func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer) {
+	cm.removeSSLExpireMetrics(true, hosts, registry)
+}
+
+// RemoveAllSSLExpireMetrics removes metrics for expiration of SSL Certificates
+func (cm *Controller) RemoveAllSSLExpireMetrics(registry prometheus.Gatherer) {
+	cm.removeSSLExpireMetrics(false, []string{}, registry)
+}
+
+func (cm *Controller) removeSSLExpireMetrics(onlyDefinedHosts bool, hosts []string, registry prometheus.Gatherer) {
 	mfs, err := registry.Gather()
 	if err != nil {
 		klog.Errorf("Error gathering metrics: %v", err)
 		return
 	}
 
-	klog.V(2).Infof("removing SSL certificate metrics for %v hosts", hosts)
 	toRemove := sets.NewString(hosts...)
 
 	for _, mf := range mfs {
@@ -208,7 +239,7 @@ func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer
 			continue
 		}
 
-		if !toRemove.Has(host) {
+		if onlyDefinedHosts && !toRemove.Has(host) {
 			continue
 		}
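Note: the new leader_election_status gauge keeps one series per election ID, set to 1.0 in OnStartedLeading and back to 0 in OnStoppedLeading. A sketch of the same gauge shape with the Prometheus client library; the namespace string below is an assumption for illustration, not copied from the repo:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// same shape as the collector's new gauge: one series per election
	// ID, value 1 while this replica holds the lease, 0 otherwise
	leaderElection := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "nginx_ingress_controller", // assumed namespace
			Name:      "leader_election_status",
			Help:      "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader.",
		},
		[]string{"name"},
	)

	reg := prometheus.NewRegistry()
	reg.MustRegister(leaderElection)

	// OnStartedLeading would flip the series to 1 for this election ID
	leaderElection.WithLabelValues("ingress-controller-leader-nginx").Set(1)
	fmt.Println(testutil.ToFloat64(leaderElection.WithLabelValues("ingress-controller-leader-nginx"))) // 1

	// OnStoppedLeading would set it back to 0
	leaderElection.WithLabelValues("ingress-controller-leader-nginx").Set(0)
}
```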
diff --git a/internal/ingress/metric/dummy.go b/internal/ingress/metric/dummy.go
index 4c6872195..46ce3f7a8 100644
--- a/internal/ingress/metric/dummy.go
+++ b/internal/ingress/metric/dummy.go
@@ -52,3 +52,9 @@ func (dc DummyCollector) SetSSLExpireTime([]*ingress.Server) {}
 
 // SetHosts ...
 func (dc DummyCollector) SetHosts(hosts sets.String) {}
+
+// OnStartedLeading indicates the pod was elected as the leader
+func (dc DummyCollector) OnStartedLeading(electionID string) {}
+
+// OnStoppedLeading indicates the pod stopped being the leader
+func (dc DummyCollector) OnStoppedLeading(electionID string) {}
diff --git a/internal/ingress/metric/main.go b/internal/ingress/metric/main.go
index 4950a2497..8039c2d74 100644
--- a/internal/ingress/metric/main.go
+++ b/internal/ingress/metric/main.go
@@ -36,6 +36,9 @@ type Collector interface {
 	IncReloadCount()
 	IncReloadErrorCount()
 
+	OnStartedLeading(string)
+	OnStoppedLeading(string)
+
 	RemoveMetrics(ingresses, endpoints []string)
 
 	SetSSLExpireTime([]*ingress.Server)
@@ -147,3 +150,14 @@ func (c *collector) SetSSLExpireTime(servers []*ingress.Server) {
 func (c *collector) SetHosts(hosts sets.String) {
 	c.socket.SetHosts(hosts)
 }
+
+// OnStartedLeading indicates the pod was elected as the leader
+func (c *collector) OnStartedLeading(electionID string) {
+	c.ingressController.OnStartedLeading(electionID)
+}
+
+// OnStoppedLeading indicates the pod stopped being the leader
+func (c *collector) OnStoppedLeading(electionID string) {
+	c.ingressController.OnStoppedLeading(electionID)
+	c.ingressController.RemoveAllSSLExpireMetrics(c.registry)
+}
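Note: adding OnStartedLeading/OnStoppedLeading to the Collector interface forces every implementation, including the dummy used when metrics are disabled, to grow matching no-op stubs. A trimmed-down illustration of the interface-plus-dummy pattern with a compile-time conformance check (types simplified, not the repo's):

```go
package main

import "fmt"

// Collector is a cut-down stand-in for ingress-nginx's metric.Collector,
// keeping only the methods relevant to leader election.
type Collector interface {
	OnStartedLeading(string)
	OnStoppedLeading(string)
}

// DummyCollector satisfies the interface with no-ops, so callers never
// need nil checks when metrics are disabled.
type DummyCollector struct{}

func (DummyCollector) OnStartedLeading(string) {}
func (DummyCollector) OnStoppedLeading(string) {}

// compile-time check: the build breaks if DummyCollector drifts from
// the interface when new methods are added
var _ Collector = DummyCollector{}

func main() {
	var c Collector = DummyCollector{}
	c.OnStartedLeading("ingress-controller-leader-nginx") // safe no-op
	fmt.Println("dummy collector wired in")
}
```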
diff --git a/internal/ingress/status/status.go b/internal/ingress/status/status.go
index 635bb5e6a..399e3e39a 100644
--- a/internal/ingress/status/status.go
+++ b/internal/ingress/status/status.go
@@ -17,10 +17,8 @@ limitations under the License.
 package status
 
 import (
-	"context"
 	"fmt"
 	"net"
-	"os"
 	"sort"
 	"strings"
 	"time"
@@ -34,10 +32,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
-	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
 
 	"k8s.io/ingress-nginx/internal/ingress"
@@ -49,9 +43,10 @@ const (
 	updateInterval = 60 * time.Second
 )
 
-// Sync ...
-type Sync interface {
-	Run()
+// Syncer ...
+type Syncer interface {
+	Run(chan struct{})
+
 	Shutdown()
 }
 
@@ -68,16 +63,11 @@ type Config struct {
 
 	PublishStatusAddress string
 
-	ElectionID string
-
 	UpdateStatusOnShutdown bool
 
 	UseNodeInternalIP bool
 
 	IngressLister ingressLister
-
-	DefaultIngressClass string
-	IngressClass        string
 }
 
 // statusSync keeps the status IP in each Ingress rule updated executing a periodic check
@@ -89,109 +79,35 @@ type Config struct {
 // two flags are set, the source is the IP/s of the node/s
 type statusSync struct {
 	Config
+
 	// pod contains runtime information about this pod
 	pod *k8s.PodInfo
 
-	elector *leaderelection.LeaderElector
-
 	// workqueue used to keep in sync the status IP/s
 	// in the Ingress rules
 	syncQueue *task.Queue
 }
 
-// Run starts the loop to keep the status in sync
-func (s statusSync) Run() {
-	// we need to use the defined ingress class to allow multiple leaders
-	// in order to update information about ingress status
-	electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
-	if s.Config.IngressClass != "" {
-		electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
-	}
+// Run starts the loop to keep the status in sync
+func (s statusSync) Run(stopCh chan struct{}) {
+	go s.syncQueue.Run(time.Second, stopCh)
 
-	// start a new context
-	ctx := context.Background()
+	// trigger initial sync
+	s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
 
-	var cancelContext context.CancelFunc
-
-	var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
-		// allow to cancel the context in case we stop being the leader
-		leaderCtx, cancel := context.WithCancel(ctx)
-		go s.elector.Run(leaderCtx)
-		return cancel
-	}
-
-	var stopCh chan struct{}
-	callbacks := leaderelection.LeaderCallbacks{
-		OnStartedLeading: func(ctx context.Context) {
-			klog.V(2).Infof("I am the new status update leader")
-			stopCh = make(chan struct{})
-			go s.syncQueue.Run(time.Second, stopCh)
-			// trigger initial sync
-			s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
-			// when this instance is the leader we need to enqueue
-			// an item to trigger the update of the Ingress status.
-			wait.PollUntil(updateInterval, func() (bool, error) {
-				s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
-				return false, nil
-			}, stopCh)
-		},
-		OnStoppedLeading: func() {
-			klog.V(2).Info("I am not status update leader anymore")
-			close(stopCh)
-
-			// cancel the context
-			cancelContext()
-
-			cancelContext = newLeaderCtx(ctx)
-		},
-		OnNewLeader: func(identity string) {
-			klog.Infof("new leader elected: %v", identity)
-		},
-	}
-
-	broadcaster := record.NewBroadcaster()
-	hostname, _ := os.Hostname()
-
-	recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
-		Component: "ingress-leader-elector",
-		Host:      hostname,
-	})
-
-	lock := resourcelock.ConfigMapLock{
-		ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
-		Client:        s.Config.Client.CoreV1(),
-		LockConfig: resourcelock.ResourceLockConfig{
-			Identity:      s.pod.Name,
-			EventRecorder: recorder,
-		},
-	}
-
-	ttl := 30 * time.Second
-	var err error
-	s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
-		Lock:          &lock,
-		LeaseDuration: ttl,
-		RenewDeadline: ttl / 2,
-		RetryPeriod:   ttl / 4,
-		Callbacks:     callbacks,
-	})
-	if err != nil {
-		klog.Fatalf("unexpected error starting leader election: %v", err)
-	}
-
-	cancelContext = newLeaderCtx(ctx)
+	// when this instance is the leader we need to enqueue
+	// an item to trigger the update of the Ingress status.
+	wait.PollUntil(updateInterval, func() (bool, error) {
+		s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
+		return false, nil
+	}, stopCh)
 }
 
-// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
+// Shutdown stops the sync. In case the instance is the leader it will remove the current IP
 // if there is no other instances running.
 func (s statusSync) Shutdown() {
 	go s.syncQueue.Shutdown()
 
-	// remove IP from Ingress
-	if s.elector != nil && !s.elector.IsLeader() {
-		return
-	}
-
 	if !s.UpdateStatusOnShutdown {
 		klog.Warningf("skipping update of status of Ingress rules")
 		return
@@ -226,10 +142,6 @@ func (s *statusSync) sync(key interface{}) error {
 		return nil
 	}
 
-	if s.elector != nil && !s.elector.IsLeader() {
-		return fmt.Errorf("i am not the current leader. Skiping status update")
-	}
-
 	addrs, err := s.runningAddresses()
 	if err != nil {
 		return err
@@ -243,15 +155,10 @@ func (s statusSync) keyfunc(input interface{}) (interface{}, error) {
 	return input, nil
 }
 
-// NewStatusSyncer returns a new Sync instance
-func NewStatusSyncer(config Config) Sync {
-	pod, err := k8s.GetPodDetails(config.Client)
-	if err != nil {
-		klog.Fatalf("unexpected error obtaining pod information: %v", err)
-	}
-
+// NewStatusSyncer returns a new Syncer instance
+func NewStatusSyncer(podInfo *k8s.PodInfo, config Config) Syncer {
 	st := statusSync{
-		pod:    pod,
+		pod:    podInfo,
 		Config: config,
 	}
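Note: the rewritten Run relies on a wait.PollUntil quirk: a condition that always returns (false, nil) never completes, so the call behaves as a ticker that only exits when the leader stop channel closes. A runnable sketch with intervals shortened for demonstration:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// stand-in for OnStoppedLeading closing the leader stop channel
	go func() {
		time.Sleep(350 * time.Millisecond)
		close(stopCh)
	}()

	// returning (false, nil) means "not done, no error", so PollUntil
	// keeps invoking the condition every interval until stopCh closes,
	// which is how Run re-enqueues a status sync every updateInterval
	wait.PollUntil(100*time.Millisecond, func() (bool, error) {
		fmt.Println("enqueue status sync")
		return false, nil
	}, stopCh)
}
```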
diff --git a/internal/ingress/status/status_test.go b/internal/ingress/status/status_test.go
index 463f830b9..c070dc21f 100644
--- a/internal/ingress/status/status_test.go
+++ b/internal/ingress/status/status_test.go
@@ -287,12 +287,16 @@ func TestStatusActions(t *testing.T) {
 		Client:                 buildSimpleClientSet(),
 		PublishService:         "",
 		IngressLister:          buildIngressLister(),
-		DefaultIngressClass:    "nginx",
-		IngressClass:           "",
 		UpdateStatusOnShutdown: true,
 	}
 	// create object
-	fkSync := NewStatusSyncer(c)
+	fkSync := NewStatusSyncer(&k8s.PodInfo{
+		Name:      "foo_base_pod",
+		Namespace: apiv1.NamespaceDefault,
+		Labels: map[string]string{
+			"lable_sig": "foo_pod",
+		},
+	}, c)
 	if fkSync == nil {
 		t.Fatalf("expected a valid Sync")
 	}
@@ -300,7 +304,10 @@ func TestStatusActions(t *testing.T) {
 	fk := fkSync.(statusSync)
 
 	// start it and wait for the election and syn actions
-	go fk.Run()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	go fk.Run(stopCh)
 	// wait for the election
 	time.Sleep(100 * time.Millisecond)
 	// execute sync
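Note: the updated test threads a stop channel into Run and defers its close, which is the only shutdown signal the new Syncer contract needs. A minimal sketch of that contract with a fake syncer (names are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"time"
)

// fakeSyncer mimics the Syncer contract from the PR: Run blocks until
// the caller closes the stop channel.
type fakeSyncer struct{ done chan struct{} }

func (f *fakeSyncer) Run(stopCh chan struct{}) {
	<-stopCh
	close(f.done)
}

func main() {
	f := &fakeSyncer{done: make(chan struct{})}

	stopCh := make(chan struct{})
	go f.Run(stopCh)

	// ... exercise the syncer here, as the updated test does ...
	time.Sleep(50 * time.Millisecond)

	// the test uses defer close(stopCh) so cleanup also runs on early
	// t.Fatalf exits; closing the channel is the whole shutdown signal
	close(stopCh)
	<-f.done
	fmt.Println("syncer stopped cleanly")
}
```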