Fix multiple leader election
parent 7eb2b81fd3
commit f3efe498ed

2 changed files with 15 additions and 32 deletions
```diff
@@ -85,13 +85,11 @@ type statusSync struct {
     // workqueue used to keep in sync the status IP/s
     // in the Ingress rules
     syncQueue *task.Queue
-
-    runLock *sync.Mutex
 }
 
 // Run starts the loop to keep the status in sync
 func (s statusSync) Run(stopCh <-chan struct{}) {
-    go wait.Forever(s.elector.Run, 0)
+    go s.elector.Run()
     go wait.Forever(s.update, updateInterval)
     go s.syncQueue.Run(time.Second, stopCh)
     <-stopCh
```
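A note on the `Run` change: in the client-go vendored here, `LeaderElector.Run` already loops through acquire and renew internally and returns once leadership is lost, so `wait.Forever(s.elector.Run, 0)` made a replica restart its campaign immediately, with no backoff, every time `Run` returned. Launching `Run` once appears to be enough; the substantive cause of the multiple-leader symptom is the `Identity` fix further down.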
```diff
@@ -140,9 +138,6 @@ func (s statusSync) Shutdown() {
 }
 
 func (s *statusSync) sync(key interface{}) error {
-    s.runLock.Lock()
-    defer s.runLock.Unlock()
-
     if s.syncQueue.IsShuttingDown() {
         glog.V(2).Infof("skipping Ingress status update (shutting down in progress)")
         return nil
```
```diff
@@ -162,18 +157,6 @@ func (s *statusSync) sync(key interface{}) error {
     return nil
 }
 
-// callback invoked function when a new leader is elected
-func (s *statusSync) callback(leader string) {
-    if s.syncQueue.IsShuttingDown() {
-        return
-    }
-
-    glog.V(2).Infof("new leader elected (%v)", leader)
-    if leader == s.pod.Name {
-        glog.V(2).Infof("I am the new status update leader")
-    }
-}
-
 func (s statusSync) keyfunc(input interface{}) (interface{}, error) {
     return input, nil
 }
```
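Both removals above appear safe for the same reason: `sync` is only ever invoked by the single `syncQueue` worker, so the `runLock` mutex serialized nothing extra, and the hand-rolled `callback` method is superseded by the `LeaderCallbacks` hooks wired up below.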
```diff
@@ -186,9 +169,9 @@ func NewStatusSyncer(config Config) Sync {
     }
 
     st := statusSync{
         pod: pod,
-        runLock: &sync.Mutex{},
         Config: config,
     }
     st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)
 
```
```diff
@@ -201,10 +184,13 @@ func NewStatusSyncer(config Config) Sync {
 
     callbacks := leaderelection.LeaderCallbacks{
         OnStartedLeading: func(stop <-chan struct{}) {
-            st.callback(pod.Name)
+            glog.V(2).Infof("I am the new status update leader")
         },
         OnStoppedLeading: func() {
-            st.callback("")
+            glog.V(2).Infof("I am not status update leader anymore")
         },
+        OnNewLeader: func(identity string) {
+            glog.Infof("new leader elected: %v", identity)
+        },
     }
 
```
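For readers unfamiliar with these hooks: the three callbacks fire on different replicas. Below is a minimal sketch, assuming the pre-context `leaderelection` API this repo vendors (`OnStartedLeading` takes a stop channel; modern client-go passes a `context.Context` instead), with a hypothetical helper name:

```go
package example

import (
	"log"

	"k8s.io/client-go/tools/leaderelection"
)

// newCallbacks mirrors the wiring in the hunk above (helper name hypothetical).
func newCallbacks(podName string) leaderelection.LeaderCallbacks {
	return leaderelection.LeaderCallbacks{
		// Runs only on the replica that acquired the lock; stop is closed
		// when that replica loses leadership.
		OnStartedLeading: func(stop <-chan struct{}) {
			log.Printf("%s: I am the new status update leader", podName)
		},
		// Runs on the former leader after a failed renewal.
		OnStoppedLeading: func() {
			log.Printf("%s: I am not the status update leader anymore", podName)
		},
		// Runs on every candidate (leader included) whenever the observed
		// lock holder changes: the hook this commit adds.
		OnNewLeader: func(identity string) {
			log.Printf("%s: new leader elected: %s", podName, identity)
		},
	}
}
```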
```diff
@@ -220,16 +206,17 @@ func NewStatusSyncer(config Config) Sync {
         ConfigMapMeta: meta_v1.ObjectMeta{Namespace: pod.Namespace, Name: electionID},
         Client: config.Client.CoreV1(),
         LockConfig: resourcelock.ResourceLockConfig{
-            Identity: electionID,
+            Identity: pod.Name,
             EventRecorder: recorder,
         },
     }
 
+    ttl := 30 * time.Second
     le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
         Lock:          &lock,
-        LeaseDuration: 30 * time.Second,
-        RenewDeadline: 15 * time.Second,
-        RetryPeriod:   5 * time.Second,
+        LeaseDuration: ttl,
+        RenewDeadline: ttl / 2,
+        RetryPeriod:   ttl / 4,
         Callbacks:     callbacks,
     })
 
```
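The `Identity` change is the substantive fix behind the commit title: previously every replica presented the same identity (the shared `electionID`) to the ConfigMap lock, so each replica's acquire/renew succeeded against a record it believed it owned, and several pods could act as leader at once. `pod.Name` is unique per replica, which restores mutual exclusion. Deriving the timings from a single `ttl` also keeps the required ordering LeaseDuration > RenewDeadline > RetryPeriod by construction. A sketch under the same API assumptions (helper name hypothetical):

```go
package example

import (
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// electionConfig derives the three timing knobs from one ttl, preserving
// LeaseDuration > RenewDeadline > RetryPeriod by construction.
func electionConfig(lock resourcelock.Interface, cb leaderelection.LeaderCallbacks) leaderelection.LeaderElectionConfig {
	ttl := 30 * time.Second
	return leaderelection.LeaderElectionConfig{
		Lock:          lock,    // lock identity must be unique per replica
		LeaseDuration: ttl,     // how long a lease is honored without renewal
		RenewDeadline: ttl / 2, // the leader abdicates if it cannot renew in time
		RetryPeriod:   ttl / 4, // pause between acquire/renew attempts
		Callbacks:     cb,
	}
}
```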
The second changed file is the package's tests:

```diff
@@ -19,7 +19,6 @@ package status
 import (
     "os"
     "sort"
-    "sync"
     "testing"
     "time"
 
```
```diff
@@ -245,7 +244,6 @@ func buildStatusSync() statusSync {
             "lable_sig": "foo_pod",
             },
         },
-        runLock:   &sync.Mutex{},
         syncQueue: task.NewTaskQueue(fakeSynFn),
         Config: Config{
             Client: buildSimpleClientSet(),
```
```diff
@@ -328,9 +326,7 @@ func TestStatusActions(t *testing.T) {
 }
 
 func TestCallback(t *testing.T) {
-    fk := buildStatusSync()
-    // do nothing
-    fk.callback("foo_base_pod")
+    buildStatusSync()
 }
 
 func TestKeyfunc(t *testing.T) {
```