diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 234f77640..00f83c495 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -49,7 +49,6 @@ import ( "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/process" "k8s.io/ingress-nginx/internal/ingress/controller/store" @@ -115,6 +114,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File if err != nil { klog.Fatalf("unexpected error obtaining pod information: %v", err) } + n.podInfo = pod n.store = store.New( config.EnableSSLChainCompletion, @@ -132,15 +132,13 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File config.DisableCatchAll) n.syncQueue = task.NewTaskQueue(n.syncIngress) + if config.UpdateStatus { - n.syncStatus = status.NewStatusSyncer(status.Config{ + n.syncStatus = status.NewStatusSyncer(pod, status.Config{ Client: config.Client, PublishService: config.PublishService, PublishStatusAddress: config.PublishStatusAddress, IngressLister: n.store, - ElectionID: config.ElectionID, - IngressClass: class.IngressClass, - DefaultIngressClass: class.DefaultClass, UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, UseNodeInternalIP: config.UseNodeInternalIP, }) @@ -215,13 +213,15 @@ Error loading new template: %v // NGINXController describes a NGINX Ingress controller. type NGINXController struct { + podInfo *k8s.PodInfo + cfg *Configuration recorder record.EventRecorder syncQueue *task.Queue - syncStatus status.Sync + syncStatus status.Syncer syncRateLimiter flowcontrol.RateLimiter @@ -262,9 +262,27 @@ func (n *NGINXController) Start() { n.store.Run(n.stopCh) - if n.syncStatus != nil { - go n.syncStatus.Run() - } + setupLeaderElection(&leaderElectionConfig{ + Client: n.cfg.Client, + ElectionID: n.cfg.ElectionID, + OnStartedLeading: func(stopCh chan struct{}) { + if n.syncStatus != nil { + go n.syncStatus.Run(stopCh) + } + }, + OnStoppedLeading: func() { + // Remove prometheus metrics related to SSL certificates + srvs := sets.NewString() + for _, s := range n.runningConfig.Servers { + if !srvs.Has(s.Hostname) { + srvs.Insert(s.Hostname) + } + } + n.metricCollector.RemoveMetrics(nil, srvs.List()) + }, + PodName: n.podInfo.Name, + PodNamespace: n.podInfo.Namespace, + }) cmd := nginxExecCommand() diff --git a/internal/ingress/controller/status.go b/internal/ingress/controller/status.go new file mode 100644 index 000000000..187de3e4e --- /dev/null +++ b/internal/ingress/controller/status.go @@ -0,0 +1,133 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "os" + "time" + + "k8s.io/klog" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-nginx/internal/ingress/annotations/class" +) + +type leaderElectionConfig struct { + PodName string + PodNamespace string + + Client clientset.Interface + + ElectionID string + + OnStartedLeading func(chan struct{}) + OnStoppedLeading func() +} + +func setupLeaderElection(config *leaderElectionConfig) { + // we need to use the defined ingress class to allow multiple leaders + // in order to update information about ingress status + electionID := fmt.Sprintf("%v-%v", config.ElectionID, class.DefaultClass) + if class.IngressClass != "" { + electionID = fmt.Sprintf("%v-%v", config.ElectionID, class.IngressClass) + } + + var elector *leaderelection.LeaderElector + + // start a new context + ctx := context.Background() + + var cancelContext context.CancelFunc + + var newLeaderCtx = func(ctx context.Context) context.CancelFunc { + // allow to cancel the context in case we stop being the leader + leaderCtx, cancel := context.WithCancel(ctx) + go elector.Run(leaderCtx) + return cancel + } + + var stopCh chan struct{} + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.V(2).Infof("I am the new leader") + stopCh = make(chan struct{}) + + if config.OnStartedLeading != nil { + config.OnStartedLeading(stopCh) + } + }, + OnStoppedLeading: func() { + klog.V(2).Info("I am not leader anymore") + close(stopCh) + + // cancel the context + cancelContext() + + cancelContext = newLeaderCtx(ctx) + + if config.OnStoppedLeading != nil { + config.OnStoppedLeading() + } + }, + OnNewLeader: func(identity string) { + klog.Infof("new leader elected: %v", identity) + }, + } + + broadcaster := record.NewBroadcaster() + hostname, _ := os.Hostname() + + recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ + Component: "ingress-leader-elector", + Host: hostname, + }) + + lock := resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: electionID}, + Client: config.Client.CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: config.PodName, + EventRecorder: recorder, + }, + } + + ttl := 30 * time.Second + var err error + + elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: &lock, + LeaseDuration: ttl, + RenewDeadline: ttl / 2, + RetryPeriod: ttl / 4, + + Callbacks: callbacks, + }) + if err != nil { + klog.Fatalf("unexpected error starting leader election: %v", err) + } + + cancelContext = newLeaderCtx(ctx) +} diff --git a/internal/ingress/status/status.go b/internal/ingress/status/status.go index 635bb5e6a..399e3e39a 100644 --- a/internal/ingress/status/status.go +++ b/internal/ingress/status/status.go @@ -17,10 +17,8 @@ limitations under the License. package status import ( - "context" "fmt" "net" - "os" "sort" "strings" "time" @@ -34,10 +32,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/ingress-nginx/internal/ingress" @@ -49,9 +43,10 @@ const ( updateInterval = 60 * time.Second ) -// Sync ... -type Sync interface { - Run() +// Syncer ... +type Syncer interface { + Run(chan struct{}) + Shutdown() } @@ -68,16 +63,11 @@ type Config struct { PublishStatusAddress string - ElectionID string - UpdateStatusOnShutdown bool UseNodeInternalIP bool IngressLister ingressLister - - DefaultIngressClass string - IngressClass string } // statusSync keeps the status IP in each Ingress rule updated executing a periodic check @@ -89,109 +79,35 @@ type Config struct { // two flags are set, the source is the IP/s of the node/s type statusSync struct { Config + // pod contains runtime information about this pod pod *k8s.PodInfo - elector *leaderelection.LeaderElector - // workqueue used to keep in sync the status IP/s // in the Ingress rules syncQueue *task.Queue } -// Run starts the loop to keep the status in sync -func (s statusSync) Run() { - // we need to use the defined ingress class to allow multiple leaders - // in order to update information about ingress status - electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass) - if s.Config.IngressClass != "" { - electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass) - } +// Start starts the loop to keep the status in sync +func (s statusSync) Run(stopCh chan struct{}) { + go s.syncQueue.Run(time.Second, stopCh) - // start a new context - ctx := context.Background() + // trigger initial sync + s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - var cancelContext context.CancelFunc - - var newLeaderCtx = func(ctx context.Context) context.CancelFunc { - // allow to cancel the context in case we stop being the leader - leaderCtx, cancel := context.WithCancel(ctx) - go s.elector.Run(leaderCtx) - return cancel - } - - var stopCh chan struct{} - callbacks := leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - klog.V(2).Infof("I am the new status update leader") - stopCh = make(chan struct{}) - go s.syncQueue.Run(time.Second, stopCh) - // trigger initial sync - s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - // when this instance is the leader we need to enqueue - // an item to trigger the update of the Ingress status. - wait.PollUntil(updateInterval, func() (bool, error) { - s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - return false, nil - }, stopCh) - }, - OnStoppedLeading: func() { - klog.V(2).Info("I am not status update leader anymore") - close(stopCh) - - // cancel the context - cancelContext() - - cancelContext = newLeaderCtx(ctx) - }, - OnNewLeader: func(identity string) { - klog.Infof("new leader elected: %v", identity) - }, - } - - broadcaster := record.NewBroadcaster() - hostname, _ := os.Hostname() - - recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ - Component: "ingress-leader-elector", - Host: hostname, - }) - - lock := resourcelock.ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID}, - Client: s.Config.Client.CoreV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: s.pod.Name, - EventRecorder: recorder, - }, - } - - ttl := 30 * time.Second - var err error - s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: &lock, - LeaseDuration: ttl, - RenewDeadline: ttl / 2, - RetryPeriod: ttl / 4, - Callbacks: callbacks, - }) - if err != nil { - klog.Fatalf("unexpected error starting leader election: %v", err) - } - - cancelContext = newLeaderCtx(ctx) + // when this instance is the leader we need to enqueue + // an item to trigger the update of the Ingress status. + wait.PollUntil(updateInterval, func() (bool, error) { + s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) + return false, nil + }, stopCh) } -// Shutdown stop the sync. In case the instance is the leader it will remove the current IP +// Shutdown stops the sync. In case the instance is the leader it will remove the current IP // if there is no other instances running. func (s statusSync) Shutdown() { go s.syncQueue.Shutdown() - // remove IP from Ingress - if s.elector != nil && !s.elector.IsLeader() { - return - } - if !s.UpdateStatusOnShutdown { klog.Warningf("skipping update of status of Ingress rules") return @@ -226,10 +142,6 @@ func (s *statusSync) sync(key interface{}) error { return nil } - if s.elector != nil && !s.elector.IsLeader() { - return fmt.Errorf("i am not the current leader. Skiping status update") - } - addrs, err := s.runningAddresses() if err != nil { return err @@ -243,15 +155,10 @@ func (s statusSync) keyfunc(input interface{}) (interface{}, error) { return input, nil } -// NewStatusSyncer returns a new Sync instance -func NewStatusSyncer(config Config) Sync { - pod, err := k8s.GetPodDetails(config.Client) - if err != nil { - klog.Fatalf("unexpected error obtaining pod information: %v", err) - } - +// NewStatusSyncer returns a new Syncer instance +func NewStatusSyncer(podInfo *k8s.PodInfo, config Config) Syncer { st := statusSync{ - pod: pod, + pod: podInfo, Config: config, }