Merge pull request #1392 from aledbf/fix-sync

Avoid issues with goroutines updating fields
This commit is contained in:
Manuel Alejandro de Brito Fontes 2017-09-19 14:14:40 -07:00 committed by GitHub
commit f0d926a4d0
2 changed files with 31 additions and 10 deletions

View file

@ -24,6 +24,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
@ -110,9 +111,9 @@ type GenericController struct {
// runningConfig contains the running configuration in the Backend // runningConfig contains the running configuration in the Backend
runningConfig *ingress.Configuration runningConfig *ingress.Configuration
forceReload bool forceReload int32
initialSyncDone bool initialSyncDone int32
} }
// Configuration contains all the settings required by an Ingress controller // Configuration contains all the settings required by an Ingress controller
@ -283,7 +284,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
PassthroughBackends: passUpstreams, PassthroughBackends: passUpstreams,
} }
if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) { if !ic.isForceReload() && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("skipping backend reload (no changes detected)") glog.V(3).Infof("skipping backend reload (no changes detected)")
return nil return nil
} }
@ -302,7 +303,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
setSSLExpireTime(servers) setSSLExpireTime(servers)
ic.runningConfig = &pcfg ic.runningConfig = &pcfg
ic.forceReload = false ic.setForceReload(false)
return nil return nil
} }
@ -1185,7 +1186,7 @@ func (ic GenericController) Stop() error {
} }
// Start starts the Ingress controller. // Start starts the Ingress controller.
func (ic GenericController) Start() { func (ic *GenericController) Start() {
glog.Infof("starting Ingress controller") glog.Infof("starting Ingress controller")
go ic.ingController.Run(ic.stopCh) go ic.ingController.Run(ic.stopCh)
@ -1224,14 +1225,14 @@ func (ic GenericController) Start() {
createDefaultSSLCertificate() createDefaultSSLCertificate()
ic.setInitialSyncDone()
go ic.syncQueue.Run(time.Second, ic.stopCh) go ic.syncQueue.Run(time.Second, ic.stopCh)
if ic.syncStatus != nil { if ic.syncStatus != nil {
go ic.syncStatus.Run(ic.stopCh) go ic.syncStatus.Run(ic.stopCh)
} }
ic.initialSyncDone = true
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// force initial sync // force initial sync
ic.syncQueue.Enqueue(&extensions.Ingress{}) ic.syncQueue.Enqueue(&extensions.Ingress{})
@ -1239,6 +1240,26 @@ func (ic GenericController) Start() {
<-ic.stopCh <-ic.stopCh
} }
func (ic *GenericController) isForceReload() bool {
return atomic.LoadInt32(&ic.forceReload) != 0
}
func (ic *GenericController) setForceReload(shouldReload bool) {
if shouldReload {
atomic.StoreInt32(&ic.forceReload, 1)
} else {
atomic.StoreInt32(&ic.forceReload, 0)
}
}
func (ic *GenericController) isInitialSyncDone() bool {
return atomic.LoadInt32(&ic.initialSyncDone) != 0
}
func (ic *GenericController) setInitialSyncDone() {
atomic.StoreInt32(&ic.initialSyncDone, 1)
}
func createDefaultSSLCertificate() { func createDefaultSSLCertificate() {
defCert, defKey := ssl.GetFakeSSLCert() defCert, defKey := ssl.GetFakeSSLCert()
c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{}) c, err := ssl.AddOrUpdateCertAndKey(fakeCertificate, defCert, defKey, []byte{})

View file

@ -46,7 +46,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
} }
ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name)) ic.recorder.Eventf(addIng, apiv1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", addIng.Namespace, addIng.Name))
if ic.initialSyncDone { if ic.isInitialSyncDone() {
ic.syncQueue.Enqueue(obj) ic.syncQueue.Enqueue(obj)
} }
}, },
@ -142,7 +142,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
if mapKey == ic.cfg.ConfigMapName { if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("adding configmap %v to backend", mapKey) glog.V(2).Infof("adding configmap %v to backend", mapKey)
ic.cfg.Backend.SetConfig(upCmap) ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true ic.setForceReload(true)
} }
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
@ -152,7 +152,7 @@ func (ic *GenericController) createListers(disableNodeLister bool) {
if mapKey == ic.cfg.ConfigMapName { if mapKey == ic.cfg.ConfigMapName {
glog.V(2).Infof("updating configmap backend (%v)", mapKey) glog.V(2).Infof("updating configmap backend (%v)", mapKey)
ic.cfg.Backend.SetConfig(upCmap) ic.cfg.Backend.SetConfig(upCmap)
ic.forceReload = true ic.setForceReload(true)
} }
// updates to configuration configmaps can trigger an update // updates to configuration configmaps can trigger an update
if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName { if mapKey == ic.cfg.ConfigMapName || mapKey == ic.cfg.TCPConfigMapName || mapKey == ic.cfg.UDPConfigMapName {