Allow the usage of Services as Upstream on a global level (#7469)

It is possible to change this behavior on an ingress level, which works
well when you only have a few of them. When running several dozen
ingress and with a high change rate of running pods it makes it easier
to define this configuration on a global level.

This change is completely backwards compatible, only adding the
possibility of defining a new key in the configmap.
This commit is contained in:
Renan Gonçalves 2021-09-07 21:47:15 +02:00 committed by GitHub
parent 82e1fc8cac
commit 48601bcd0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 209 additions and 6 deletions

View file

@ -203,6 +203,7 @@ The following table shows a configuration option's name, type, and the default v
|[global-rate-limit-memcached-max-idle-timeout](#global-rate-limit)|int|10000|
|[global-rate-limit-memcached-pool-size](#global-rate-limit)|int|50|
|[global-rate-limit-status-code](#global-rate-limit)|int|429|
|[service-upstream](#service-upstream)|bool|"false"|
## add-headers
@ -1224,3 +1225,8 @@ Configure `memcached` client for [Global Rate Limiting](https://github.com/kuber
These settings get used by [lua-resty-global-throttle](https://github.com/ElvinEfendi/lua-resty-global-throttle)
that ingress-nginx includes. Refer to the link to learn more about `lua-resty-global-throttle`.
## service-upstream
Set if the service's Cluster IP and port should be used instead of a list of all endpoints. This can be overwritten by an annotation on an Ingress rule.
_**default:**_ "false"

View file

@ -20,6 +20,7 @@ import (
networking "k8s.io/api/networking/v1"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
@ -33,5 +34,13 @@ func NewParser(r resolver.Resolver) parser.IngressAnnotation {
}
func (s serviceUpstream) Parse(ing *networking.Ingress) (interface{}, error) {
return parser.GetBoolAnnotation("service-upstream", ing)
defBackend := s.r.GetDefaultBackend()
val, err := parser.GetBoolAnnotation("service-upstream", ing)
// A missing annotation is not a problem, just use the default
if err == errors.ErrMissingAnnotations {
return defBackend.ServiceUpstream, nil
}
return val, nil
}

View file

@ -23,6 +23,7 @@ import (
networking "k8s.io/api/networking/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
@ -119,3 +120,52 @@ func TestIngressAnnotationServiceUpstreamSetFalse(t *testing.T) {
t.Errorf("expected annotation value to be false, got true")
}
}
type mockBackend struct {
resolver.Mock
}
// GetDefaultBackend returns the backend that must be used as default
func (m mockBackend) GetDefaultBackend() defaults.Backend {
return defaults.Backend{
ServiceUpstream: true,
}
}
// Test that when we have a default configuration set on the Backend that is used
// when we don't have the annotation
func TestParseAnnotationsWithDefaultConfig(t *testing.T) {
ing := buildIngress()
val, _ := NewParser(mockBackend{}).Parse(ing)
enabled, ok := val.(bool)
if !ok {
t.Errorf("expected a bool type")
}
if !enabled {
t.Errorf("expected annotation value to be true, got false")
}
}
// Test that the annotation will disable the service upstream when enabled
// in the default configuration
func TestParseAnnotationsOverridesDefaultConfig(t *testing.T) {
ing := buildIngress()
data := map[string]string{}
data[parser.GetAnnotationWithPrefix("service-upstream")] = "false"
ing.SetAnnotations(data)
val, _ := NewParser(mockBackend{}).Parse(ing)
enabled, ok := val.(bool)
if !ok {
t.Errorf("expected a bool type")
}
if enabled {
t.Errorf("expected annotation value to be false, got true")
}
}

View file

@ -860,6 +860,7 @@ func NewDefault() Configuration {
ProxyBuffering: "off",
ProxyHTTPVersion: "1.1",
ProxyMaxTempFileSize: "1024m",
ServiceUpstream: false,
},
UpstreamKeepaliveConnections: 320,
UpstreamKeepaliveTimeout: 60,

View file

@ -161,4 +161,8 @@ type Backend struct {
// Sets the maximum temp file size when proxy-buffers capacity is exceeded.
// http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_max_temp_file_size
ProxyMaxTempFileSize string `json:"proxy-max-temp-file-size"`
// By default, the NGINX ingress controller uses a list of all endpoints (Pod IP/port) in the NGINX upstream configuration.
// It disables that behavior and instead uses a single upstream in NGINX, the service's Cluster IP and port.
ServiceUpstream bool `json:"service-upstream"`
}

View file

@ -0,0 +1,128 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package annotations
import (
"fmt"
"net/http"
"strings"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"
"k8s.io/ingress-nginx/test/e2e/framework"
"k8s.io/ingress-nginx/internal/nginx"
)
var _ = framework.DescribeAnnotation("service-upstream", func() {
f := framework.NewDefaultFramework("serviceupstream")
host := "serviceupstream"
ginkgo.BeforeEach(func() {
f.NewEchoDeployment()
})
ginkgo.Context("when using the default value (false) and enabling in the annotations", func() {
ginkgo.It("should use the Service Cluster IP and Port ", func() {
annotations := map[string]string{
"nginx.ingress.kubernetes.io/service-upstream": "true",
}
ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations)
f.EnsureIngress(ing)
f.WaitForNginxServer(host, func(server string) bool {
return strings.Contains(server, fmt.Sprintf("server_name %s", host))
})
ginkgo.By("checking if the service is reached")
f.HTTPTestClient().
GET("/").
WithHeader("Host", host).
Expect().
Status(http.StatusOK)
ginkgo.By("checking if the Service Cluster IP and Port are used")
s := f.GetService(f.Namespace, framework.EchoService)
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
output, err := f.ExecIngressPod(curlCmd)
assert.Nil(ginkgo.GinkgoT(), err)
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
})
})
ginkgo.Context("when enabling in the configmap", func() {
ginkgo.It("should use the Service Cluster IP and Port ", func() {
annotations := map[string]string{}
f.UpdateNginxConfigMapData("service-upstream", "true")
ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations)
f.EnsureIngress(ing)
f.WaitForNginxServer(host, func(server string) bool {
return strings.Contains(server, fmt.Sprintf("server_name %s", host))
})
ginkgo.By("checking if the service is reached")
f.HTTPTestClient().
GET("/").
WithHeader("Host", host).
Expect().
Status(http.StatusOK)
ginkgo.By("checking if the Service Cluster IP and Port are used")
s := f.GetService(f.Namespace, framework.EchoService)
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
output, err := f.ExecIngressPod(curlCmd)
assert.Nil(ginkgo.GinkgoT(), err)
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
})
})
ginkgo.Context("when enabling in the configmap and disabling in the annotations", func() {
ginkgo.It("should not use the Service Cluster IP and Port", func() {
annotations := map[string]string{
"nginx.ingress.kubernetes.io/service-upstream": "false",
}
f.UpdateNginxConfigMapData("service-upstream", "true")
ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations)
f.EnsureIngress(ing)
f.WaitForNginxServer(host, func(server string) bool {
return strings.Contains(server, fmt.Sprintf("server_name %s", host))
})
ginkgo.By("checking if the service is reached")
f.HTTPTestClient().
GET("/").
WithHeader("Host", host).
Expect().
Status(http.StatusOK)
ginkgo.By("checking if the Service Cluster IP and Port are not used")
s := f.GetService(f.Namespace, framework.EchoService)
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
output, err := f.ExecIngressPod(curlCmd)
assert.Nil(ginkgo.GinkgoT(), err)
assert.NotContains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
})
})
})

View file

@ -102,16 +102,21 @@ func (f *Framework) UpdateIngress(ingress *networking.Ingress) *networking.Ingre
return ing
}
// GetService gets a Service object from the given namespace, name and returns it, throws error if it does not exist.
func (f *Framework) GetService(namespace string, name string) *core.Service {
s, err := f.KubeClientSet.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
assert.Nil(ginkgo.GinkgoT(), err, "getting service")
assert.NotNil(ginkgo.GinkgoT(), s, "expected a service but none returned")
return s
}
// EnsureService creates a Service object and returns it, throws error if it already exists.
func (f *Framework) EnsureService(service *core.Service) *core.Service {
err := createServiceWithRetries(f.KubeClientSet, f.Namespace, service)
assert.Nil(ginkgo.GinkgoT(), err, "creating service")
s, err := f.KubeClientSet.CoreV1().Services(f.Namespace).Get(context.TODO(), service.Name, metav1.GetOptions{})
assert.Nil(ginkgo.GinkgoT(), err, "getting service")
assert.NotNil(ginkgo.GinkgoT(), s, "expected a service but none returned")
return s
return f.GetService(f.Namespace, service.Name)
}
// EnsureDeployment creates a Deployment object and returns it, throws error if it already exists.