Fix data race updating ingress status (#1872)
This commit is contained in:
parent
da829748ec
commit
a09527cf6d
3 changed files with 14 additions and 22 deletions
|
@ -268,7 +268,7 @@ func (n *NGINXController) Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.syncStatus != nil {
|
if n.syncStatus != nil {
|
||||||
go n.syncStatus.Run(n.stopCh)
|
go n.syncStatus.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
|
go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
|
||||||
|
|
|
@ -52,7 +52,7 @@ const (
|
||||||
|
|
||||||
// Sync ...
|
// Sync ...
|
||||||
type Sync interface {
|
type Sync interface {
|
||||||
Run(stopCh <-chan struct{})
|
Run()
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,16 +91,8 @@ type statusSync struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the loop to keep the status in sync
|
// Run starts the loop to keep the status in sync
|
||||||
func (s statusSync) Run(stopCh <-chan struct{}) {
|
func (s statusSync) Run() {
|
||||||
go s.elector.Run()
|
s.elector.Run()
|
||||||
go wait.Forever(s.update, updateInterval)
|
|
||||||
go s.syncQueue.Run(time.Second, stopCh)
|
|
||||||
<-stopCh
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *statusSync) update() {
|
|
||||||
// send a dummy object to the queue to force a sync
|
|
||||||
s.syncQueue.Enqueue("sync status")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
|
// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
|
||||||
|
@ -146,11 +138,6 @@ func (s *statusSync) sync(key interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.elector.IsLeader() {
|
|
||||||
glog.V(2).Infof("skipping Ingress status update (I am not the current leader)")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
addrs, err := s.runningAddresses()
|
addrs, err := s.runningAddresses()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -188,6 +175,12 @@ func NewStatusSyncer(config Config) Sync {
|
||||||
callbacks := leaderelection.LeaderCallbacks{
|
callbacks := leaderelection.LeaderCallbacks{
|
||||||
OnStartedLeading: func(stop <-chan struct{}) {
|
OnStartedLeading: func(stop <-chan struct{}) {
|
||||||
glog.V(2).Infof("I am the new status update leader")
|
glog.V(2).Infof("I am the new status update leader")
|
||||||
|
go st.syncQueue.Run(time.Second, stop)
|
||||||
|
wait.PollUntil(updateInterval, func() (bool, error) {
|
||||||
|
// send a dummy object to the queue to force a sync
|
||||||
|
st.syncQueue.Enqueue("sync status")
|
||||||
|
return false, nil
|
||||||
|
}, stop)
|
||||||
},
|
},
|
||||||
OnStoppedLeading: func() {
|
OnStoppedLeading: func() {
|
||||||
glog.V(2).Infof("I am not status update leader anymore")
|
glog.V(2).Infof("I am not status update leader anymore")
|
||||||
|
|
|
@ -273,9 +273,8 @@ func TestStatusActions(t *testing.T) {
|
||||||
|
|
||||||
fk := fkSync.(statusSync)
|
fk := fkSync.(statusSync)
|
||||||
|
|
||||||
ns := make(chan struct{})
|
|
||||||
// start it and wait for the election and syn actions
|
// start it and wait for the election and syn actions
|
||||||
go fk.Run(ns)
|
go fk.Run()
|
||||||
// wait for the election
|
// wait for the election
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
// execute sync
|
// execute sync
|
||||||
|
@ -294,6 +293,8 @@ func TestStatusActions(t *testing.T) {
|
||||||
t.Fatalf("returned %v but expected %v", fooIngress1CurIPs, newIPs)
|
t.Fatalf("returned %v but expected %v", fooIngress1CurIPs, newIPs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
// execute shutdown
|
// execute shutdown
|
||||||
fk.Shutdown()
|
fk.Shutdown()
|
||||||
// ingress should be empty
|
// ingress should be empty
|
||||||
|
@ -314,9 +315,6 @@ func TestStatusActions(t *testing.T) {
|
||||||
if oic.Status.LoadBalancer.Ingress[0].IP != "0.0.0.0" && oic.Status.LoadBalancer.Ingress[0].Hostname != "foo.bar.com" {
|
if oic.Status.LoadBalancer.Ingress[0].IP != "0.0.0.0" && oic.Status.LoadBalancer.Ingress[0].Hostname != "foo.bar.com" {
|
||||||
t.Fatalf("invalid ingress status for rule with different class")
|
t.Fatalf("invalid ingress status for rule with different class")
|
||||||
}
|
}
|
||||||
|
|
||||||
// end test
|
|
||||||
ns <- struct{}{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCallback(t *testing.T) {
|
func TestCallback(t *testing.T) {
|
||||||
|
@ -325,6 +323,7 @@ func TestCallback(t *testing.T) {
|
||||||
|
|
||||||
func TestKeyfunc(t *testing.T) {
|
func TestKeyfunc(t *testing.T) {
|
||||||
fk := buildStatusSync()
|
fk := buildStatusSync()
|
||||||
|
|
||||||
i := "foo_base_pod"
|
i := "foo_base_pod"
|
||||||
r, err := fk.keyfunc(i)
|
r, err := fk.keyfunc(i)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue