Update leader election methods

This commit is contained in:
Manuel Alejandro de Brito Fontes 2018-09-27 14:57:32 -03:00
parent 6c33bee8fd
commit 4c46ee95c9
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
3 changed files with 14 additions and 8 deletions

View file

@ -18,6 +18,7 @@ package controller
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -256,7 +257,7 @@ func (n *NGINXController) Start() {
n.store.Run(n.stopCh) n.store.Run(n.stopCh)
if n.syncStatus != nil { if n.syncStatus != nil {
go n.syncStatus.Run() go n.syncStatus.Run(context.Background())
} }
cmd := nginxExecCommand() cmd := nginxExecCommand()

View file

@ -17,6 +17,7 @@ limitations under the License.
package status package status
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -50,7 +51,7 @@ const (
// Sync ... // Sync ...
type Sync interface { type Sync interface {
Run() Run(ctx context.Context)
Shutdown() Shutdown()
} }
@ -98,8 +99,8 @@ type statusSync struct {
} }
// Run starts the loop to keep the status in sync // Run starts the loop to keep the status in sync
func (s statusSync) Run() { func (s statusSync) Run(ctx context.Context) {
s.elector.Run() s.elector.Run(ctx)
} }
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP // Shutdown stop the sync. In case the instance is the leader it will remove the current IP
@ -179,19 +180,22 @@ func NewStatusSyncer(config Config) Sync {
electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass) electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
} }
var stopCh chan struct{}
callbacks := leaderelection.LeaderCallbacks{ callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(stop <-chan struct{}) { OnStartedLeading: func(ctx context.Context) {
glog.V(2).Infof("I am the new status update leader") glog.V(2).Infof("I am the new status update leader")
go st.syncQueue.Run(time.Second, stop) stopCh = make(chan struct{})
go st.syncQueue.Run(time.Second, stopCh)
// when this instance is the leader we need to enqueue // when this instance is the leader we need to enqueue
// an item to trigger the update of the Ingress status. // an item to trigger the update of the Ingress status.
wait.PollUntil(updateInterval, func() (bool, error) { wait.PollUntil(updateInterval, func() (bool, error) {
st.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) st.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
return false, nil return false, nil
}, stop) }, stopCh)
}, },
OnStoppedLeading: func() { OnStoppedLeading: func() {
glog.V(2).Infof("I am not status update leader anymore") glog.V(2).Infof("I am not status update leader anymore")
close(stopCh)
}, },
OnNewLeader: func(identity string) { OnNewLeader: func(identity string) {
glog.Infof("new leader elected: %v", identity) glog.Infof("new leader elected: %v", identity)

View file

@ -17,6 +17,7 @@ limitations under the License.
package status package status
import ( import (
"context"
"os" "os"
"testing" "testing"
"time" "time"
@ -297,7 +298,7 @@ func TestStatusActions(t *testing.T) {
fk := fkSync.(statusSync) fk := fkSync.(statusSync)
// start it and wait for the election and syn actions // start it and wait for the election and syn actions
go fk.Run() go fk.Run(context.Background())
// wait for the election // wait for the election
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// execute sync // execute sync