Add tests for controller getEndpoints

This commit is contained in:
Manuel de Brito Fontes 2018-04-21 22:51:58 -03:00
parent 18a6a3051d
commit 777c637cf6
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
3 changed files with 572 additions and 99 deletions

View file

@ -19,8 +19,6 @@ package controller
import (
"fmt"
"math/rand"
"net"
"reflect"
"sort"
"strconv"
"strings"
@ -291,7 +289,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Config{})
endps = getEndpoints(svc, &sp, proto, &healthcheck.Config{}, n.store.GetServiceEndpoints)
break
}
}
@ -302,7 +300,7 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = n.getEndpoints(svc, &sp, proto, &healthcheck.Config{})
endps = getEndpoints(svc, &sp, proto, &healthcheck.Config{}, n.store.GetServiceEndpoints)
break
}
}
@ -347,7 +345,7 @@ func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
return upstream
}
endps := n.getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{})
endps := getEndpoints(svc, &svc.Spec.Ports[0], apiv1.ProtocolTCP, &healthcheck.Config{}, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
endps = []ingress.Endpoint{n.DefaultEndpoint()}
@ -528,7 +526,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
// check if the location contains endpoints and a custom default backend
if location.DefaultBackend != nil {
sp := location.DefaultBackend.Spec.Ports[0]
endps := n.getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Config{})
endps := getEndpoints(location.DefaultBackend, &sp, apiv1.ProtocolTCP, &healthcheck.Config{}, n.store.GetServiceEndpoints)
if len(endps) > 0 {
glog.V(3).Infof("using custom default backend in server %v location %v (service %v/%v)",
server.Hostname, location.Path, location.DefaultBackend.Namespace, location.DefaultBackend.Name)
@ -770,7 +768,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
servicePort.TargetPort.String() == backendPort ||
servicePort.Name == backendPort {
endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
}
@ -804,7 +802,7 @@ func (n *NGINXController) serviceEndpoints(svcKey, backendPort string,
Port: int32(externalPort),
TargetPort: intstr.FromString(backendPort),
}
endps := n.getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz)
endps := getEndpoints(svc, &servicePort, apiv1.ProtocolTCP, hz, n.store.GetServiceEndpoints)
if len(endps) == 0 {
glog.Warningf("service %v does not have any active endpoints", svcKey)
return upstreams, nil
@ -1057,97 +1055,6 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
return servers
}
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func (n *NGINXController) getEndpoints(
s *apiv1.Service,
servicePort *apiv1.ServicePort,
proto apiv1.Protocol,
hz *healthcheck.Config) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
// avoid duplicated upstream servers when the service
// contains multiple port definitions sharing the same
// targetport.
adus := make(map[string]bool)
// ExternalName services
if s.Spec.Type == apiv1.ServiceTypeExternalName {
glog.V(3).Infof("Ingress using a service %v of type=ExternalName : %v", s.Name)
targetPort := servicePort.TargetPort.IntValue()
// check for invalid port value
if targetPort <= 0 {
glog.Errorf("ExternalName service with an invalid port: %v", targetPort)
return upsServers
}
if net.ParseIP(s.Spec.ExternalName) == nil {
_, err := net.LookupHost(s.Spec.ExternalName)
if err != nil {
glog.Errorf("unexpected error resolving host %v: %v", s.Spec.ExternalName, err)
return upsServers
}
}
return append(upsServers, ingress.Endpoint{
Address: s.Spec.ExternalName,
Port: fmt.Sprintf("%v", targetPort),
MaxFails: hz.MaxFails,
FailTimeout: hz.FailTimeout,
})
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, servicePort.String())
ep, err := n.store.GetServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
}
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
if !reflect.DeepEqual(epPort.Protocol, proto) {
continue
}
var targetPort int32
if servicePort.Name == "" {
// ServicePort.Name is optional if there is only one port
targetPort = epPort.Port
} else if servicePort.Name == epPort.Name {
targetPort = epPort.Port
}
// check for invalid port value
if targetPort <= 0 {
continue
}
for _, epAddress := range ss.Addresses {
ep := fmt.Sprintf("%v:%v", epAddress.IP, targetPort)
if _, exists := adus[ep]; exists {
continue
}
ups := ingress.Endpoint{
Address: epAddress.IP,
Port: fmt.Sprintf("%v", targetPort),
MaxFails: hz.MaxFails,
FailTimeout: hz.FailTimeout,
Target: epAddress.TargetRef,
}
upsServers = append(upsServers, ups)
adus[ep] = true
}
}
}
glog.V(3).Infof("endpoints found: %v", upsServers)
return upsServers
}
func (n *NGINXController) isForceReload() bool {
return atomic.LoadInt32(&n.forceReload) != 0
}

View file

@ -0,0 +1,128 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"net"
"reflect"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
)
// getEndpoints returns a list of <endpoint ip>:<port> for a given service/target port combination.
func getEndpoints(
s *corev1.Service,
port *corev1.ServicePort,
proto corev1.Protocol,
hz *healthcheck.Config,
getServiceEndpoints func(*corev1.Service) (*corev1.Endpoints, error),
) []ingress.Endpoint {
upsServers := []ingress.Endpoint{}
if s == nil || port == nil {
return upsServers
}
// avoid duplicated upstream servers when the service
// contains multiple port definitions sharing the same
// targetport.
adus := make(map[string]bool)
// ExternalName services
if s.Spec.Type == corev1.ServiceTypeExternalName {
glog.V(3).Infof("Ingress using a service %v of type=ExternalName : %v", s.Name)
targetPort := port.TargetPort.IntValue()
// check for invalid port value
if targetPort <= 0 {
glog.Errorf("ExternalName service with an invalid port: %v", targetPort)
return upsServers
}
if net.ParseIP(s.Spec.ExternalName) == nil {
_, err := net.LookupHost(s.Spec.ExternalName)
if err != nil {
glog.Errorf("unexpected error resolving host %v: %v", s.Spec.ExternalName, err)
return upsServers
}
}
return append(upsServers, ingress.Endpoint{
Address: s.Spec.ExternalName,
Port: fmt.Sprintf("%v", targetPort),
MaxFails: hz.MaxFails,
FailTimeout: hz.FailTimeout,
})
}
glog.V(3).Infof("getting endpoints for service %v/%v and port %v", s.Namespace, s.Name, port.String())
ep, err := getServiceEndpoints(s)
if err != nil {
glog.Warningf("unexpected error obtaining service endpoints: %v", err)
return upsServers
}
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
if !reflect.DeepEqual(epPort.Protocol, proto) {
continue
}
var targetPort int32
if port.Name == "" {
// port.Name is optional if there is only one port
targetPort = epPort.Port
} else if port.Name == epPort.Name {
targetPort = epPort.Port
}
glog.Infof("TP: %v", targetPort)
// check for invalid port value
if targetPort <= 0 {
continue
}
for _, epAddress := range ss.Addresses {
ep := fmt.Sprintf("%v:%v", epAddress.IP, targetPort)
if _, exists := adus[ep]; exists {
continue
}
ups := ingress.Endpoint{
Address: epAddress.IP,
Port: fmt.Sprintf("%v", targetPort),
MaxFails: hz.MaxFails,
FailTimeout: hz.FailTimeout,
Target: epAddress.TargetRef,
}
upsServers = append(upsServers, ups)
adus[ep] = true
}
}
}
glog.V(3).Infof("endpoints found: %v", upsServers)
return upsServers
}

View file

@ -0,0 +1,438 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"testing"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
)
func TestGetEndpoints(t *testing.T) {
tests := []struct {
name string
svc *corev1.Service
port *corev1.ServicePort
proto corev1.Protocol
hz *healthcheck.Config
fn func(*corev1.Service) (*corev1.Endpoints, error)
result []ingress.Endpoint
}{
{
"no service should return 0 endpoints",
nil,
nil,
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
return nil, nil
},
[]ingress.Endpoint{},
},
{
"no service port should return 0 endpoints",
&corev1.Service{},
nil,
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
return nil, nil
},
[]ingress.Endpoint{},
},
{
"a service without endpoints should return 0 endpoints",
&corev1.Service{},
&corev1.ServicePort{Name: "default"},
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
},
{
"a service type ServiceTypeExternalName service with an invalid port should return 0 endpoints",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeExternalName,
},
},
&corev1.ServicePort{Name: "default"},
corev1.ProtocolTCP,
nil,
func(*corev1.Service) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
},
{
"a service type ServiceTypeExternalName with a valid port should return one endpoint",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeExternalName,
ExternalName: "10.0.0.1.xip.io",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{
{
Address: "10.0.0.1.xip.io",
Port: "80",
MaxFails: 0,
FailTimeout: 0,
},
},
},
{
"a service type ServiceTypeExternalName with an invalid ExternalName value should return one endpoint",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeExternalName,
ExternalName: "foo.bar",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
return &corev1.Endpoints{}, nil
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when there is an error searching for endpoints",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
return nil, fmt.Errorf("unexpected error")
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when the protocol does not match",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.1.1.1",
NodeName: &nodeName,
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolUDP,
},
},
},
},
}, nil
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when there is no ready Addresses",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
{
NotReadyAddresses: []corev1.EndpointAddress{
{
IP: "1.1.1.1",
NodeName: &nodeName,
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolUDP,
},
},
},
},
}, nil
},
[]ingress.Endpoint{},
},
{
"should return no endpoints when the name of the port name do not match any port in the endpoint Subsets",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.1.1.1",
NodeName: &nodeName,
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: int32(80),
Name: "another-name",
},
},
},
},
}, nil
},
[]ingress.Endpoint{},
},
{
"should return one endpoint when the name of the port name match a port in the endpoint Subsets",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromInt(80),
},
},
},
},
&corev1.ServicePort{
Name: "default",
TargetPort: intstr.FromInt(80),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.1.1.1",
NodeName: &nodeName,
},
},
Ports: []corev1.EndpointPort{
{
Protocol: corev1.ProtocolTCP,
Port: int32(80),
Name: "default",
},
},
},
},
}, nil
},
[]ingress.Endpoint{
{
Address: "1.1.1.1",
Port: "80",
MaxFails: 0,
FailTimeout: 0,
},
},
},
{
"should return one endpoint when the name of the port name match more than one port in the endpoint Subsets",
&corev1.Service{
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "1.1.1.1",
Ports: []corev1.ServicePort{
{
Name: "default",
TargetPort: intstr.FromString("port-1"),
},
},
},
},
&corev1.ServicePort{
Name: "port-1",
TargetPort: intstr.FromString("port-1"),
},
corev1.ProtocolTCP,
&healthcheck.Config{
MaxFails: 0,
FailTimeout: 0,
},
func(*corev1.Service) (*corev1.Endpoints, error) {
nodeName := "dummy"
return &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "1.1.1.1",
NodeName: &nodeName,
},
},
Ports: []corev1.EndpointPort{
{
Name: "port-1",
Protocol: corev1.ProtocolTCP,
Port: 80,
},
{
Name: "port-1",
Protocol: corev1.ProtocolTCP,
Port: 80,
},
},
},
},
}, nil
},
[]ingress.Endpoint{
{
Address: "1.1.1.1",
Port: "80",
MaxFails: 0,
FailTimeout: 0,
},
},
},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
result := getEndpoints(testCase.svc, testCase.port, testCase.proto, testCase.hz, testCase.fn)
if len(testCase.result) != len(result) {
t.Errorf("expected %v Endpoints but got %v", testCase.result, len(result))
}
})
}
}