diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index b137ce8e1..60d8f29a4 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -167,7 +167,7 @@ func (n *NGINXController) syncIngress(interface{}) error { } err := wait.ExponentialBackoff(retry, func() (bool, error) { - err := configureDynamically(pcfg) + err := n.configureDynamically(pcfg) if err == nil { klog.V(2).Infof("Dynamic reconfiguration succeeded.") return true, nil diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 992b813cf..bfbaa2c0e 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "strconv" "strings" "sync" @@ -849,10 +850,104 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati // configureDynamically encodes new Backends in JSON format and POSTs the // payload to an internal HTTP endpoint handled by Lua. -func configureDynamically(pcfg *ingress.Configuration) error { - backends := make([]*ingress.Backend, len(pcfg.Backends)) +func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error { + backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends) + if backendsChanged { + err := configureBackends(pcfg.Backends) + if err != nil { + return err + } + } - for i, backend := range pcfg.Backends { + streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints) + if streamConfigurationChanged { + err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints) + if err != nil { + return err + } + } + + if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount { + statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{ + ControllerPodsCount: pcfg.ControllerPodsCount, + }) + if err != nil { + return err + } + if statusCode != http.StatusCreated { + return fmt.Errorf("unexpected error code: %d", statusCode) + } + } + + serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers) + if serversChanged { + err := configureCertificates(pcfg.Servers) + if err != nil { + return err + } + } + + return nil +} + +func updateStreamConfiguration(TCPEndpoints []ingress.L4Service, UDPEndpoints []ingress.L4Service) error { + streams := make([]ingress.Backend, 0) + for _, ep := range TCPEndpoints { + var service *apiv1.Service + if ep.Service != nil { + service = &apiv1.Service{Spec: ep.Service.Spec} + } + + key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) + streams = append(streams, ingress.Backend{ + Name: key, + Endpoints: ep.Endpoints, + Port: intstr.FromInt(ep.Port), + Service: service, + }) + } + for _, ep := range UDPEndpoints { + var service *apiv1.Service + if ep.Service != nil { + service = &apiv1.Service{Spec: ep.Service.Spec} + } + + key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) + streams = append(streams, ingress.Backend{ + Name: key, + Endpoints: ep.Endpoints, + Port: intstr.FromInt(ep.Port), + Service: service, + }) + } + + conn, err := net.Dial("unix", nginx.StreamSocket) + if err != nil { + return err + } + defer conn.Close() + + buf, err := json.Marshal(streams) + if err != nil { + return err + } + + _, err = conn.Write(buf) + if err != nil { + return err + } + _, err = fmt.Fprintf(conn, "\r\n") + if err != nil { + return err + } + + return nil +} + +func configureBackends(rawBackends []*ingress.Backend) error { + backends := make([]*ingress.Backend, len(rawBackends)) + + for i, backend := range rawBackends { var service *apiv1.Service if backend.Service != nil { service = &apiv1.Service{Spec: backend.Service.Spec} @@ -891,90 +986,15 @@ func configureDynamically(pcfg *ingress.Configuration) error { return fmt.Errorf("unexpected error code: %d", statusCode) } - streams := make([]ingress.Backend, 0) - for _, ep := range pcfg.TCPEndpoints { - var service *apiv1.Service - if ep.Service != nil { - service = &apiv1.Service{Spec: ep.Service.Spec} - } - - key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) - streams = append(streams, ingress.Backend{ - Name: key, - Endpoints: ep.Endpoints, - Port: intstr.FromInt(ep.Port), - Service: service, - }) - } - for _, ep := range pcfg.UDPEndpoints { - var service *apiv1.Service - if ep.Service != nil { - service = &apiv1.Service{Spec: ep.Service.Spec} - } - - key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) - streams = append(streams, ingress.Backend{ - Name: key, - Endpoints: ep.Endpoints, - Port: intstr.FromInt(ep.Port), - Service: service, - }) - } - - err = updateStreamConfiguration(streams) - if err != nil { - return err - } - - statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{ - ControllerPodsCount: pcfg.ControllerPodsCount, - }) - if err != nil { - return err - } - - if statusCode != http.StatusCreated { - return fmt.Errorf("unexpected error code: %d", statusCode) - } - - err = configureCertificates(pcfg) - if err != nil { - return err - } - - return nil -} - -func updateStreamConfiguration(streams []ingress.Backend) error { - conn, err := net.Dial("unix", nginx.StreamSocket) - if err != nil { - return err - } - defer conn.Close() - - buf, err := json.Marshal(streams) - if err != nil { - return err - } - - _, err = conn.Write(buf) - if err != nil { - return err - } - _, err = fmt.Fprintf(conn, "\r\n") - if err != nil { - return err - } - return nil } // configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint // that is handled by Lua -func configureCertificates(pcfg *ingress.Configuration) error { +func configureCertificates(rawServers []*ingress.Server) error { servers := make([]*ingress.Server, 0) - for _, server := range pcfg.Servers { + for _, server := range rawServers { if server.SSLCert == nil { continue } @@ -996,7 +1016,7 @@ func configureCertificates(pcfg *ingress.Configuration) error { } } - redirects := buildRedirects(pcfg.Servers) + redirects := buildRedirects(rawServers) for _, redirect := range redirects { if redirect.SSLCert == nil { continue diff --git a/internal/ingress/controller/nginx_test.go b/internal/ingress/controller/nginx_test.go index 692ca9530..7590fceb3 100644 --- a/internal/ingress/controller/nginx_test.go +++ b/internal/ingress/controller/nginx_test.go @@ -162,6 +162,13 @@ func TestConfigureDynamically(t *testing.T) { defer streamListener.Close() defer os.Remove(nginx.StreamSocket) + endpointStats := map[string]int{"/configuration/backends": 0, "/configuration/general": 0, "/configuration/servers": 0} + resetEndpointStats := func() { + for k := range endpointStats { + endpointStats[k] = 0 + } + } + server := &httptest.Server{ Listener: listener, Config: &http.Server{ @@ -178,6 +185,8 @@ func TestConfigureDynamically(t *testing.T) { } body := string(b) + endpointStats[r.URL.Path] += 1 + switch r.URL.Path { case "/configuration/backends": { @@ -246,14 +255,67 @@ func TestConfigureDynamically(t *testing.T) { ControllerPodsCount: 2, } - err = configureDynamically(commonConfig) + n := &NGINXController{ + runningConfig: &ingress.Configuration{}, + cfg: &Configuration{}, + } + + err = n.configureDynamically(commonConfig) if err != nil { t.Errorf("unexpected error posting dynamic configuration: %v", err) } - if commonConfig.Backends[0].Endpoints[0].Target != target { t.Errorf("unexpected change in the configuration object after configureDynamically invocation") } + for endpoint, count := range endpointStats { + if count != 1 { + t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 1, count) + } + } + + resetEndpointStats() + n.runningConfig.Backends = backends + err = n.configureDynamically(commonConfig) + if err != nil { + t.Errorf("unexpected error posting dynamic configuration: %v", err) + } + for endpoint, count := range endpointStats { + if endpoint == "/configuration/backends" { + if count != 0 { + t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 0, count) + } + } else if count != 1 { + t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 1, count) + } + } + + resetEndpointStats() + n.runningConfig.Servers = servers + err = n.configureDynamically(commonConfig) + if err != nil { + t.Errorf("unexpected error posting dynamic configuration: %v", err) + } + if count, _ := endpointStats["/configuration/backends"]; count != 0 { + t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/backends", 0, count) + } + if count, _ := endpointStats["/configuration/servers"]; count != 0 { + t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/servers", 0, count) + } + if count, _ := endpointStats["/configuration/general"]; count != 1 { + t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/general", 0, count) + } + + resetEndpointStats() + n.runningConfig.ControllerPodsCount = commonConfig.ControllerPodsCount + err = n.configureDynamically(commonConfig) + if err != nil { + t.Errorf("unexpected error posting dynamic configuration: %v", err) + } + for endpoint, count := range endpointStats { + if count != 0 { + t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 0, count) + } + } } func TestConfigureCertificates(t *testing.T) { @@ -313,11 +375,7 @@ func TestConfigureCertificates(t *testing.T) { defer server.Close() server.Start() - commonConfig := &ingress.Configuration{ - Servers: servers, - } - - err = configureCertificates(commonConfig) + err = configureCertificates(servers) if err != nil { t.Errorf("unexpected error posting dynamic certificate configuration: %v", err) }