diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index cb8d3712c..e669c2313 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -259,6 +259,56 @@ func (n *NGINXController) syncIngress(interface{}) error { return nil } +// syncEndpoints collects and update only Dynamic configs +// Triggered for Endpoints changes +func (n *NGINXController) syncEndpoints(interface{}) error { + n.syncRateLimiter.Accept() + + if n.syncEndpointsQueue.IsShuttingDown() { + return nil + } + + ings := n.store.ListIngresses() + + _, _, pcfg := n.getConfiguration(ings) + + if n.runningConfig.Equal(pcfg) { + klog.V(3).Infof("No configuration change detected, skipping backend reload") + return nil + } + + retry := wait.Backoff{ + Steps: 1 + n.cfg.DynamicConfigurationRetries, + Duration: time.Second, + Factor: 1.3, + Jitter: 0.1, + } + + retriesRemaining := retry.Steps + err := wait.ExponentialBackoff(retry, func() (bool, error) { + err := n.configureDynamically(pcfg) + if err == nil { + klog.V(2).Infof("Endpoints reconfiguration succeeded.") + return true, nil + } + retriesRemaining-- + if retriesRemaining > 0 { + klog.Warningf("Endpoints reconfiguration failed (retrying; %d retries left): %v", retriesRemaining, err) + return false, nil + } + klog.Warningf("Endpoints reconfiguration failed: %v", err) + return false, err + }) + if err != nil { + klog.Errorf("Unexpected failure reconfiguring NGINX Endpoints:\n%v", err) + return err + } + + n.runningConfig = pcfg + + return nil +} + // GetWarnings returns a list of warnings an Ingress gets when being created. // The warnings are going to be used in an admission webhook, and they represent // a list of messages that users need to be aware (like deprecation notices) diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 578d5b4e8..73f6fc100 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -141,6 +141,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro config.DisableSyncEvents) n.syncQueue = task.NewTaskQueue(n.syncIngress) + n.syncEndpointsQueue = task.NewTaskQueue(n.syncEndpoints) if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(status.Config{ @@ -223,6 +224,8 @@ type NGINXController struct { syncQueue *task.Queue + syncEndpointsQueue *task.Queue + syncStatus status.Syncer syncRateLimiter flowcontrol.RateLimiter @@ -309,6 +312,7 @@ func (n *NGINXController) Start() { n.start(cmd) go n.syncQueue.Run(time.Second, n.stopCh) + go n.syncEndpointsQueue.Run(time.Second, n.stopCh) // force initial sync n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync")) @@ -358,6 +362,11 @@ func (n *NGINXController) Start() { continue } + if evt.Type == store.EndpointEvent { + n.syncEndpointsQueue.EnqueueTask(evt.Obj) + continue + } + n.syncQueue.EnqueueSkippableTask(evt.Obj) } else { klog.Warningf("Unexpected event type received %T", event) diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 4288785de..7b08ef653 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -120,6 +120,8 @@ const ( DeleteEvent EventType = "DELETE" // ConfigurationEvent event associated when a controller configuration object is created or updated ConfigurationEvent EventType = "CONFIGURATION" + // EndpointEvent event associated with an endpoint added, updated or deleted in an informer + EndpointEvent EventType = "ENDPOINT" ) // Event holds the context of an event. @@ -705,13 +707,13 @@ func New( epsEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { updateCh.In() <- Event{ - Type: CreateEvent, + Type: EndpointEvent, Obj: obj, } }, DeleteFunc: func(obj interface{}) { updateCh.In() <- Event{ - Type: DeleteEvent, + Type: EndpointEvent, Obj: obj, } }, @@ -726,7 +728,7 @@ func New( } if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) { updateCh.In() <- Event{ - Type: UpdateEvent, + Type: EndpointEvent, Obj: cur, } } diff --git a/pkg/apis/ingress/types_equals.go b/pkg/apis/ingress/types_equals.go index eeed9a06e..a70ba492c 100644 --- a/pkg/apis/ingress/types_equals.go +++ b/pkg/apis/ingress/types_equals.go @@ -49,12 +49,12 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool { } } - match = compareL4Service(c1.TCPEndpoints, c2.TCPEndpoints) + match = CompareL4Service(c1.TCPEndpoints, c2.TCPEndpoints) if !match { return false } - match = compareL4Service(c1.UDPEndpoints, c2.UDPEndpoints) + match = CompareL4Service(c1.UDPEndpoints, c2.UDPEndpoints) if !match { return false } @@ -655,6 +655,6 @@ var compareL4ServiceFunc = func(e1, e2 interface{}) bool { return (&b1).Equal(&b2) } -func compareL4Service(a, b []L4Service) bool { +func CompareL4Service(a, b []L4Service) bool { return sets.Compare(a, b, compareL4ServiceFunc) } diff --git a/pkg/apis/ingress/types_equals_test.go b/pkg/apis/ingress/types_equals_test.go index 53643f912..904596abc 100644 --- a/pkg/apis/ingress/types_equals_test.go +++ b/pkg/apis/ingress/types_equals_test.go @@ -129,7 +129,7 @@ func TestL4ServiceElementsMatch(t *testing.T) { } for _, testCase := range testCases { - result := compareL4Service(testCase.listA, testCase.listB) + result := CompareL4Service(testCase.listA, testCase.listB) if result != testCase.expected { t.Errorf("expected %v but returned %v (%v - %v)", testCase.expected, result, testCase.listA, testCase.listB) } diff --git a/pkg/apis/ingress/types_updates.go b/pkg/apis/ingress/types_updates.go new file mode 100644 index 000000000..b1e27fd03 --- /dev/null +++ b/pkg/apis/ingress/types_updates.go @@ -0,0 +1,31 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ingress + +func (a *Configuration) UpdateWithoutEndpoints(b *Configuration) { + tcpEndpoints := a.TCPEndpoints + udpEndpoints := a.UDPEndpoints + + a = b + a.TCPEndpoints = tcpEndpoints + a.UDPEndpoints = udpEndpoints +} + +func (a *Configuration) UpdateEndpoints(tcpEndpoints, udpEndpoints []L4Service) { + a.TCPEndpoints = tcpEndpoints + a.UDPEndpoints = udpEndpoints +}