Refactor initial synchronization of ingress objects (#1891)
This commit is contained in:
parent
313fdd2d1a
commit
142b444685
1 changed files with 18 additions and 3 deletions
|
@ -19,6 +19,7 @@ package controller
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
@ -42,15 +43,16 @@ type cacheController struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cacheController) Run(stopCh chan struct{}) {
|
func (c *cacheController) Run(stopCh chan struct{}) {
|
||||||
go c.Ingress.Run(stopCh)
|
|
||||||
go c.Endpoint.Run(stopCh)
|
go c.Endpoint.Run(stopCh)
|
||||||
go c.Service.Run(stopCh)
|
go c.Service.Run(stopCh)
|
||||||
go c.Secret.Run(stopCh)
|
go c.Secret.Run(stopCh)
|
||||||
go c.Configmap.Run(stopCh)
|
go c.Configmap.Run(stopCh)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
go c.Ingress.Run(stopCh)
|
||||||
|
|
||||||
// Wait for all involved caches to be synced, before processing items from the queue is started
|
// Wait for all involved caches to be synced, before processing items from the queue is started
|
||||||
if !cache.WaitForCacheSync(stopCh,
|
if !cache.WaitForCacheSync(stopCh,
|
||||||
c.Ingress.HasSynced,
|
|
||||||
c.Endpoint.HasSynced,
|
c.Endpoint.HasSynced,
|
||||||
c.Service.HasSynced,
|
c.Service.HasSynced,
|
||||||
c.Secret.HasSynced,
|
c.Secret.HasSynced,
|
||||||
|
@ -58,6 +60,16 @@ func (c *cacheController) Run(stopCh chan struct{}) {
|
||||||
) {
|
) {
|
||||||
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We need to wait before start syncing the ingress rules
|
||||||
|
// because the rules requires content from other listers
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
go c.Ingress.Run(stopCh)
|
||||||
|
if !cache.WaitForCacheSync(stopCh,
|
||||||
|
c.Ingress.HasSynced,
|
||||||
|
) {
|
||||||
|
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) {
|
func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLister, *cacheController) {
|
||||||
|
@ -120,6 +132,9 @@ func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLis
|
||||||
}
|
}
|
||||||
|
|
||||||
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
secrEventHandler := cache.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: func(obj interface{}) {
|
||||||
|
n.syncQueue.Enqueue(obj)
|
||||||
|
},
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
UpdateFunc: func(old, cur interface{}) {
|
||||||
if !reflect.DeepEqual(old, cur) {
|
if !reflect.DeepEqual(old, cur) {
|
||||||
sec := cur.(*apiv1.Secret)
|
sec := cur.(*apiv1.Secret)
|
||||||
|
@ -147,7 +162,7 @@ func (n *NGINXController) createListers(stopCh chan struct{}) (*ingress.StoreLis
|
||||||
}
|
}
|
||||||
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
key := fmt.Sprintf("%v/%v", sec.Namespace, sec.Name)
|
||||||
n.sslCertTracker.Delete(key)
|
n.sslCertTracker.Delete(key)
|
||||||
n.syncQueue.Enqueue(key)
|
n.syncQueue.Enqueue(obj)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue