Separate update nginx config and Endpoints

Signed-off-by: zvlb <vl.zemtsov@gmail.com>
This commit is contained in:
zvlb 2024-02-29 16:39:26 +02:00
parent 7d6fa0badf
commit dea9975dc7
6 changed files with 79 additions and 16 deletions

View file

@ -163,7 +163,7 @@ func (n *NGINXController) GetPublishService() *apiv1.Service {
return s return s
} }
// syncIngress collects all the pieces required to assemble the NGINX // syncIngress collects all the pieces required to assemble the NGINX (without TCP and UPD Endpoints)
// configuration file and passes the resulting data structures to the backend // configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary. // (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error { func (n *NGINXController) syncIngress(interface{}) error {
@ -254,7 +254,33 @@ func (n *NGINXController) syncIngress(interface{}) error {
rc := utilingress.GetRemovedCertificateSerialNumbers(n.runningConfig, pcfg) rc := utilingress.GetRemovedCertificateSerialNumbers(n.runningConfig, pcfg)
n.metricCollector.RemoveMetrics(ri, re, rc) n.metricCollector.RemoveMetrics(ri, re, rc)
n.runningConfig = pcfg n.runningConfig.UpdateWithoutEndpoints(pcfg)
return nil
}
// syncEndpoints collects and update only TCP and UDP Endpoints for NGINX
func (n *NGINXController) syncEndpoints(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncEndpointsQueue.IsShuttingDown() {
return nil
}
TCPEndpoints := n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP)
UDPEndpoints := n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP)
if ingress.CompareL4Service(n.runningConfig.TCPEndpoints, TCPEndpoints) && ingress.CompareL4Service(n.runningConfig.UDPEndpoints, UDPEndpoints) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}
err := updateStreamConfiguration(TCPEndpoints, UDPEndpoints)
if err != nil {
return err
}
n.runningConfig.UpdateEndpoints(TCPEndpoints, UDPEndpoints)
return nil return nil
} }

View file

@ -141,6 +141,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro
config.DisableSyncEvents) config.DisableSyncEvents)
n.syncQueue = task.NewTaskQueue(n.syncIngress) n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.syncEndpointsQueue = task.NewTaskQueue(n.syncEndpoints)
if config.UpdateStatus { if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{ n.syncStatus = status.NewStatusSyncer(status.Config{
@ -223,6 +224,8 @@ type NGINXController struct {
syncQueue *task.Queue syncQueue *task.Queue
syncEndpointsQueue *task.Queue
syncStatus status.Syncer syncStatus status.Syncer
syncRateLimiter flowcontrol.RateLimiter syncRateLimiter flowcontrol.RateLimiter
@ -309,6 +312,7 @@ func (n *NGINXController) Start() {
n.start(cmd) n.start(cmd)
go n.syncQueue.Run(time.Second, n.stopCh) go n.syncQueue.Run(time.Second, n.stopCh)
go n.syncEndpointsQueue.Run(time.Second, n.stopCh)
// force initial sync // force initial sync
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync")) n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
@ -819,13 +823,13 @@ func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) erro
} }
} }
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints) // streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged { // if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints) // err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil { // if err != nil {
return err // return err
} // }
} // }
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers) serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged { if serversChanged {

View file

@ -120,6 +120,8 @@ const (
DeleteEvent EventType = "DELETE" DeleteEvent EventType = "DELETE"
// ConfigurationEvent event associated when a controller configuration object is created or updated // ConfigurationEvent event associated when a controller configuration object is created or updated
ConfigurationEvent EventType = "CONFIGURATION" ConfigurationEvent EventType = "CONFIGURATION"
// EndpointEvent event associated with an endpoint added, updated or deleted in an informer
EndpointEvent EventType = "ENDPOINT"
) )
// Event holds the context of an event. // Event holds the context of an event.
@ -705,13 +707,13 @@ func New(
epsEventHandler := cache.ResourceEventHandlerFuncs{ epsEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
updateCh.In() <- Event{ updateCh.In() <- Event{
Type: CreateEvent, Type: EndpointEvent,
Obj: obj, Obj: obj,
} }
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
updateCh.In() <- Event{ updateCh.In() <- Event{
Type: DeleteEvent, Type: EndpointEvent,
Obj: obj, Obj: obj,
} }
}, },
@ -726,7 +728,7 @@ func New(
} }
if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) { if !reflect.DeepEqual(ceps.Endpoints, oeps.Endpoints) {
updateCh.In() <- Event{ updateCh.In() <- Event{
Type: UpdateEvent, Type: EndpointEvent,
Obj: cur, Obj: cur,
} }
} }

View file

@ -49,12 +49,12 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
} }
} }
match = compareL4Service(c1.TCPEndpoints, c2.TCPEndpoints) match = CompareL4Service(c1.TCPEndpoints, c2.TCPEndpoints)
if !match { if !match {
return false return false
} }
match = compareL4Service(c1.UDPEndpoints, c2.UDPEndpoints) match = CompareL4Service(c1.UDPEndpoints, c2.UDPEndpoints)
if !match { if !match {
return false return false
} }
@ -655,6 +655,6 @@ var compareL4ServiceFunc = func(e1, e2 interface{}) bool {
return (&b1).Equal(&b2) return (&b1).Equal(&b2)
} }
func compareL4Service(a, b []L4Service) bool { func CompareL4Service(a, b []L4Service) bool {
return sets.Compare(a, b, compareL4ServiceFunc) return sets.Compare(a, b, compareL4ServiceFunc)
} }

View file

@ -129,7 +129,7 @@ func TestL4ServiceElementsMatch(t *testing.T) {
} }
for _, testCase := range testCases { for _, testCase := range testCases {
result := compareL4Service(testCase.listA, testCase.listB) result := CompareL4Service(testCase.listA, testCase.listB)
if result != testCase.expected { if result != testCase.expected {
t.Errorf("expected %v but returned %v (%v - %v)", testCase.expected, result, testCase.listA, testCase.listB) t.Errorf("expected %v but returned %v (%v - %v)", testCase.expected, result, testCase.listA, testCase.listB)
} }

View file

@ -0,0 +1,31 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ingress
func (a *Configuration) UpdateWithoutEndpoints(b *Configuration) {
tcpEndpoints := a.TCPEndpoints
udpEndpoints := a.UDPEndpoints
a = b
a.TCPEndpoints = tcpEndpoints
a.UDPEndpoints = udpEndpoints
}
func (a *Configuration) UpdateEndpoints(tcpEndpoints, udpEndpoints []L4Service) {
a.TCPEndpoints = tcpEndpoints
a.UDPEndpoints = udpEndpoints
}