Merge pull request #4451 from ElvinEfendi/avoid-redundant-lua-sync

post data to Lua only if it changes
This commit is contained in:
Kubernetes Prow Robot 2019-08-15 16:20:34 -07:00 committed by GitHub
commit 4b0aabc0c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 167 additions and 89 deletions

View file

@ -167,7 +167,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
}
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := configureDynamically(pcfg)
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil

View file

@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
@ -849,10 +850,104 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration) error {
backends := make([]*ingress.Backend, len(pcfg.Backends))
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
}
for i, backend := range pcfg.Backends {
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
}
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
}
return nil
}
func updateStreamConfiguration(TCPEndpoints []ingress.L4Service, UDPEndpoints []ingress.L4Service) error {
streams := make([]ingress.Backend, 0)
for _, ep := range TCPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
for _, ep := range UDPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
defer conn.Close()
buf, err := json.Marshal(streams)
if err != nil {
return err
}
_, err = conn.Write(buf)
if err != nil {
return err
}
_, err = fmt.Fprintf(conn, "\r\n")
if err != nil {
return err
}
return nil
}
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
var service *apiv1.Service
if backend.Service != nil {
service = &apiv1.Service{Spec: backend.Service.Spec}
@ -891,90 +986,15 @@ func configureDynamically(pcfg *ingress.Configuration) error {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
streams := make([]ingress.Backend, 0)
for _, ep := range pcfg.TCPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
for _, ep := range pcfg.UDPEndpoints {
var service *apiv1.Service
if ep.Service != nil {
service = &apiv1.Service{Spec: ep.Service.Spec}
}
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
Service: service,
})
}
err = updateStreamConfiguration(streams)
if err != nil {
return err
}
statusCode, _, err = nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{
ControllerPodsCount: pcfg.ControllerPodsCount,
})
if err != nil {
return err
}
if statusCode != http.StatusCreated {
return fmt.Errorf("unexpected error code: %d", statusCode)
}
err = configureCertificates(pcfg)
if err != nil {
return err
}
return nil
}
func updateStreamConfiguration(streams []ingress.Backend) error {
conn, err := net.Dial("unix", nginx.StreamSocket)
if err != nil {
return err
}
defer conn.Close()
buf, err := json.Marshal(streams)
if err != nil {
return err
}
_, err = conn.Write(buf)
if err != nil {
return err
}
_, err = fmt.Fprintf(conn, "\r\n")
if err != nil {
return err
}
return nil
}
// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
func configureCertificates(pcfg *ingress.Configuration) error {
func configureCertificates(rawServers []*ingress.Server) error {
servers := make([]*ingress.Server, 0)
for _, server := range pcfg.Servers {
for _, server := range rawServers {
if server.SSLCert == nil {
continue
}
@ -996,7 +1016,7 @@ func configureCertificates(pcfg *ingress.Configuration) error {
}
}
redirects := buildRedirects(pcfg.Servers)
redirects := buildRedirects(rawServers)
for _, redirect := range redirects {
if redirect.SSLCert == nil {
continue

View file

@ -162,6 +162,13 @@ func TestConfigureDynamically(t *testing.T) {
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
endpointStats := map[string]int{"/configuration/backends": 0, "/configuration/general": 0, "/configuration/servers": 0}
resetEndpointStats := func() {
for k := range endpointStats {
endpointStats[k] = 0
}
}
server := &httptest.Server{
Listener: listener,
Config: &http.Server{
@ -178,6 +185,8 @@ func TestConfigureDynamically(t *testing.T) {
}
body := string(b)
endpointStats[r.URL.Path] += 1
switch r.URL.Path {
case "/configuration/backends":
{
@ -246,14 +255,67 @@ func TestConfigureDynamically(t *testing.T) {
ControllerPodsCount: 2,
}
err = configureDynamically(commonConfig)
n := &NGINXController{
runningConfig: &ingress.Configuration{},
cfg: &Configuration{},
}
err = n.configureDynamically(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
if commonConfig.Backends[0].Endpoints[0].Target != target {
t.Errorf("unexpected change in the configuration object after configureDynamically invocation")
}
for endpoint, count := range endpointStats {
if count != 1 {
t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 1, count)
}
}
resetEndpointStats()
n.runningConfig.Backends = backends
err = n.configureDynamically(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
for endpoint, count := range endpointStats {
if endpoint == "/configuration/backends" {
if count != 0 {
t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 0, count)
}
} else if count != 1 {
t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 1, count)
}
}
resetEndpointStats()
n.runningConfig.Servers = servers
err = n.configureDynamically(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
if count, _ := endpointStats["/configuration/backends"]; count != 0 {
t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/backends", 0, count)
}
if count, _ := endpointStats["/configuration/servers"]; count != 0 {
t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/servers", 0, count)
}
if count, _ := endpointStats["/configuration/general"]; count != 1 {
t.Errorf("Expected %v to receive %d requests but received %d.", "/configuration/general", 0, count)
}
resetEndpointStats()
n.runningConfig.ControllerPodsCount = commonConfig.ControllerPodsCount
err = n.configureDynamically(commonConfig)
if err != nil {
t.Errorf("unexpected error posting dynamic configuration: %v", err)
}
for endpoint, count := range endpointStats {
if count != 0 {
t.Errorf("Expected %v to receive %d requests but received %d.", endpoint, 0, count)
}
}
}
func TestConfigureCertificates(t *testing.T) {
@ -313,11 +375,7 @@ func TestConfigureCertificates(t *testing.T) {
defer server.Close()
server.Start()
commonConfig := &ingress.Configuration{
Servers: servers,
}
err = configureCertificates(commonConfig)
err = configureCertificates(servers)
if err != nil {
t.Errorf("unexpected error posting dynamic certificate configuration: %v", err)
}