Separate update nginx config and Endpoints

Signed-off-by: zvlb <vl.zemtsov@gmail.com>
This commit is contained in:
zvlb 2024-02-29 16:39:26 +02:00
parent dc999d81da
commit c632426afe
6 changed files with 99 additions and 7 deletions

View file

@ -259,6 +259,56 @@ func (n *NGINXController) syncIngress(interface{}) error {
return nil
}
// syncEndpoints collects and update only Dynamic configs
// Triggered for Endpoints changes
func (n *NGINXController) syncEndpoints(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncEndpointsQueue.IsShuttingDown() {
return nil
}
ings := n.store.ListIngresses()
_, _, pcfg := n.getConfiguration(ings)
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}
retry := wait.Backoff{
Steps: 1 + n.cfg.DynamicConfigurationRetries,
Duration: time.Second,
Factor: 1.3,
Jitter: 0.1,
}
retriesRemaining := retry.Steps
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Endpoints reconfiguration succeeded.")
return true, nil
}
retriesRemaining--
if retriesRemaining > 0 {
klog.Warningf("Endpoints reconfiguration failed (retrying; %d retries left): %v", retriesRemaining, err)
return false, nil
}
klog.Warningf("Endpoints reconfiguration failed: %v", err)
return false, err
})
if err != nil {
klog.Errorf("Unexpected failure reconfiguring NGINX Endpoints:\n%v", err)
return err
}
n.runningConfig = pcfg
return nil
}
// GetWarnings returns a list of warnings an Ingress gets when being created.
// The warnings are going to be used in an admission webhook, and they represent
// a list of messages that users need to be aware (like deprecation notices)

View file

@ -141,6 +141,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro
config.DisableSyncEvents)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.syncEndpointsQueue = task.NewTaskQueue(n.syncEndpoints)
if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{
@ -223,6 +224,8 @@ type NGINXController struct {
syncQueue *task.Queue
syncEndpointsQueue *task.Queue
syncStatus status.Syncer
syncRateLimiter flowcontrol.RateLimiter
@ -309,6 +312,7 @@ func (n *NGINXController) Start() {
n.start(cmd)
go n.syncQueue.Run(time.Second, n.stopCh)
go n.syncEndpointsQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
@ -358,6 +362,11 @@ func (n *NGINXController) Start() {
continue
}
if evt.Type == store.EndpointEvent {
n.syncEndpointsQueue.EnqueueTask(evt.Obj)
continue
}
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)

View file

@ -120,6 +120,8 @@ const (
DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a controller configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION"
// EndpointEvent event associated with an endpoint added, updated or deleted in an informer
EndpointEvent EventType = "ENDPOINT"
)
// Event holds the context of an event.
@ -705,13 +707,13 @@ func New(
epsEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Type: EndpointEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: DeleteEvent,
Type: EndpointEvent,
Obj: obj,
}
},
@ -726,7 +728,7 @@ func New(
}
if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) {
updateCh.In() <- Event{
Type: UpdateEvent,
Type: EndpointEvent,
Obj: cur,
}
}

View file

@ -49,12 +49,12 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
}
}
match = compareL4Service(c1.TCPEndpoints, c2.TCPEndpoints)
match = CompareL4Service(c1.TCPEndpoints, c2.TCPEndpoints)
if !match {
return false
}
match = compareL4Service(c1.UDPEndpoints, c2.UDPEndpoints)
match = CompareL4Service(c1.UDPEndpoints, c2.UDPEndpoints)
if !match {
return false
}
@ -655,6 +655,6 @@ var compareL4ServiceFunc = func(e1, e2 interface{}) bool {
return (&b1).Equal(&b2)
}
func compareL4Service(a, b []L4Service) bool {
func CompareL4Service(a, b []L4Service) bool {
return sets.Compare(a, b, compareL4ServiceFunc)
}

View file

@ -129,7 +129,7 @@ func TestL4ServiceElementsMatch(t *testing.T) {
}
for _, testCase := range testCases {
result := compareL4Service(testCase.listA, testCase.listB)
result := CompareL4Service(testCase.listA, testCase.listB)
if result != testCase.expected {
t.Errorf("expected %v but returned %v (%v - %v)", testCase.expected, result, testCase.listA, testCase.listB)
}

View file

@ -0,0 +1,31 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ingress
func (a *Configuration) UpdateWithoutEndpoints(b *Configuration) {
tcpEndpoints := a.TCPEndpoints
udpEndpoints := a.UDPEndpoints
a = b
a.TCPEndpoints = tcpEndpoints
a.UDPEndpoints = udpEndpoints
}
func (a *Configuration) UpdateEndpoints(tcpEndpoints, udpEndpoints []L4Service) {
a.TCPEndpoints = tcpEndpoints
a.UDPEndpoints = udpEndpoints
}