Unittests

This commit is contained in:
Prashanth Balasubramanian 2016-05-29 16:05:38 -07:00
parent f84ca54831
commit 22c6e5ddd7
14 changed files with 340 additions and 49 deletions

View file

@ -29,10 +29,12 @@ import (
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
namer := &utils.Namer{}
nodePool := instances.NewNodePool(fakeIGs, "default-zone")
nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
healthChecks.Init(&healthchecks.FakeHealthCheckGetter{nil})
return NewBackendPool(
f,
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
}
func TestBackendPoolAdd(t *testing.T) {

View file

@ -84,6 +84,7 @@ type ClusterManager struct {
healthCheckers []healthchecks.HealthChecker
}
// Init initializes the cluster manager.
func (c *ClusterManager) Init(tr *GCETranslator) {
c.instancePool.Init(tr)
for _, h := range c.healthCheckers {

View file

@ -55,7 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s
if err != nil {
t.Fatalf("%v", err)
}
lb.hasSynced = func() { return true }
lb.hasSynced = func() bool { return true }
return lb
}

View file

@ -49,8 +49,13 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := &utils.Namer{clusterName}
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
backendPool := backends.NewBackendPool(
fakeBackends,
healthChecker, nodePool, namer, []int64{}, false)

View file

@ -0,0 +1,174 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
)
func TestZoneListing(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1"},
"zone-2": {"n2"},
}
addNodes(lbc, zoneToNode)
zones, err := lbc.tr.ListZones()
if err != nil {
t.Errorf("Failed to list zones: %v", err)
}
for expectedZone := range zoneToNode {
found := false
for _, gotZone := range zones {
if gotZone == expectedZone {
found = true
}
}
if !found {
t.Fatalf("Expected zones %v; Got zones %v", zoneToNode, zones)
}
}
}
func TestInstancesAddedToZones(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1", "n2"},
"zone-2": {"n3"},
}
addNodes(lbc, zoneToNode)
// Create 2 igs, one per zone.
testIG := "test-ig"
testPort := int64(3001)
lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, testPort)
// node pool syncs kube-nodes, this will add them to both igs.
lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"})
gotZonesToNode := cm.fakeIGs.GetInstancesByZone()
i := 0
for z, nodeNames := range zoneToNode {
if ig, err := cm.fakeIGs.GetInstanceGroup(testIG, z); err != nil {
t.Errorf("Failed to find ig %v in zone %v, found %+v: %v", testIG, z, ig, err)
}
if cm.fakeIGs.Ports[i] != testPort {
t.Errorf("Expected the same node port on all igs, got ports %+v", cm.fakeIGs.Ports)
}
expNodes := sets.NewString(nodeNames...)
gotNodes := sets.NewString(gotZonesToNode[z]...)
if !gotNodes.Equal(expNodes) {
t.Errorf("Nodes not added to zones, expected %+v got %+v", expNodes, gotNodes)
}
i++
}
}
func TestProbeGetter(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
nodePortToHealthCheck := map[int64]string{
3001: "/healthz",
3002: "/foo",
}
addPods(lbc, nodePortToHealthCheck)
for p, exp := range nodePortToHealthCheck {
got, err := lbc.tr.HealthCheck(p)
if err != nil {
t.Errorf("Failed to get health check for node port %v: %v", p, err)
} else if got.RequestPath != exp {
t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
}
}
}
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string) {
for np, u := range nodePortToHealthCheck {
l := map[string]string{fmt.Sprintf("app-%d", np): "test"}
svc := &api.Service{
Spec: api.ServiceSpec{
Selector: l,
Ports: []api.ServicePort{
{
NodePort: int32(np),
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
}
svc.Name = fmt.Sprintf("%d", np)
lbc.svcLister.Store.Add(svc)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: l,
Name: fmt.Sprintf("%d", np),
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Ports: []api.ContainerPort{{ContainerPort: 80}},
ReadinessProbe: &api.Probe{
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: u,
Port: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
}
lbc.podLister.Indexer.Add(pod)
}
}
func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
for zone, nodes := range zoneToNode {
for _, node := range nodes {
n := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: node,
Labels: map[string]string{
zoneKey: zone,
},
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionTrue},
},
},
}
lbc.nodeLister.Store.Add(n)
}
}
lbc.CloudClusterManager.instancePool.Init(lbc.tr)
}

View file

@ -18,6 +18,7 @@ package controller
import (
"fmt"
"sort"
"strconv"
"time"
@ -376,6 +377,10 @@ func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntO
if err != nil {
return nil, err
}
// If multiple endpoints have different health checks, take the first
sort.Sort(PodsByCreationTimestamp(pl))
for _, pod := range pl {
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
for _, c := range pod.Spec.Containers {
@ -387,10 +392,9 @@ func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntO
if isPortEqual(cPort, targetPort) {
if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, targetPort) {
return c.ReadinessProbe, nil
} else {
glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)",
logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port)
}
glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)",
logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port)
}
}
}
@ -437,19 +441,18 @@ func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error
}
}
}
return &compute.HttpHealthCheck{
Port: port,
// Empty string is used as a signal to the caller to use the appropriate
// default.
RequestPath: "",
Description: "Default kubernetes L7 Loadbalancing health check.",
// How often to health check.
CheckIntervalSec: 1,
// How long to wait before claiming failure of a health check.
TimeoutSec: 1,
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: 1,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: 10,
}, nil
return utils.DefaultHealthCheckTemplate(port), nil
}
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
type PodsByCreationTimestamp []*api.Pod
func (o PodsByCreationTimestamp) Len() int { return len(o) }
func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o PodsByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

View file

@ -20,6 +20,7 @@ import (
"fmt"
compute "google.golang.org/api/compute/v1"
"k8s.io/contrib/ingress/controllers/gce/utils"
)
// NewFakeHealthChecks returns a new FakeHealthChecks.
@ -27,6 +28,20 @@ func NewFakeHealthChecks() *FakeHealthChecks {
return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
}
// FakeHealthCheckGetter implements the healthCheckGetter interface for tests.
type FakeHealthCheckGetter struct {
DefaultHealthCheck *compute.HttpHealthCheck
}
// HealthCheck returns the health check for the given port. If a health check
// isn't stored under the DefaultHealthCheck member, it constructs one.
func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
if h.DefaultHealthCheck == nil {
return utils.DefaultHealthCheckTemplate(port), nil
}
return h.DefaultHealthCheck, nil
}
// FakeHealthChecks fakes out health checks.
type FakeHealthChecks struct {
hc []*compute.HttpHealthCheck
@ -66,4 +81,21 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
return nil
}
func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { return nil }
// UpdateHttpHealthCheck sends the given health check as an update.
func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
healthChecks := []*compute.HttpHealthCheck{}
found := false
for _, h := range f.hc {
if h.Name == name {
healthChecks = append(healthChecks, hc)
found = true
} else {
healthChecks = append(healthChecks, h)
}
}
if !found {
return fmt.Errorf("Cannot update a non-existent health check %v", hc.Name)
}
f.hc = healthChecks
return nil
}

View file

@ -32,11 +32,6 @@ type HealthChecks struct {
healthCheckGetter
}
type healthCheckGetter interface {
// HealthCheck returns the HTTP readiness check for a node port.
HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error)
}
// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.

View file

@ -20,6 +20,12 @@ import (
compute "google.golang.org/api/compute/v1"
)
// healthCheckGetter retrieves health checks.
type healthCheckGetter interface {
// HealthCheck returns the HTTP readiness check for a node port.
HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error)
}
// SingleHealthCheck is an interface to manage a single GCE health check.
type SingleHealthCheck interface {
CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error

View file

@ -27,30 +27,50 @@ import (
// NewFakeInstanceGroups creates a new FakeInstanceGroups.
func NewFakeInstanceGroups(nodes sets.String) *FakeInstanceGroups {
return &FakeInstanceGroups{
instances: nodes,
listResult: getInstanceList(nodes),
namer: utils.Namer{},
instances: nodes,
listResult: getInstanceList(nodes),
namer: utils.Namer{},
zonesToInstances: map[string][]string{},
}
}
// InstanceGroup fakes
// FakeZoneLister records zones for nodes.
type FakeZoneLister struct {
Zones []string
}
// ListZones returns the list of zones.
func (z *FakeZoneLister) ListZones() ([]string, error) {
return z.Zones, nil
}
// GetZoneForNode returns the only zone stored in the fake zone lister.
func (z *FakeZoneLister) GetZoneForNode(name string) (string, error) {
// TODO: evolve as required, it's currently needed just to satisfy the
// interface in unittests that don't care about zones. See unittests in
// controller/util_test for actual zoneLister testing.
return z.Zones[0], nil
}
// FakeInstanceGroups fakes out the instance groups api.
type FakeInstanceGroups struct {
instances sets.String
instanceGroups []*compute.InstanceGroup
Ports []int64
getResult *compute.InstanceGroup
listResult *compute.InstanceGroupsListInstances
calls []int
namer utils.Namer
instances sets.String
instanceGroups []*compute.InstanceGroup
Ports []int64
getResult *compute.InstanceGroup
listResult *compute.InstanceGroupsListInstances
calls []int
namer utils.Namer
zonesToInstances map[string][]string
}
// GetInstanceGroup fakes getting an instance group from the cloud.
func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.InstanceGroup, error) {
f.calls = append(f.calls, utils.Get)
for _, ig := range f.instanceGroups {
if ig.Name == name {
if ig.Name == name && ig.Zone == zone {
return ig, nil
}
}
@ -60,7 +80,7 @@ func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.Insta
// CreateInstanceGroup fakes instance group creation.
func (f *FakeInstanceGroups) CreateInstanceGroup(name, zone string) (*compute.InstanceGroup, error) {
newGroup := &compute.InstanceGroup{Name: name, SelfLink: name}
newGroup := &compute.InstanceGroup{Name: name, SelfLink: name, Zone: zone}
f.instanceGroups = append(f.instanceGroups, newGroup)
return newGroup, nil
}
@ -92,13 +112,35 @@ func (f *FakeInstanceGroups) ListInstancesInInstanceGroup(name, zone string, sta
func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, instanceNames []string) error {
f.calls = append(f.calls, utils.AddInstances)
f.instances.Insert(instanceNames...)
if _, ok := f.zonesToInstances[zone]; !ok {
f.zonesToInstances[zone] = []string{}
}
f.zonesToInstances[zone] = append(f.zonesToInstances[zone], instanceNames...)
return nil
}
// GetInstancesByZone returns the zone to instances map.
func (f *FakeInstanceGroups) GetInstancesByZone() map[string][]string {
return f.zonesToInstances
}
// RemoveInstancesFromInstanceGroup fakes removing instances from an instance group.
func (f *FakeInstanceGroups) RemoveInstancesFromInstanceGroup(name, zone string, instanceNames []string) error {
f.calls = append(f.calls, utils.RemoveInstances)
f.instances.Delete(instanceNames...)
l, ok := f.zonesToInstances[zone]
if !ok {
return nil
}
newIns := []string{}
delIns := sets.NewString(instanceNames...)
for _, oldIns := range l {
if delIns.Has(oldIns) {
continue
}
newIns = append(newIns, oldIns)
}
f.zonesToInstances[zone] = newIns
return nil
}

View file

@ -50,6 +50,9 @@ func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool {
return &Instances{cloud, storage.NewInMemoryPool(), nil}
}
// Init initializes the instance pool. The given zoneLister is used to list
// all zones that require an instance group, and to lookup which zone a
// given Kubernetes node is in so we can add it to the right instance group.
func (i *Instances) Init(zl zoneLister) {
i.zoneLister = zl
}
@ -191,7 +194,7 @@ func (i *Instances) Remove(groupName string, names []string) error {
// Sync syncs kubernetes instances with the instances in the instance group.
func (i *Instances) Sync(nodes []string) (err error) {
glog.V(1).Infof("Syncing nodes %v", nodes)
glog.V(4).Infof("Syncing nodes %v", nodes)
defer func() {
// The node pool is only responsible for syncing nodes to instance
@ -207,9 +210,9 @@ func (i *Instances) Sync(nodes []string) (err error) {
}()
pool := i.snapshotter.Snapshot()
for name := range pool {
for igName := range pool {
gceNodes := sets.NewString()
gceNodes, err = i.list(name)
gceNodes, err = i.list(igName)
if err != nil {
return err
}
@ -223,14 +226,14 @@ func (i *Instances) Sync(nodes []string) (err error) {
addNodes := kubeNodes.Difference(gceNodes).List()
if len(removeNodes) != 0 {
if err = i.Remove(
name, gceNodes.Difference(kubeNodes).List()); err != nil {
igName, gceNodes.Difference(kubeNodes).List()); err != nil {
return err
}
}
if len(addNodes) != 0 {
if err = i.Add(
name, kubeNodes.Difference(gceNodes).List()); err != nil {
igName, kubeNodes.Difference(gceNodes).List()); err != nil {
return err
}
}

View file

@ -24,10 +24,16 @@ import (
const defaultZone = "default-zone"
func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
pool := NewNodePool(f, zone)
pool.Init(&FakeZoneLister{[]string{zone}})
return pool
}
func TestNodePoolSync(t *testing.T) {
f := NewFakeInstanceGroups(sets.NewString(
[]string{"n1", "n2"}...))
pool := NewNodePool(f, defaultZone)
pool := newNodePool(f, defaultZone)
pool.AddInstanceGroup("test", 80)
// KubeNodes: n1
@ -46,7 +52,7 @@ func TestNodePoolSync(t *testing.T) {
// Try to add n2 to the instance group.
f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...))
pool = NewNodePool(f, defaultZone)
pool = newNodePool(f, defaultZone)
pool.AddInstanceGroup("test", 80)
f.calls = []int{}
@ -62,7 +68,7 @@ func TestNodePoolSync(t *testing.T) {
// Do nothing.
f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...))
pool = NewNodePool(f, defaultZone)
pool = newNodePool(f, defaultZone)
pool.AddInstanceGroup("test", 80)
f.calls = []int{}

View file

@ -38,8 +38,11 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
fakeHCs := healthchecks.NewFakeHealthChecks()
namer := &utils.Namer{}
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
backendPool := backends.NewBackendPool(
fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
fakeBackends, healthChecker, nodePool, namer, []int64{}, false)
return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)
}

View file

@ -238,3 +238,22 @@ func CompareLinks(l1, l2 string) bool {
// FakeIngressRuleValueMap is a convenience type used by multiple submodules
// that share the same testing methods.
type FakeIngressRuleValueMap map[string]string
// DefaultHealthCheckTemplate simply returns the default health check template.
func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck {
return &compute.HttpHealthCheck{
Port: port,
// Empty string is used as a signal to the caller to use the appropriate
// default.
RequestPath: "",
Description: "Default kubernetes L7 Loadbalancing health check.",
// How often to health check.
CheckIntervalSec: 1,
// How long to wait before claiming failure of a health check.
TimeoutSec: 1,
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: 1,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: 10,
}
}