From 5b2a9475dc2260dc15561fd117dc221b892464da Mon Sep 17 00:00:00 2001 From: Tomas Hulata Date: Mon, 16 Jan 2023 03:46:50 +0100 Subject: [PATCH] feat: support topology aware hints (#9165) * support topology aware hints Signed-off-by: tombokombo * add flag to enable topology and fixes Signed-off-by: tombokombo * update readme Signed-off-by: tombokombo * add e2e test Signed-off-by: tombokombo * isolate topology test Signed-off-by: tombokombo * gofmt fix Signed-off-by: tombokombo Signed-off-by: tombokombo --- charts/ingress-nginx/README.md | 1 + charts/ingress-nginx/templates/_params.tpl | 3 + charts/ingress-nginx/values.yaml | 4 + docs/user-guide/cli-arguments.md | 1 + internal/ingress/controller/controller.go | 55 +++- internal/ingress/controller/endpointslices.go | 31 ++- .../ingress/controller/endpointslices_test.go | 261 +++++++++++++++++- internal/k8s/main.go | 20 ++ pkg/flags/flags.go | 3 + test/e2e-image/e2e.sh | 12 +- .../namespace-overlays/topology/values.yaml | 51 ++++ test/e2e/endpointslices/topology.go | 112 ++++++++ test/e2e/framework/deployment.go | 22 +- test/e2e/kind.yaml | 6 + 14 files changed, 564 insertions(+), 18 deletions(-) create mode 100644 test/e2e-image/namespace-overlays/topology/values.yaml create mode 100644 test/e2e/endpointslices/topology.go diff --git a/charts/ingress-nginx/README.md b/charts/ingress-nginx/README.md index 00d85156e..817059af2 100644 --- a/charts/ingress-nginx/README.md +++ b/charts/ingress-nginx/README.md @@ -315,6 +315,7 @@ Kubernetes: `>=1.20.0-0` | controller.dnsPolicy | string | `"ClusterFirst"` | Optionally change this to ClusterFirstWithHostNet in case you have 'hostNetwork: true'. By default, while using host network, name resolution uses the host's DNS. If you wish nginx-controller to keep resolving names inside the k8s network, use ClusterFirstWithHostNet. 
| | controller.electionID | string | `""` | Election ID to use for status update, by default it uses the controller name combined with a suffix of 'leader' | | controller.enableMimalloc | bool | `true` | Enable mimalloc as a drop-in replacement for malloc. # ref: https://github.com/microsoft/mimalloc # | +| controller.enableTopologyAwareRouting | bool | `false` | This configuration enables Topology Aware Routing feature, used together with service annotation service.kubernetes.io/topology-aware-hints="auto" Defaults to false | | controller.existingPsp | string | `""` | Use an existing PSP instead of creating one | | controller.extraArgs | object | `{}` | Additional command line arguments to pass to nginx-ingress-controller E.g. to specify the default SSL certificate you can use | | controller.extraContainers | list | `[]` | Additional containers to be added to the controller pod. See https://github.com/lemonldap-ng-controller/lemonldap-ng-controller as example. | diff --git a/charts/ingress-nginx/templates/_params.tpl b/charts/ingress-nginx/templates/_params.tpl index 66c581fa6..a1aef01ae 100644 --- a/charts/ingress-nginx/templates/_params.tpl +++ b/charts/ingress-nginx/templates/_params.tpl @@ -51,6 +51,9 @@ {{- if .Values.controller.watchIngressWithoutClass }} - --watch-ingress-without-class=true {{- end }} +{{- if .Values.controller.enableTopologyAwareRouting }} +- --enable-topology-aware-routing=true +{{- end }} {{- range $key, $value := .Values.controller.extraArgs }} {{- /* Accept keys without values or with false as value */}} {{- if eq ($value | quote | len) 2 }} diff --git a/charts/ingress-nginx/values.yaml b/charts/ingress-nginx/values.yaml index a7fab0fe9..a6df47694 100644 --- a/charts/ingress-nginx/values.yaml +++ b/charts/ingress-nginx/values.yaml @@ -77,6 +77,10 @@ controller: # -- Process IngressClass per name (additionally as per spec.controller). 
ingressClassByName: false + # -- This configuration enables Topology Aware Routing feature, used together with service annotation service.kubernetes.io/topology-aware-hints="auto" + # Defaults to false + enableTopologyAwareRouting: false + # -- This configuration defines if Ingress Controller should allow users to set # their own *-snippet annotations, otherwise this is forbidden / dropped # when users add those annotations. diff --git a/docs/user-guide/cli-arguments.md b/docs/user-guide/cli-arguments.md index 4379d0b34..febc6f762 100644 --- a/docs/user-guide/cli-arguments.md +++ b/docs/user-guide/cli-arguments.md @@ -24,6 +24,7 @@ They are set in the container spec of the `ingress-nginx-controller` Deployment | `--enable-metrics` | Enables the collection of NGINX metrics. (default true) | | `--enable-ssl-chain-completion` | Autocomplete SSL certificate chains with missing intermediate CA certificates. Certificates uploaded to Kubernetes must have the "Authority Information Access" X.509 v3 extension for this to succeed. (default false)| | `--enable-ssl-passthrough` | Enable SSL Passthrough. (default false) | +| `--enable-topology-aware-routing` | Enable the topology aware routing feature; requires the Service object annotation service.kubernetes.io/topology-aware-hints to be set to auto. (default false) | | `--health-check-path` | URL path of the health check endpoint. Configured inside the NGINX status server. All requests received on the port defined by the healthz-port parameter are forwarded internally to this path. (default "/healthz") | | `--health-check-timeout` | Time limit, in seconds, for a probe to health-check-path to succeed. (default 10) | | `--healthz-port` | Port to use for the healthz endpoint. 
(default 10254) | diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 010eba162..8bacb3025 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -53,6 +53,7 @@ const ( defUpstreamName = "upstream-default-backend" defServerName = "_" rootLocation = "/" + emptyZone = "" ) // Configuration contains all the settings required by an Ingress controller @@ -131,6 +132,21 @@ type Configuration struct { DynamicConfigurationRetries int DisableSyncEvents bool + + EnableTopologyAwareRouting bool +} + +func getIngressPodZone(svc *apiv1.Service) string { + svcKey := k8s.MetaNamespaceKey(svc) + if svcZoneAnnotation, ok := svc.ObjectMeta.GetAnnotations()[apiv1.AnnotationTopologyAwareHints]; ok { + if strings.ToLower(svcZoneAnnotation) == "auto" { + if foundZone, ok := k8s.IngressNodeDetails.GetLabels()[apiv1.LabelTopologyZone]; ok { + klog.V(3).Infof("Svc has topology aware annotation enabled, try to use zone %q where controller pod is running for Service %q ", foundZone, svcKey) + return foundZone + } + } + } + return emptyZone } // GetPublishService returns the Service used to set the load-balancer status of Ingresses. 
@@ -429,6 +445,13 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr var endps []ingress.Endpoint /* #nosec */ targetPort, err := strconv.Atoi(svcPort) // #nosec + var zone string + if n.cfg.EnableTopologyAwareRouting { + zone = getIngressPodZone(svc) + } else { + zone = emptyZone + } + if err != nil { // not a port number, fall back to using port name klog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName) @@ -436,7 +459,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr sp := svc.Spec.Ports[i] if sp.Name == svcPort { if sp.Protocol == proto { - endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices) + endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices) break } } @@ -447,7 +470,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr sp := svc.Spec.Ports[i] if sp.Port == int32(targetPort) { if sp.Protocol == proto { - endps = getEndpointsFromSlices(svc, &sp, proto, n.store.GetServiceEndpointsSlices) + endps = getEndpointsFromSlices(svc, &sp, proto, zone, n.store.GetServiceEndpointsSlices) break } } @@ -498,8 +521,13 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend { upstream.Endpoints = append(upstream.Endpoints, n.DefaultEndpoint()) return upstream } - - endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) + var zone string + if n.cfg.EnableTopologyAwareRouting { + zone = getIngressPodZone(svc) + } else { + zone = emptyZone + } + endps := getEndpointsFromSlices(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint", svcKey) endps = []ingress.Endpoint{n.DefaultEndpoint()} @@ -827,7 +855,13 @@ func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*in } sp := 
location.DefaultBackend.Spec.Ports[0] - endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) + var zone string + if n.cfg.EnableTopologyAwareRouting { + zone = getIngressPodZone(location.DefaultBackend) + } else { + zone = emptyZone + } + endps := getEndpointsFromSlices(location.DefaultBackend, &sp, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) // custom backend is valid only if contains at least one endpoint if len(endps) > 0 { name := fmt.Sprintf("custom-default-backend-%v-%v", location.DefaultBackend.GetNamespace(), location.DefaultBackend.GetName()) @@ -1083,7 +1117,12 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres if err != nil { return upstreams, err } - + var zone string + if n.cfg.EnableTopologyAwareRouting { + zone = getIngressPodZone(svc) + } else { + zone = emptyZone + } klog.V(3).Infof("Obtaining ports information for Service %q", svcKey) // Ingress with an ExternalName Service and no port defined for that Service if svc.Spec.Type == apiv1.ServiceTypeExternalName { @@ -1092,7 +1131,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres return upstreams, nil } servicePort := externalNamePorts(backendPort, svc) - endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(svc, servicePort, apiv1.ProtocolTCP, zone, n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) return upstreams, nil @@ -1109,7 +1148,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string) ([]ingres servicePort.TargetPort.String() == backendPort || servicePort.Name == backendPort { - endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, n.store.GetServiceEndpointsSlices) + endps := getEndpointsFromSlices(svc, &servicePort, apiv1.ProtocolTCP, zone, 
n.store.GetServiceEndpointsSlices) if len(endps) == 0 { klog.Warningf("Service %q does not have any active Endpoint.", svcKey) } diff --git a/internal/ingress/controller/endpointslices.go b/internal/ingress/controller/endpointslices.go index 5a24c3880..34d5266dd 100644 --- a/internal/ingress/controller/endpointslices.go +++ b/internal/ingress/controller/endpointslices.go @@ -35,7 +35,7 @@ import ( ) // getEndpoints returns a list of Endpoint structs for a given service/target port combination. -func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, +func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto corev1.Protocol, zoneForHints string, getServiceEndpointsSlices func(string) ([]*discoveryv1.EndpointSlice, error)) []ingress.Endpoint { upsServers := []ingress.Endpoint{} @@ -49,6 +49,7 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c processedUpstreamServers := make(map[string]struct{}) svcKey := k8s.MetaNamespaceKey(s) + var useTopologyHints bool // ExternalName services if s.Spec.Type == corev1.ServiceTypeExternalName { @@ -111,12 +112,38 @@ func getEndpointsFromSlices(s *corev1.Service, port *corev1.ServicePort, proto c ports = append(ports, targetPort) } } + useTopologyHints = false + if zoneForHints != emptyZone { + useTopologyHints = true + // check if all endpointslices has zone hints + for _, ep := range eps.Endpoints { + if ep.Hints == nil || len(ep.Hints.ForZones) == 0 { + useTopologyHints = false + break + } + } + if useTopologyHints { + klog.V(3).Infof("All endpoint slices has zone hint, using zone %q for Service %q", zoneForHints, svcKey) + } + } + for _, ep := range eps.Endpoints { if !(*ep.Conditions.Ready) { continue } + epHasZone := false + if useTopologyHints { + for _, epzone := range ep.Hints.ForZones { + if epzone.Name == zoneForHints { + epHasZone = true + break + } + } + } - // ep.Hints + if useTopologyHints && !epHasZone { + continue + } for 
_, epPort := range ports { for _, epAddress := range ep.Addresses { diff --git a/internal/ingress/controller/endpointslices_test.go b/internal/ingress/controller/endpointslices_test.go index e404c4949..b61e9a4f3 100644 --- a/internal/ingress/controller/endpointslices_test.go +++ b/internal/ingress/controller/endpointslices_test.go @@ -33,6 +33,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { svc *corev1.Service port *corev1.ServicePort proto corev1.Protocol + zone string fn func(string) ([]*discoveryv1.EndpointSlice, error) result []ingress.Endpoint }{ @@ -41,6 +42,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { nil, nil, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -51,6 +53,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.Service{}, nil, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, nil }, @@ -61,6 +64,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { &corev1.Service{}, &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -75,6 +79,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, &corev1.ServicePort{Name: "default"}, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -99,6 +104,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -123,6 +129,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -147,6 +154,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", 
func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -176,6 +184,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -205,6 +214,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{}, nil }, @@ -229,6 +239,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return nil, fmt.Errorf("unexpected error") }, @@ -253,6 +264,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -296,6 +308,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -339,6 +352,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -382,6 +396,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -430,6 +445,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromInt(80), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return 
[]*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -478,6 +494,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -552,6 +569,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{ { @@ -622,6 +640,7 @@ func TestGetEndpointsFromSlices(t *testing.T) { TargetPort: intstr.FromString("port-1"), }, corev1.ProtocolTCP, + "", func(string) ([]*discoveryv1.EndpointSlice, error) { return []*discoveryv1.EndpointSlice{{ ObjectMeta: metav1.ObjectMeta{ @@ -656,11 +675,251 @@ func TestGetEndpointsFromSlices(t *testing.T) { }, }, }, + { + "should return one endpoint which belongs to zone", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + "eu-west-1b", + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1b", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1a", + }}, + }}[0], + }, + { + Addresses: 
[]string{"1.1.1.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1c", + }}, + }}[0], + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + }, + }, + { + "should return all endpoints because one is missing zone hint", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + "eu-west-1b", + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1b", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1b", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{}}[0], + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: 
"1.1.1.1", + Port: "80", + }, + { + Address: "1.1.1.2", + Port: "80", + }, + { + Address: "1.1.1.3", + Port: "80", + }, + }, + }, + { + "should return all endpoints because no zone from controller node", + &corev1.Service{ + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []corev1.ServicePort{ + { + Name: "default", + TargetPort: intstr.FromString("port-1"), + }, + }, + }, + }, + &corev1.ServicePort{ + Name: "port-1", + TargetPort: intstr.FromString("port-1"), + }, + corev1.ProtocolTCP, + "", + func(string) ([]*discoveryv1.EndpointSlice, error) { + return []*discoveryv1.EndpointSlice{{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{discoveryv1.LabelServiceName: "default"}, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1a", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.2"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1b", + }}, + }}[0], + }, + { + Addresses: []string{"1.1.1.3"}, + Conditions: discoveryv1.EndpointConditions{ + Ready: &[]bool{true}[0], + }, + Hints: &[]discoveryv1.EndpointHints{{ + ForZones: []discoveryv1.ForZone{{ + Name: "eu-west-1c", + }}, + }}[0], + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Protocol: &[]corev1.Protocol{corev1.ProtocolTCP}[0], + Port: &[]int32{80}[0], + Name: &[]string{"port-1"}[0], + }, + }, + }}, nil + }, + []ingress.Endpoint{ + { + Address: "1.1.1.1", + Port: "80", + }, + { + Address: "1.1.1.2", + Port: "80", + }, + { + Address: "1.1.1.3", + Port: "80", + }, + }, + }, } for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { - result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, 
testCase.fn) + result := getEndpointsFromSlices(testCase.svc, testCase.port, testCase.proto, testCase.zone, testCase.fn) if len(testCase.result) != len(result) { t.Errorf("Expected %d Endpoints but got %d", len(testCase.result), len(result)) } diff --git a/internal/k8s/main.go b/internal/k8s/main.go index 5332631a7..e0d2a1660 100644 --- a/internal/k8s/main.go +++ b/internal/k8s/main.go @@ -78,6 +78,8 @@ func GetNodeIPOrName(kubeClient clientset.Interface, name string, useInternalIP var ( // IngressPodDetails hold information about the ingress-nginx pod IngressPodDetails *PodInfo + // IngressNodeDetails holds information about the node running the ingress-nginx pod + IngressNodeDetails *NodeInfo ) // PodInfo contains runtime information about the pod running the Ingres controller @@ -87,6 +89,12 @@ type PodInfo struct { metav1.ObjectMeta } +// NodeInfo contains runtime information about the node running the Ingress controller pod, e.g. the zone where the pod is running +type NodeInfo struct { + metav1.TypeMeta + metav1.ObjectMeta +} + // GetIngressPod load the ingress-nginx pod func GetIngressPod(kubeClient clientset.Interface) error { podName := os.Getenv("POD_NAME") @@ -108,6 +116,18 @@ func GetIngressPod(kubeClient clientset.Interface) error { pod.ObjectMeta.DeepCopyInto(&IngressPodDetails.ObjectMeta) IngressPodDetails.SetLabels(pod.GetLabels()) + IngressNodeDetails = &NodeInfo{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Node"}, + } + // Try to get node info/labels to determine topology zone where pod is running + node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Unable to get NODE information: %v", err) + } else { + node.ObjectMeta.DeepCopyInto(&IngressNodeDetails.ObjectMeta) + IngressNodeDetails.SetLabels(node.GetLabels()) + } + return nil } diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 6e183289c..911ab775c 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ 
-216,6 +216,8 @@ Takes the form ":port". If not provided, no admission controller is starte dynamicConfigurationRetries = flags.Int("dynamic-configuration-retries", 15, "Number of times to retry failed dynamic configuration before failing to sync an ingress.") disableSyncEvents = flags.Bool("disable-sync-events", false, "Disables the creation of 'Sync' event resources") + + enableTopologyAwareRouting = flags.Bool("enable-topology-aware-routing", false, "Enable topology aware hints feature, needs service object annotation service.kubernetes.io/topology-aware-hints sets to auto.") ) flags.StringVar(&nginx.MaxmindMirror, "maxmind-mirror", "", `Maxmind mirror url (example: http://geoip.local/databases.`) @@ -348,6 +350,7 @@ https://blog.maxmind.com/2019/12/18/significant-changes-to-accessing-and-using-g SyncRateLimit: *syncRateLimit, HealthCheckHost: *healthzHost, DynamicConfigurationRetries: *dynamicConfigurationRetries, + EnableTopologyAwareRouting: *enableTopologyAwareRouting, ListenPorts: &ngx_config.ListenPorts{ Default: *defServerPort, Health: *healthzPort, diff --git a/test/e2e-image/e2e.sh b/test/e2e-image/e2e.sh index 7b4d56f10..ed13926fb 100755 --- a/test/e2e-image/e2e.sh +++ b/test/e2e-image/e2e.sh @@ -41,12 +41,22 @@ reportFileNamePrefix="report-e2e-test-suite" echo -e "${BGREEN}Running e2e test suite (FOCUS=${FOCUS})...${NC}" ginkgo "${ginkgo_args[@]}" \ -focus="${FOCUS}" \ - -skip="\[Serial\]|\[MemoryLeak\]" \ + -skip="\[Serial\]|\[MemoryLeak\]|\[TopologyHints\]" \ -nodes="${E2E_NODES}" \ --junit-report=$reportFileNamePrefix.xml \ /e2e.test # Create configMap out of a compressed report file for extraction later +# Must be isolated, there is a collision if multiple helms tries to install same clusterRole at same time +echo -e "${BGREEN}Running e2e test for topology aware hints...${NC}" +ginkgo "${ginkgo_args[@]}" \ + -focus="\[TopologyHints\]" \ + -skip="\[Serial\]|\[MemoryLeak\]]" \ + -nodes="${E2E_NODES}" \ + 
--junit-report=$reportFileNamePrefix-topology.xml \ + /e2e.test +# Create configMap out of a compressed report file for extraction later + echo -e "${BGREEN}Running e2e test suite with tests that require serial execution...${NC}" ginkgo "${ginkgo_args[@]}" \ -focus="\[Serial\]" \ diff --git a/test/e2e-image/namespace-overlays/topology/values.yaml b/test/e2e-image/namespace-overlays/topology/values.yaml new file mode 100644 index 000000000..28b1cad19 --- /dev/null +++ b/test/e2e-image/namespace-overlays/topology/values.yaml @@ -0,0 +1,51 @@ +# TODO: remove the need to use fullnameOverride +fullnameOverride: nginx-ingress +controller: + image: + repository: ingress-controller/controller + chroot: true + tag: 1.0.0-dev + digest: + digestChroot: + scope: + enabled: false + config: + worker-processes: "1" + readinessProbe: + initialDelaySeconds: 3 + periodSeconds: 1 + livenessProbe: + initialDelaySeconds: 3 + periodSeconds: 1 + service: + type: NodePort + electionID: ingress-controller-leader + ingressClassResource: + # We will create and remove each IC/ClusterRole/ClusterRoleBinding per test so there's no conflict + enabled: false + extraArgs: + tcp-services-configmap: $NAMESPACE/tcp-services + # e2e tests do not require information about ingress status + update-status: "false" + terminationGracePeriodSeconds: 1 + admissionWebhooks: + enabled: false + + enableTopologyAwareRouting: true + + # ulimit -c unlimited + # mkdir -p /tmp/coredump + # chmod a+rwx /tmp/coredump + # echo "/tmp/coredump/core.%e.%p.%h.%t" > /proc/sys/kernel/core_pattern + extraVolumeMounts: + - name: coredump + mountPath: /tmp/coredump + + extraVolumes: + - name: coredump + hostPath: + path: /tmp/coredump + +rbac: + create: true + scope: false diff --git a/test/e2e/endpointslices/topology.go b/test/e2e/endpointslices/topology.go new file mode 100644 index 000000000..ce913e966 --- /dev/null +++ b/test/e2e/endpointslices/topology.go @@ -0,0 +1,112 @@ +/* +Copyright 2022 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpointslices + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os/exec" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + + "k8s.io/ingress-nginx/internal/nginx" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +var _ = framework.IngressNginxDescribe("[TopologyHints] topology aware routing", func() { + f := framework.NewDefaultFramework("topology") + host := "topology-svc.foo.com" + + ginkgo.BeforeEach(func() { + f.NewEchoDeployment(framework.WithDeploymentReplicas(2), framework.WithSvcTopologyAnnotations()) + }) + + ginkgo.AfterEach(func() { + // we need to uninstall chart because of clusterRole which is not destroyed with namespace + err := uninstallChart(f) + assert.Nil(ginkgo.GinkgoT(), err, "uninstalling helm chart") + }) + + ginkgo.It("should return 200 when service has topology hints", func() { + + annotations := make(map[string]string) + ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations) + f.EnsureIngress(ing) + + f.WaitForNginxServer(host, func(server string) bool { + return strings.Contains(server, fmt.Sprintf("server_name %s", host)) + }) + + ginkgo.By("checking if the service is reached") + f.HTTPTestClient(). + GET("/"). + WithHeader("Host", host). + Expect(). 
+ Status(http.StatusOK) + + slices, err := f.KubeClientSet.DiscoveryV1().EndpointSlices(f.Namespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: "kubernetes.io/service-name=echo", + Limit: 1, + }) + assert.Nil(ginkgo.GinkgoT(), err) + + // check if we have hints, really depends on k8s endpoint slice controller + gotHints := true + for _, ep := range slices.Items[0].Endpoints { + if ep.Hints == nil || len(ep.Hints.ForZones) == 0 { + gotHints = false + break + } + } + + curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort) + status, err := f.ExecIngressPod(curlCmd) + assert.Nil(ginkgo.GinkgoT(), err) + var backends []map[string]interface{} + json.Unmarshal([]byte(status), &backends) + gotBackends := 0 + for _, bck := range backends { + if strings.Contains(bck["name"].(string), "topology") { + gotBackends = len(bck["endpoints"].([]interface{})) + } + } + + if gotHints { + // we have 2 replicas; if there is just one backend, it means we are routing according to the slice hints, to the same zone the controller is in + assert.Equal(ginkgo.GinkgoT(), 1, gotBackends) + } else { + // two replicas should have two endpoints without topology hints + assert.Equal(ginkgo.GinkgoT(), 2, gotBackends) + } + }) +}) + +func uninstallChart(f *framework.Framework) error { + cmd := exec.Command("helm", "uninstall", "--namespace", f.Namespace, "nginx-ingress") + _, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("unexpected error uninstalling ingress-nginx release: %v", err) + } + + return nil +} diff --git a/test/e2e/framework/deployment.go b/test/e2e/framework/deployment.go index 3cfe8f360..8115fd12e 100644 --- a/test/e2e/framework/deployment.go +++ b/test/e2e/framework/deployment.go @@ -40,10 +40,10 @@ const SlowEchoService = "slow-echo" const HTTPBinService = "httpbin" type deploymentOptions struct { - namespace string - name string - replicas int - image string + namespace string + name string + replicas int + 
svcAnnotations map[string]string } // WithDeploymentNamespace allows configuring the deployment's namespace @@ -53,6 +53,15 @@ func WithDeploymentNamespace(n string) func(*deploymentOptions) { } } +// WithSvcTopologyAnnotations create svc with topology aware hints sets to auto +func WithSvcTopologyAnnotations() func(*deploymentOptions) { + return func(o *deploymentOptions) { + o.svcAnnotations = map[string]string{ + "service.kubernetes.io/topology-aware-hints": "auto", + } + } +} + // WithDeploymentName allows configuring the deployment's names func WithDeploymentName(n string) func(*deploymentOptions) { return func(o *deploymentOptions) { @@ -95,8 +104,9 @@ func (f *Framework) NewEchoDeployment(opts ...func(*deploymentOptions)) { service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: options.name, - Namespace: options.namespace, + Name: options.name, + Namespace: options.namespace, + Annotations: options.svcAnnotations, }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ diff --git a/test/e2e/kind.yaml b/test/e2e/kind.yaml index 97dc7082d..07a56dae8 100644 --- a/test/e2e/kind.yaml +++ b/test/e2e/kind.yaml @@ -2,8 +2,14 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 nodes: - role: control-plane + labels: + topology.kubernetes.io/zone: zone-1 - role: worker + labels: + topology.kubernetes.io/zone: zone-1 - role: worker + labels: + topology.kubernetes.io/zone: zone-2 kubeadmConfigPatches: - | kind: ClusterConfiguration