diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 4f84a74e8..391c4eea6 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -64,7 +64,8 @@ import ( ) const ( - ngxHealthPath = "/healthz" + ngxHealthPath = "/healthz" + nginxStreamSocket = "/tmp/ingress-stream.sock" ) var ( @@ -794,13 +795,6 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif return err } - streamSocket := "/tmp/ingress-stream.sock" - conn, err := net.Dial("unix", streamSocket) - if err != nil { - return err - } - defer conn.Close() - streams := make([]ingress.Backend, 0) for _, ep := range pcfg.TCPEndpoints { key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) @@ -819,6 +813,28 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif }) } + err = updateStreamConfiguration(streams) + if err != nil { + return err + } + + if isDynamicCertificatesEnabled { + err = configureCertificates(pcfg, port) + if err != nil { + return err + } + } + + return nil +} + +func updateStreamConfiguration(streams []ingress.Backend) error { + conn, err := net.Dial("unix", nginxStreamSocket) + if err != nil { + return err + } + defer conn.Close() + buf, err := json.Marshal(streams) if err != nil { return err @@ -832,14 +848,6 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif if err != nil { return err } - defer conn.Close() - - if isDynamicCertificatesEnabled { - err = configureCertificates(pcfg, port) - if err != nil { - return err - } - } return nil } diff --git a/internal/ingress/controller/nginx_test.go b/internal/ingress/controller/nginx_test.go index e2a815c85..b18cb1a92 100644 --- a/internal/ingress/controller/nginx_test.go +++ b/internal/ingress/controller/nginx_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" jsoniter "github.com/json-iterator/go" apiv1 "k8s.io/api/core/v1" @@ -146,7 +147,33 @@ func TestIsDynamicConfigurationEnough(t *testing.T) { } } +func mockUnixSocket(t *testing.T) net.Listener { + l, err := net.Listen("unix", nginxStreamSocket) + if err != nil { + t.Fatalf("unexpected error creating unix socket: %v", err) + } + if l == nil { + t.Fatalf("expected a listener but none returned") + } + + go func() { + for { + conn, err := l.Accept() + if err != nil { + continue + } + + time.Sleep(100 * time.Millisecond) + defer conn.Close() + } + }() + + return l +} func TestConfigureDynamically(t *testing.T) { + l := mockUnixSocket(t) + defer l.Close() + target := &apiv1.ObjectReference{} backends := []*ingress.Backend{{ diff --git a/internal/ingress/types_equals.go b/internal/ingress/types_equals.go index 64cf268ff..5b1808c92 100644 --- a/internal/ingress/types_equals.go +++ b/internal/ingress/types_equals.go @@ -68,6 +68,7 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool { return false } } + if len(c1.UDPEndpoints) != len(c2.UDPEndpoints) { return false } @@ -83,10 +84,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool { return false } } + if len(c1.PassthroughBackends) != len(c2.PassthroughBackends) { return false } - for _, ptb1 := range c1.PassthroughBackends { found := false for _, ptb2 := range c2.PassthroughBackends { diff --git a/rootfs/etc/nginx/lua/tcp_udp_balancer.lua b/rootfs/etc/nginx/lua/tcp_udp_balancer.lua index 2c09224c0..75d29c5bd 100644 --- a/rootfs/etc/nginx/lua/tcp_udp_balancer.lua +++ b/rootfs/etc/nginx/lua/tcp_udp_balancer.lua @@ -148,14 +148,7 @@ function _M.balance() return end - local ctx = ngx.ctx - if not ctx.has_run then - ctx.has_run = true - local _, err = ngx_balancer.set_more_tries(1) - if err then - ngx.log(ngx.ERR, "failed to set more tries: ", err) - end - end + ngx_balancer.set_more_tries(1) local ok, err = ngx_balancer.set_current_peer(peer) if not ok then diff --git a/test/e2e/tcpudp/tcp.go b/test/e2e/tcpudp/tcp.go index f5f481c8f..5391d0488 100644 --- a/test/e2e/tcpudp/tcp.go +++ b/test/e2e/tcpudp/tcp.go @@ -91,7 +91,7 @@ var _ = framework.IngressNginxDescribe("TCP Feature", func() { resp, _, errs := gorequest.New(). Get(fmt.Sprintf("http://%v:%v", ip, port)). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(len(errs)).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(200)) }) })