commit
e0793650d0
8 changed files with 260 additions and 127 deletions
|
@ -188,7 +188,11 @@ func (n *NGINXController) syncIngress(interface{}) error {
|
||||||
klog.Infof("Backend successfully reloaded.")
|
klog.Infof("Backend successfully reloaded.")
|
||||||
n.metricCollector.ConfigSuccess(hash, true)
|
n.metricCollector.ConfigSuccess(hash, true)
|
||||||
n.metricCollector.IncReloadCount()
|
n.metricCollector.IncReloadCount()
|
||||||
n.metricCollector.SetSSLExpireTime(servers)
|
|
||||||
|
if n.isLeader() {
|
||||||
|
klog.V(2).Infof("Updating ssl expiration metrics.")
|
||||||
|
n.metricCollector.SetSSLExpireTime(servers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
|
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
|
||||||
|
|
|
@ -31,6 +31,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
@ -115,6 +116,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("unexpected error obtaining pod information: %v", err)
|
klog.Fatalf("unexpected error obtaining pod information: %v", err)
|
||||||
}
|
}
|
||||||
|
n.podInfo = pod
|
||||||
|
|
||||||
n.store = store.New(
|
n.store = store.New(
|
||||||
config.EnableSSLChainCompletion,
|
config.EnableSSLChainCompletion,
|
||||||
|
@ -132,15 +134,13 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
|
||||||
config.DisableCatchAll)
|
config.DisableCatchAll)
|
||||||
|
|
||||||
n.syncQueue = task.NewTaskQueue(n.syncIngress)
|
n.syncQueue = task.NewTaskQueue(n.syncIngress)
|
||||||
|
|
||||||
if config.UpdateStatus {
|
if config.UpdateStatus {
|
||||||
n.syncStatus = status.NewStatusSyncer(status.Config{
|
n.syncStatus = status.NewStatusSyncer(pod, status.Config{
|
||||||
Client: config.Client,
|
Client: config.Client,
|
||||||
PublishService: config.PublishService,
|
PublishService: config.PublishService,
|
||||||
PublishStatusAddress: config.PublishStatusAddress,
|
PublishStatusAddress: config.PublishStatusAddress,
|
||||||
IngressLister: n.store,
|
IngressLister: n.store,
|
||||||
ElectionID: config.ElectionID,
|
|
||||||
IngressClass: class.IngressClass,
|
|
||||||
DefaultIngressClass: class.DefaultClass,
|
|
||||||
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
|
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
|
||||||
UseNodeInternalIP: config.UseNodeInternalIP,
|
UseNodeInternalIP: config.UseNodeInternalIP,
|
||||||
})
|
})
|
||||||
|
@ -215,13 +215,15 @@ Error loading new template: %v
|
||||||
|
|
||||||
// NGINXController describes a NGINX Ingress controller.
|
// NGINXController describes a NGINX Ingress controller.
|
||||||
type NGINXController struct {
|
type NGINXController struct {
|
||||||
|
podInfo *k8s.PodInfo
|
||||||
|
|
||||||
cfg *Configuration
|
cfg *Configuration
|
||||||
|
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
|
|
||||||
syncQueue *task.Queue
|
syncQueue *task.Queue
|
||||||
|
|
||||||
syncStatus status.Sync
|
syncStatus status.Syncer
|
||||||
|
|
||||||
syncRateLimiter flowcontrol.RateLimiter
|
syncRateLimiter flowcontrol.RateLimiter
|
||||||
|
|
||||||
|
@ -254,6 +256,8 @@ type NGINXController struct {
|
||||||
fileSystem filesystem.Filesystem
|
fileSystem filesystem.Filesystem
|
||||||
|
|
||||||
metricCollector metric.Collector
|
metricCollector metric.Collector
|
||||||
|
|
||||||
|
currentLeader uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts a new NGINX master process running in the foreground.
|
// Start starts a new NGINX master process running in the foreground.
|
||||||
|
@ -262,10 +266,35 @@ func (n *NGINXController) Start() {
|
||||||
|
|
||||||
n.store.Run(n.stopCh)
|
n.store.Run(n.stopCh)
|
||||||
|
|
||||||
if n.syncStatus != nil {
|
// we need to use the defined ingress class to allow multiple leaders
|
||||||
go n.syncStatus.Run()
|
// in order to update information about ingress status
|
||||||
|
electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
|
||||||
|
if class.IngressClass != "" {
|
||||||
|
electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setupLeaderElection(&leaderElectionConfig{
|
||||||
|
Client: n.cfg.Client,
|
||||||
|
ElectionID: electionID,
|
||||||
|
OnStartedLeading: func(stopCh chan struct{}) {
|
||||||
|
if n.syncStatus != nil {
|
||||||
|
go n.syncStatus.Run(stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.setLeader(true)
|
||||||
|
n.metricCollector.OnStartedLeading(electionID)
|
||||||
|
// manually update SSL expiration metrics
|
||||||
|
// (to not wait for a reload)
|
||||||
|
n.metricCollector.SetSSLExpireTime(n.runningConfig.Servers)
|
||||||
|
},
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
n.setLeader(false)
|
||||||
|
n.metricCollector.OnStoppedLeading(electionID)
|
||||||
|
},
|
||||||
|
PodName: n.podInfo.Name,
|
||||||
|
PodNamespace: n.podInfo.Namespace,
|
||||||
|
})
|
||||||
|
|
||||||
cmd := nginxExecCommand()
|
cmd := nginxExecCommand()
|
||||||
|
|
||||||
// put NGINX in another process group to prevent it
|
// put NGINX in another process group to prevent it
|
||||||
|
@ -1099,3 +1128,15 @@ func buildRedirects(servers []*ingress.Server) []*redirect {
|
||||||
|
|
||||||
return redirectServers
|
return redirectServers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *NGINXController) setLeader(leader bool) {
|
||||||
|
var i uint32
|
||||||
|
if leader {
|
||||||
|
i = 1
|
||||||
|
}
|
||||||
|
atomic.StoreUint32(&n.currentLeader, i)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NGINXController) isLeader() bool {
|
||||||
|
return atomic.LoadUint32(&n.currentLeader) != 0
|
||||||
|
}
|
||||||
|
|
123
internal/ingress/controller/status.go
Normal file
123
internal/ingress/controller/status.go
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/klog"
|
||||||
|
|
||||||
|
apiv1 "k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/scheme"
|
||||||
|
"k8s.io/client-go/tools/leaderelection"
|
||||||
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
"k8s.io/client-go/tools/record"
|
||||||
|
)
|
||||||
|
|
||||||
|
type leaderElectionConfig struct {
|
||||||
|
PodName string
|
||||||
|
PodNamespace string
|
||||||
|
|
||||||
|
Client clientset.Interface
|
||||||
|
|
||||||
|
ElectionID string
|
||||||
|
|
||||||
|
OnStartedLeading func(chan struct{})
|
||||||
|
OnStoppedLeading func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupLeaderElection(config *leaderElectionConfig) {
|
||||||
|
var elector *leaderelection.LeaderElector
|
||||||
|
|
||||||
|
// start a new context
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
var cancelContext context.CancelFunc
|
||||||
|
|
||||||
|
var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
|
||||||
|
// allow to cancel the context in case we stop being the leader
|
||||||
|
leaderCtx, cancel := context.WithCancel(ctx)
|
||||||
|
go elector.Run(leaderCtx)
|
||||||
|
return cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
var stopCh chan struct{}
|
||||||
|
callbacks := leaderelection.LeaderCallbacks{
|
||||||
|
OnStartedLeading: func(ctx context.Context) {
|
||||||
|
klog.V(2).Infof("I am the new leader")
|
||||||
|
stopCh = make(chan struct{})
|
||||||
|
|
||||||
|
if config.OnStartedLeading != nil {
|
||||||
|
config.OnStartedLeading(stopCh)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
klog.V(2).Info("I am not leader anymore")
|
||||||
|
close(stopCh)
|
||||||
|
|
||||||
|
// cancel the context
|
||||||
|
cancelContext()
|
||||||
|
|
||||||
|
cancelContext = newLeaderCtx(ctx)
|
||||||
|
|
||||||
|
if config.OnStoppedLeading != nil {
|
||||||
|
config.OnStoppedLeading()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
OnNewLeader: func(identity string) {
|
||||||
|
klog.Infof("new leader elected: %v", identity)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcaster := record.NewBroadcaster()
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
|
||||||
|
recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
||||||
|
Component: "ingress-leader-elector",
|
||||||
|
Host: hostname,
|
||||||
|
})
|
||||||
|
|
||||||
|
lock := resourcelock.ConfigMapLock{
|
||||||
|
ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: config.ElectionID},
|
||||||
|
Client: config.Client.CoreV1(),
|
||||||
|
LockConfig: resourcelock.ResourceLockConfig{
|
||||||
|
Identity: config.PodName,
|
||||||
|
EventRecorder: recorder,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ttl := 30 * time.Second
|
||||||
|
var err error
|
||||||
|
|
||||||
|
elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
||||||
|
Lock: &lock,
|
||||||
|
LeaseDuration: ttl,
|
||||||
|
RenewDeadline: ttl / 2,
|
||||||
|
RetryPeriod: ttl / 4,
|
||||||
|
|
||||||
|
Callbacks: callbacks,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
klog.Fatalf("unexpected error starting leader election: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cancelContext = newLeaderCtx(ctx)
|
||||||
|
}
|
|
@ -46,6 +46,8 @@ type Controller struct {
|
||||||
|
|
||||||
constLabels prometheus.Labels
|
constLabels prometheus.Labels
|
||||||
labels prometheus.Labels
|
labels prometheus.Labels
|
||||||
|
|
||||||
|
leaderElection *prometheus.GaugeVec
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController creates a new prometheus collector for the
|
// NewController creates a new prometheus collector for the
|
||||||
|
@ -112,6 +114,15 @@ func NewController(pod, namespace, class string) *Controller {
|
||||||
},
|
},
|
||||||
sslLabelHost,
|
sslLabelHost,
|
||||||
),
|
),
|
||||||
|
leaderElection: prometheus.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: PrometheusNamespace,
|
||||||
|
Name: "leader_election_status",
|
||||||
|
Help: "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader. 'name' is the string used to identify the lease",
|
||||||
|
ConstLabels: constLabels,
|
||||||
|
},
|
||||||
|
[]string{"name"},
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
return cm
|
return cm
|
||||||
|
@ -127,6 +138,16 @@ func (cm *Controller) IncReloadErrorCount() {
|
||||||
cm.reloadOperationErrors.With(cm.constLabels).Inc()
|
cm.reloadOperationErrors.With(cm.constLabels).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnStartedLeading indicates the pod was elected as the leader
|
||||||
|
func (cm *Controller) OnStartedLeading(electionID string) {
|
||||||
|
cm.leaderElection.WithLabelValues(electionID).Set(1.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnStoppedLeading indicates the pod stopped being the leader
|
||||||
|
func (cm *Controller) OnStoppedLeading(electionID string) {
|
||||||
|
cm.leaderElection.WithLabelValues(electionID).Set(0)
|
||||||
|
}
|
||||||
|
|
||||||
// ConfigSuccess set a boolean flag according to the output of the controller configuration reload
|
// ConfigSuccess set a boolean flag according to the output of the controller configuration reload
|
||||||
func (cm *Controller) ConfigSuccess(hash uint64, success bool) {
|
func (cm *Controller) ConfigSuccess(hash uint64, success bool) {
|
||||||
if success {
|
if success {
|
||||||
|
@ -150,6 +171,7 @@ func (cm Controller) Describe(ch chan<- *prometheus.Desc) {
|
||||||
cm.reloadOperation.Describe(ch)
|
cm.reloadOperation.Describe(ch)
|
||||||
cm.reloadOperationErrors.Describe(ch)
|
cm.reloadOperationErrors.Describe(ch)
|
||||||
cm.sslExpireTime.Describe(ch)
|
cm.sslExpireTime.Describe(ch)
|
||||||
|
cm.leaderElection.Describe(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect implements the prometheus.Collector interface.
|
// Collect implements the prometheus.Collector interface.
|
||||||
|
@ -160,6 +182,7 @@ func (cm Controller) Collect(ch chan<- prometheus.Metric) {
|
||||||
cm.reloadOperation.Collect(ch)
|
cm.reloadOperation.Collect(ch)
|
||||||
cm.reloadOperationErrors.Collect(ch)
|
cm.reloadOperationErrors.Collect(ch)
|
||||||
cm.sslExpireTime.Collect(ch)
|
cm.sslExpireTime.Collect(ch)
|
||||||
|
cm.leaderElection.Collect(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetSSLExpireTime sets the expiration time of SSL Certificates
|
// SetSSLExpireTime sets the expiration time of SSL Certificates
|
||||||
|
@ -179,13 +202,21 @@ func (cm *Controller) SetSSLExpireTime(servers []*ingress.Server) {
|
||||||
|
|
||||||
// RemoveMetrics removes metrics for hostnames not available anymore
|
// RemoveMetrics removes metrics for hostnames not available anymore
|
||||||
func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer) {
|
func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer) {
|
||||||
|
cm.removeSSLExpireMetrics(true, hosts, registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveAllSSLExpireMetrics removes metrics for expiration of SSL Certificates
|
||||||
|
func (cm *Controller) RemoveAllSSLExpireMetrics(registry prometheus.Gatherer) {
|
||||||
|
cm.removeSSLExpireMetrics(false, []string{}, registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cm *Controller) removeSSLExpireMetrics(onlyDefinedHosts bool, hosts []string, registry prometheus.Gatherer) {
|
||||||
mfs, err := registry.Gather()
|
mfs, err := registry.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Error gathering metrics: %v", err)
|
klog.Errorf("Error gathering metrics: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(2).Infof("removing SSL certificate metrics for %v hosts", hosts)
|
|
||||||
toRemove := sets.NewString(hosts...)
|
toRemove := sets.NewString(hosts...)
|
||||||
|
|
||||||
for _, mf := range mfs {
|
for _, mf := range mfs {
|
||||||
|
@ -208,7 +239,7 @@ func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !toRemove.Has(host) {
|
if onlyDefinedHosts && !toRemove.Has(host) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,3 +52,9 @@ func (dc DummyCollector) SetSSLExpireTime([]*ingress.Server) {}
|
||||||
|
|
||||||
// SetHosts ...
|
// SetHosts ...
|
||||||
func (dc DummyCollector) SetHosts(hosts sets.String) {}
|
func (dc DummyCollector) SetHosts(hosts sets.String) {}
|
||||||
|
|
||||||
|
// OnStartedLeading indicates the pod is not the current leader
|
||||||
|
func (dc DummyCollector) OnStartedLeading(electionID string) {}
|
||||||
|
|
||||||
|
// OnStoppedLeading indicates the pod is not the current leader
|
||||||
|
func (dc DummyCollector) OnStoppedLeading(electionID string) {}
|
||||||
|
|
|
@ -36,6 +36,9 @@ type Collector interface {
|
||||||
IncReloadCount()
|
IncReloadCount()
|
||||||
IncReloadErrorCount()
|
IncReloadErrorCount()
|
||||||
|
|
||||||
|
OnStartedLeading(string)
|
||||||
|
OnStoppedLeading(string)
|
||||||
|
|
||||||
RemoveMetrics(ingresses, endpoints []string)
|
RemoveMetrics(ingresses, endpoints []string)
|
||||||
|
|
||||||
SetSSLExpireTime([]*ingress.Server)
|
SetSSLExpireTime([]*ingress.Server)
|
||||||
|
@ -147,3 +150,14 @@ func (c *collector) SetSSLExpireTime(servers []*ingress.Server) {
|
||||||
func (c *collector) SetHosts(hosts sets.String) {
|
func (c *collector) SetHosts(hosts sets.String) {
|
||||||
c.socket.SetHosts(hosts)
|
c.socket.SetHosts(hosts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnStartedLeading indicates the pod was elected as the leader
|
||||||
|
func (c *collector) OnStartedLeading(electionID string) {
|
||||||
|
c.ingressController.OnStartedLeading(electionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnStoppedLeading indicates the pod stopped being the leader
|
||||||
|
func (c *collector) OnStoppedLeading(electionID string) {
|
||||||
|
c.ingressController.OnStoppedLeading(electionID)
|
||||||
|
c.ingressController.RemoveAllSSLExpireMetrics(c.registry)
|
||||||
|
}
|
||||||
|
|
|
@ -17,10 +17,8 @@ limitations under the License.
|
||||||
package status
|
package status
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -34,10 +32,6 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/scheme"
|
|
||||||
"k8s.io/client-go/tools/leaderelection"
|
|
||||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||||
|
|
||||||
"k8s.io/ingress-nginx/internal/ingress"
|
"k8s.io/ingress-nginx/internal/ingress"
|
||||||
|
@ -49,9 +43,10 @@ const (
|
||||||
updateInterval = 60 * time.Second
|
updateInterval = 60 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// Sync ...
|
// Syncer ...
|
||||||
type Sync interface {
|
type Syncer interface {
|
||||||
Run()
|
Run(chan struct{})
|
||||||
|
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,16 +63,11 @@ type Config struct {
|
||||||
|
|
||||||
PublishStatusAddress string
|
PublishStatusAddress string
|
||||||
|
|
||||||
ElectionID string
|
|
||||||
|
|
||||||
UpdateStatusOnShutdown bool
|
UpdateStatusOnShutdown bool
|
||||||
|
|
||||||
UseNodeInternalIP bool
|
UseNodeInternalIP bool
|
||||||
|
|
||||||
IngressLister ingressLister
|
IngressLister ingressLister
|
||||||
|
|
||||||
DefaultIngressClass string
|
|
||||||
IngressClass string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check
|
// statusSync keeps the status IP in each Ingress rule updated executing a periodic check
|
||||||
|
@ -89,109 +79,35 @@ type Config struct {
|
||||||
// two flags are set, the source is the IP/s of the node/s
|
// two flags are set, the source is the IP/s of the node/s
|
||||||
type statusSync struct {
|
type statusSync struct {
|
||||||
Config
|
Config
|
||||||
|
|
||||||
// pod contains runtime information about this pod
|
// pod contains runtime information about this pod
|
||||||
pod *k8s.PodInfo
|
pod *k8s.PodInfo
|
||||||
|
|
||||||
elector *leaderelection.LeaderElector
|
|
||||||
|
|
||||||
// workqueue used to keep in sync the status IP/s
|
// workqueue used to keep in sync the status IP/s
|
||||||
// in the Ingress rules
|
// in the Ingress rules
|
||||||
syncQueue *task.Queue
|
syncQueue *task.Queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the loop to keep the status in sync
|
// Start starts the loop to keep the status in sync
|
||||||
func (s statusSync) Run() {
|
func (s statusSync) Run(stopCh chan struct{}) {
|
||||||
// we need to use the defined ingress class to allow multiple leaders
|
go s.syncQueue.Run(time.Second, stopCh)
|
||||||
// in order to update information about ingress status
|
|
||||||
electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
|
|
||||||
if s.Config.IngressClass != "" {
|
|
||||||
electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a new context
|
// trigger initial sync
|
||||||
ctx := context.Background()
|
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||||
|
|
||||||
var cancelContext context.CancelFunc
|
// when this instance is the leader we need to enqueue
|
||||||
|
// an item to trigger the update of the Ingress status.
|
||||||
var newLeaderCtx = func(ctx context.Context) context.CancelFunc {
|
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||||
// allow to cancel the context in case we stop being the leader
|
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
||||||
leaderCtx, cancel := context.WithCancel(ctx)
|
return false, nil
|
||||||
go s.elector.Run(leaderCtx)
|
}, stopCh)
|
||||||
return cancel
|
|
||||||
}
|
|
||||||
|
|
||||||
var stopCh chan struct{}
|
|
||||||
callbacks := leaderelection.LeaderCallbacks{
|
|
||||||
OnStartedLeading: func(ctx context.Context) {
|
|
||||||
klog.V(2).Infof("I am the new status update leader")
|
|
||||||
stopCh = make(chan struct{})
|
|
||||||
go s.syncQueue.Run(time.Second, stopCh)
|
|
||||||
// trigger initial sync
|
|
||||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
|
||||||
// when this instance is the leader we need to enqueue
|
|
||||||
// an item to trigger the update of the Ingress status.
|
|
||||||
wait.PollUntil(updateInterval, func() (bool, error) {
|
|
||||||
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
|
|
||||||
return false, nil
|
|
||||||
}, stopCh)
|
|
||||||
},
|
|
||||||
OnStoppedLeading: func() {
|
|
||||||
klog.V(2).Info("I am not status update leader anymore")
|
|
||||||
close(stopCh)
|
|
||||||
|
|
||||||
// cancel the context
|
|
||||||
cancelContext()
|
|
||||||
|
|
||||||
cancelContext = newLeaderCtx(ctx)
|
|
||||||
},
|
|
||||||
OnNewLeader: func(identity string) {
|
|
||||||
klog.Infof("new leader elected: %v", identity)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
broadcaster := record.NewBroadcaster()
|
|
||||||
hostname, _ := os.Hostname()
|
|
||||||
|
|
||||||
recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
|
|
||||||
Component: "ingress-leader-elector",
|
|
||||||
Host: hostname,
|
|
||||||
})
|
|
||||||
|
|
||||||
lock := resourcelock.ConfigMapLock{
|
|
||||||
ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
|
|
||||||
Client: s.Config.Client.CoreV1(),
|
|
||||||
LockConfig: resourcelock.ResourceLockConfig{
|
|
||||||
Identity: s.pod.Name,
|
|
||||||
EventRecorder: recorder,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ttl := 30 * time.Second
|
|
||||||
var err error
|
|
||||||
s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
|
||||||
Lock: &lock,
|
|
||||||
LeaseDuration: ttl,
|
|
||||||
RenewDeadline: ttl / 2,
|
|
||||||
RetryPeriod: ttl / 4,
|
|
||||||
Callbacks: callbacks,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
klog.Fatalf("unexpected error starting leader election: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cancelContext = newLeaderCtx(ctx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
|
// Shutdown stops the sync. In case the instance is the leader it will remove the current IP
|
||||||
// if there is no other instances running.
|
// if there is no other instances running.
|
||||||
func (s statusSync) Shutdown() {
|
func (s statusSync) Shutdown() {
|
||||||
go s.syncQueue.Shutdown()
|
go s.syncQueue.Shutdown()
|
||||||
|
|
||||||
// remove IP from Ingress
|
|
||||||
if s.elector != nil && !s.elector.IsLeader() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !s.UpdateStatusOnShutdown {
|
if !s.UpdateStatusOnShutdown {
|
||||||
klog.Warningf("skipping update of status of Ingress rules")
|
klog.Warningf("skipping update of status of Ingress rules")
|
||||||
return
|
return
|
||||||
|
@ -226,10 +142,6 @@ func (s *statusSync) sync(key interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.elector != nil && !s.elector.IsLeader() {
|
|
||||||
return fmt.Errorf("i am not the current leader. Skiping status update")
|
|
||||||
}
|
|
||||||
|
|
||||||
addrs, err := s.runningAddresses()
|
addrs, err := s.runningAddresses()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -243,15 +155,10 @@ func (s statusSync) keyfunc(input interface{}) (interface{}, error) {
|
||||||
return input, nil
|
return input, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStatusSyncer returns a new Sync instance
|
// NewStatusSyncer returns a new Syncer instance
|
||||||
func NewStatusSyncer(config Config) Sync {
|
func NewStatusSyncer(podInfo *k8s.PodInfo, config Config) Syncer {
|
||||||
pod, err := k8s.GetPodDetails(config.Client)
|
|
||||||
if err != nil {
|
|
||||||
klog.Fatalf("unexpected error obtaining pod information: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
st := statusSync{
|
st := statusSync{
|
||||||
pod: pod,
|
pod: podInfo,
|
||||||
|
|
||||||
Config: config,
|
Config: config,
|
||||||
}
|
}
|
||||||
|
|
|
@ -287,12 +287,16 @@ func TestStatusActions(t *testing.T) {
|
||||||
Client: buildSimpleClientSet(),
|
Client: buildSimpleClientSet(),
|
||||||
PublishService: "",
|
PublishService: "",
|
||||||
IngressLister: buildIngressLister(),
|
IngressLister: buildIngressLister(),
|
||||||
DefaultIngressClass: "nginx",
|
|
||||||
IngressClass: "",
|
|
||||||
UpdateStatusOnShutdown: true,
|
UpdateStatusOnShutdown: true,
|
||||||
}
|
}
|
||||||
// create object
|
// create object
|
||||||
fkSync := NewStatusSyncer(c)
|
fkSync := NewStatusSyncer(&k8s.PodInfo{
|
||||||
|
Name: "foo_base_pod",
|
||||||
|
Namespace: apiv1.NamespaceDefault,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"lable_sig": "foo_pod",
|
||||||
|
},
|
||||||
|
}, c)
|
||||||
if fkSync == nil {
|
if fkSync == nil {
|
||||||
t.Fatalf("expected a valid Sync")
|
t.Fatalf("expected a valid Sync")
|
||||||
}
|
}
|
||||||
|
@ -300,7 +304,10 @@ func TestStatusActions(t *testing.T) {
|
||||||
fk := fkSync.(statusSync)
|
fk := fkSync.(statusSync)
|
||||||
|
|
||||||
// start it and wait for the election and syn actions
|
// start it and wait for the election and syn actions
|
||||||
go fk.Run()
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
|
go fk.Run(stopCh)
|
||||||
// wait for the election
|
// wait for the election
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
// execute sync
|
// execute sync
|
||||||
|
|
Loading…
Reference in a new issue