From 32ac61e7e38240c250eeb0a18e9a60f1ed941f9d Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Sat, 21 May 2016 17:46:09 -0700 Subject: [PATCH 1/6] Teach l7 controller about zones --- controllers/gce/backends/backends.go | 47 +++--- controllers/gce/controller/cluster_manager.go | 6 +- controllers/gce/controller/fakes.go | 1 - controllers/gce/controller/utils.go | 42 +++++ controllers/gce/instances/instances.go | 154 +++++++++++++----- controllers/gce/instances/interfaces.go | 15 +- controllers/gce/main.go | 1 + 7 files changed, 202 insertions(+), 64 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index b307f8e33..a697748bd 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -107,7 +107,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { return be, nil } -func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { +func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { // Create a new health check if err := b.healthChecker.Add(namedPort.Port, ""); err != nil { return nil, err @@ -120,11 +120,7 @@ func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPor backend := &compute.BackendService{ Name: name, Protocol: "HTTP", - Backends: []*compute.Backend{ - { - Group: ig.SelfLink, - }, - }, + Backends: getBackendsForIGs(igs), // Api expects one, means little to kubernetes. HealthChecks: []string{hc.SelfLink}, Port: namedPort.Port, @@ -143,20 +139,20 @@ func (b *Backends) Add(port int64) error { be := &compute.BackendService{} defer func() { b.snapshotter.Add(portKey(port), be) }() - ig, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port) + igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port) if err != nil { return err } be, _ = b.Get(port) if be == nil { - glog.Infof("Creating backend for instance group %v port %v named port %v", - ig.Name, port, namedPort) - be, err = b.create(ig, namedPort, b.namer.BeName(port)) + glog.Infof("Creating backend for %d instance groups, port %v named port %v", + len(igs), port, namedPort) + be, err = b.create(igs, namedPort, b.namer.BeName(port)) if err != nil { return err } } - if err := b.edgeHop(be, ig); err != nil { + if err := b.edgeHop(be, igs); err != nil { return err } return err @@ -201,18 +197,31 @@ func (b *Backends) List() ([]interface{}, error) { return interList, nil } +func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend { + backends := []*compute.Backend{} + for _, ig := range igs { + backends = append(backends, &compute.Backend{Group: ig.SelfLink}) + } + return backends +} + // edgeHop checks the links of the given backend by executing an edge hop. // It fixes broken links. 
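// The reworked edgeHop below no longer assumes a single instance group: it
// builds the set of group links currently attached to the BackendService and
// the set of links expected (one group per zone) and only rewrites the
// backends when the two differ. A minimal standalone sketch of that check,
// with pkg/util/sets replaced by plain maps and links modeled as strings
// (hypothetical helper, not part of the patch):
func linksInSync(current, desired []string) bool {
	cur, want := map[string]bool{}, map[string]bool{}
	for _, l := range current {
		cur[l] = true
	}
	for _, l := range desired {
		want[l] = true
	}
	if len(cur) != len(want) {
		return false
	}
	for l := range want {
		if !cur[l] {
			return false
		}
	}
	return true
}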
-func (b *Backends) edgeHop(be *compute.BackendService, ig *compute.InstanceGroup) error { - if len(be.Backends) == 1 && - utils.CompareLinks(be.Backends[0].Group, ig.SelfLink) { +func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error { + beIGs := sets.String{} + for _, beToIG := range be.Backends { + beIGs.Insert(beToIG.Group) + } + igLinks := sets.String{} + for _, igToBE := range igs { + igLinks.Insert(igToBE.SelfLink) + } + if igLinks.Equal(beIGs) { return nil } - glog.Infof("Backend %v has a broken edge, adding link to %v", - be.Name, ig.Name) - be.Backends = []*compute.Backend{ - {Group: ig.SelfLink}, - } + glog.Infof("Backend %v has a broken edge, expected igs %+v, current igs %+v", + be.Name, igLinks.List(), beIGs.List()) + be.Backends = getBackendsForIGs(igs) if err := b.cloud.UpdateBackendService(be); err != nil { return err } diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 860c8e20a..65f1b6a92 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -76,6 +76,11 @@ type ClusterManager struct { firewallPool firewalls.SingleFirewallPool } +func (c *ClusterManager) Init(tr *GCETranslator) { + c.instancePool.Init(tr) + // TODO: Initialize other members as needed. +} + // IsHealthy returns an error if the cluster manager is unhealthy. func (c *ClusterManager) IsHealthy() (err error) { // TODO: Expand on this, for now we just want to detect when the GCE client @@ -148,7 +153,6 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil { return err } - return nil } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index 911c71dda..fd649bc27 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -30,7 +30,6 @@ import ( const ( testDefaultBeNodePort = int64(3000) - defaultZone = "default-zone" ) var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 50a3bb846..8c2de8aa5 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" @@ -37,6 +38,10 @@ import ( const ( allowHTTPKey = "kubernetes.io/ingress.allow-http" staticIPNameKey = "kubernetes.io/ingress.global-static-ip-name" + + // Label key to denote which GCE zone a Kubernetes node is in. + zoneKey = "failure-domain.beta.kubernetes.io/zone" + defaultZone = "" ) // ingAnnotations represents Ingress annotations. @@ -315,3 +320,40 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { } return knownPorts } + +func getZone(n api.Node) string { + zone, ok := n.Labels[zoneKey] + if !ok { + return defaultZone + } + return zone +} + +// GetZoneForNode returns the zone for a given node by looking up its zone label. 
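// GetZoneForNode and ListZones below drive all per-zone behavior off the
// failure-domain node label. A minimal standalone sketch of the
// deduplication ListZones performs, with nodes modeled as plain label maps
// (hypothetical helper, not part of the patch):
func uniqueZones(nodeLabels []map[string]string) []string {
	const zoneLabel = "failure-domain.beta.kubernetes.io/zone"
	seen := map[string]bool{}
	zones := []string{}
	for _, labels := range nodeLabels {
		// A missing label falls back to the "" default zone, as in getZone.
		z := labels[zoneLabel]
		if !seen[z] {
			seen[z] = true
			zones = append(zones, z)
		}
	}
	return zones
}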
+func (t *GCETranslator) GetZoneForNode(name string) (string, error) { + nodes, err := t.nodeLister.NodeCondition(nodeReady).List() + if err != nil { + return "", err + } + for _, n := range nodes.Items { + if n.Name == name { + // TODO: Make this more resilient to label changes by listing + // cloud nodes and figuring out zone. + return getZone(n), nil + } + } + return "", fmt.Errorf("Node not found %v", name) +} + +// ListZones returns a list of zones this Kubernetes cluster spans. +func (t *GCETranslator) ListZones() ([]string, error) { + zones := sets.String{} + readyNodes, err := t.nodeLister.NodeCondition(nodeReady).List() + if err != nil { + return zones.List(), err + } + for _, n := range readyNodes.Items { + zones.Insert(getZone(n)) + } + return zones.List(), nil +} diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go index 49e4d222d..54f17c33f 100644 --- a/controllers/gce/instances/instances.go +++ b/controllers/gce/instances/instances.go @@ -17,6 +17,7 @@ limitations under the License. package instances import ( + "fmt" "net/http" "strings" @@ -35,89 +36,162 @@ const ( // Instances implements NodePool. type Instances struct { - cloud InstanceGroups - zone string + cloud InstanceGroups + // zones is a list of zones seeded by Kubernetes node zones. + // TODO: we can figure this out. snapshotter storage.Snapshotter + zoneLister } // NewNodePool creates a new node pool. // - cloud: implements InstanceGroups, used to sync Kubernetes nodes with // members of the cloud InstanceGroup. -func NewNodePool(cloud InstanceGroups, zone string) NodePool { - glog.V(3).Infof("NodePool is only aware of instances in zone %v", zone) - return &Instances{cloud, zone, storage.NewInMemoryPool()} +func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool { + return &Instances{cloud, storage.NewInMemoryPool(), nil} +} + +func (i *Instances) Init(zl zoneLister) { + i.zoneLister = zl } // AddInstanceGroup creates or gets an instance group if it doesn't exist -// and adds the given port to it. -func (i *Instances) AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error) { - ig, _ := i.Get(name) - if ig == nil { - glog.Infof("Creating instance group %v", name) +// and adds the given port to it. Returns a list of one instance group per zone, +// all of which have the exact same named port. 
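// AddInstanceGroup below fans out across every zone returned by the
// zoneLister, ensuring one identically named group (with the same named
// port) per zone. A minimal standalone sketch of that fan-out, where
// ensureGroup is a hypothetical stand-in for the get-or-create call against
// the cloud and returns the group's self link (not part of the patch):
func ensureGroupPerZone(name string, zones []string, ensureGroup func(name, zone string) (string, error)) ([]string, error) {
	links := []string{}
	for _, zone := range zones {
		link, err := ensureGroup(name, zone)
		if err != nil {
			// Fail fast; the caller retries the whole sync.
			return nil, err
		}
		links = append(links, link)
	}
	return links, nil
}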
+func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { + igs := []*compute.InstanceGroup{} + namedPort := &compute.NamedPort{} + + zones, err := i.ListZones() + if err != nil { + return igs, namedPort, err + } + + for _, zone := range zones { + ig, _ := i.Get(name, zone) var err error - ig, err = i.cloud.CreateInstanceGroup(name, i.zone) + if ig == nil { + glog.Infof("Creating instance group %v in zone %v", name, zone) + ig, err = i.cloud.CreateInstanceGroup(name, zone) + if err != nil { + return nil, nil, err + } + } else { + glog.V(3).Infof("Instance group %v already exists in zone %v, adding port %d to it", name, zone, port) + } + defer i.snapshotter.Add(name, struct{}{}) + namedPort, err = i.cloud.AddPortToInstanceGroup(ig, port) if err != nil { return nil, nil, err } - } else { - glog.V(3).Infof("Instance group already exists %v", name) + igs = append(igs, ig) } - defer i.snapshotter.Add(name, ig) - namedPort, err := i.cloud.AddPortToInstanceGroup(ig, port) - if err != nil { - return nil, nil, err - } - - return ig, namedPort, nil + return igs, namedPort, nil } -// DeleteInstanceGroup deletes the given IG by name. +// DeleteInstanceGroup deletes the given IG by name, from all zones. func (i *Instances) DeleteInstanceGroup(name string) error { defer i.snapshotter.Delete(name) - return i.cloud.DeleteInstanceGroup(name, i.zone) + errs := []error{} + + zones, err := i.ListZones() + if err != nil { + return err + } + for _, zone := range zones { + glog.Infof("deleting instance group %v in zone %v", name, zone) + if err := i.cloud.DeleteInstanceGroup(name, zone); err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } + return fmt.Errorf("%v", errs) } +// list lists all instances in all zones. func (i *Instances) list(name string) (sets.String, error) { nodeNames := sets.NewString() - instances, err := i.cloud.ListInstancesInInstanceGroup( - name, i.zone, allInstances) + zones, err := i.ListZones() if err != nil { return nodeNames, err } - for _, ins := range instances.Items { - // TODO: If round trips weren't so slow one would be inclided - // to GetInstance using this url and get the name. - parts := strings.Split(ins.Instance, "/") - nodeNames.Insert(parts[len(parts)-1]) + + for _, zone := range zones { + instances, err := i.cloud.ListInstancesInInstanceGroup( + name, zone, allInstances) + if err != nil { + return nodeNames, err + } + for _, ins := range instances.Items { + // TODO: If round trips weren't so slow one would be inclided + // to GetInstance using this url and get the name. + parts := strings.Split(ins.Instance, "/") + nodeNames.Insert(parts[len(parts)-1]) + } } return nodeNames, nil } // Get returns the Instance Group by name. -func (i *Instances) Get(name string) (*compute.InstanceGroup, error) { - ig, err := i.cloud.GetInstanceGroup(name, i.zone) +func (i *Instances) Get(name, zone string) (*compute.InstanceGroup, error) { + ig, err := i.cloud.GetInstanceGroup(name, zone) if err != nil { return nil, err } - i.snapshotter.Add(name, ig) + i.snapshotter.Add(name, struct{}{}) return ig, nil } -// Add adds the given instances to the Instance Group. 
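// Add and Remove below first bucket node names by zone via splitNodesByZone,
// then issue one cloud call per zone and aggregate any errors. A minimal
// standalone sketch of the bucketing step, where zoneOf is a hypothetical
// stand-in for GetZoneForNode (not part of the patch):
func bucketByZone(names []string, zoneOf func(string) (string, error)) map[string][]string {
	byZone := map[string][]string{}
	for _, name := range names {
		zone, err := zoneOf(name)
		if err != nil {
			// Mirror the patch: nodes whose zone can't be determined are
			// logged and skipped rather than failing the whole operation.
			continue
		}
		byZone[zone] = append(byZone[zone], name)
	}
	return byZone
}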
-func (i *Instances) Add(groupName string, names []string) error { - glog.V(3).Infof("Adding nodes %v to %v", names, groupName) - return i.cloud.AddInstancesToInstanceGroup(groupName, i.zone, names) +func (i *Instances) splitNodesByZone(names []string) map[string][]string { + nodesByZone := map[string][]string{} + for _, name := range names { + zone, err := i.GetZoneForNode(name) + if err != nil { + glog.Errorf("Failed to get zones for %v: %v, skipping", name, err) + continue + } + if _, ok := nodesByZone[zone]; !ok { + nodesByZone[zone] = []string{} + } + nodesByZone[zone] = append(nodesByZone[zone], name) + } + return nodesByZone } -// Remove removes the given instances from the Instance Group. +// Add adds the given instances to the appropriately zoned Instance Group. +func (i *Instances) Add(groupName string, names []string) error { + errs := []error{} + for zone, nodeNames := range i.splitNodesByZone(names) { + glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone) + if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, nodeNames); err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } + return fmt.Errorf("%v", errs) +} + +// Remove removes the given instances from the appropriately zoned Instance Group. func (i *Instances) Remove(groupName string, names []string) error { - glog.V(3).Infof("Removing nodes %v from %v", names, groupName) - return i.cloud.RemoveInstancesFromInstanceGroup(groupName, i.zone, names) + errs := []error{} + for zone, nodeNames := range i.splitNodesByZone(names) { + glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone) + if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, nodeNames); err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } + return fmt.Errorf("%v", errs) } // Sync syncs kubernetes instances with the instances in the instance group. func (i *Instances) Sync(nodes []string) (err error) { - glog.V(3).Infof("Syncing nodes %v", nodes) + glog.V(1).Infof("Syncing nodes %v", nodes) defer func() { // The node pool is only responsible for syncing nodes to instance diff --git a/controllers/gce/instances/interfaces.go b/controllers/gce/instances/interfaces.go index 2bb2bf3b5..f7d03ed57 100644 --- a/controllers/gce/instances/interfaces.go +++ b/controllers/gce/instances/interfaces.go @@ -20,17 +20,26 @@ import ( compute "google.golang.org/api/compute/v1" ) +// zoneLister manages lookups for GCE instance groups/instances to zones. +type zoneLister interface { + ListZones() ([]string, error) + GetZoneForNode(name string) (string, error) +} + // NodePool is an interface to manage a pool of kubernetes nodes synced with vm instances in the cloud -// through the InstanceGroups interface. +// through the InstanceGroups interface. It handles zones opaquely using the zoneLister. type NodePool interface { - AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error) + Init(zl zoneLister) + + // The following 2 methods operate on instance groups. 
+ AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) DeleteInstanceGroup(name string) error // TODO: Refactor for modularity Add(groupName string, nodeNames []string) error Remove(groupName string, nodeNames []string) error Sync(nodeNames []string) error - Get(name string) (*compute.InstanceGroup, error) + Get(name, zone string) (*compute.InstanceGroup, error) } // InstanceGroups is an interface for managing gce instances groups, and the instances therein. diff --git a/controllers/gce/main.go b/controllers/gce/main.go index f4199147f..cbdb8b837 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -220,6 +220,7 @@ func main() { if clusterManager.ClusterNamer.ClusterName != "" { glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.ClusterName) } + clusterManager.Init(&controller.GCETranslator{lbc}) go registerHandlers(lbc) go handleSigterm(lbc, *deleteAllOnQuit) From f84ca5483148ddd815fec3d55b9254c67a6e5444 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Sat, 28 May 2016 22:02:39 -0700 Subject: [PATCH 2/6] Readiness probe health check --- controllers/gce/backends/backends.go | 6 +- controllers/gce/backends/fakes.go | 44 --------- controllers/gce/backends/interfaces.go | 14 --- controllers/gce/controller/cluster_manager.go | 18 +++- controllers/gce/controller/controller.go | 56 +++++++++-- controllers/gce/controller/controller_test.go | 1 + controllers/gce/controller/utils.go | 96 +++++++++++++++++++ controllers/gce/healthchecks/fakes.go | 2 + controllers/gce/healthchecks/healthchecks.go | 53 +++++----- controllers/gce/healthchecks/interfaces.go | 5 +- .../gce/loadbalancers/loadbalancers.go | 2 +- controllers/gce/storage/pools.go | 14 ++- 12 files changed, 217 insertions(+), 94 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index a697748bd..84b029ea9 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -109,7 +109,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { // Create a new health check - if err := b.healthChecker.Add(namedPort.Port, ""); err != nil { + if err := b.healthChecker.Add(namedPort.Port); err != nil { return nil, err } hc, err := b.healthChecker.Get(namedPort.Port) @@ -152,6 +152,10 @@ func (b *Backends) Add(port int64) error { return err } } + // we won't find any igs till the node pool syncs nodes. + if len(igs) == 0 { + return nil + } if err := b.edgeHop(be, igs); err != nil { return err } diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go index ef8bfadaf..945e04847 100644 --- a/controllers/gce/backends/fakes.go +++ b/controllers/gce/backends/fakes.go @@ -99,47 +99,3 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput return &compute.BackendServiceGroupHealth{ HealthStatus: states}, nil } - -// NewFakeHealthChecks returns a health check fake. -func NewFakeHealthChecks() *FakeHealthChecks { - return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}} -} - -// FakeHealthChecks fakes out health checks. -type FakeHealthChecks struct { - hc []*compute.HttpHealthCheck -} - -// CreateHttpHealthCheck fakes health check creation. 
-func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - f.hc = append(f.hc, hc) - return nil -} - -// GetHttpHealthCheck fakes getting a http health check. -func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { - for _, h := range f.hc { - if h.Name == name { - return h, nil - } - } - return nil, fmt.Errorf("Health check %v not found.", name) -} - -// DeleteHttpHealthCheck fakes deleting a http health check. -func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { - healthChecks := []*compute.HttpHealthCheck{} - exists := false - for _, h := range f.hc { - if h.Name == name { - exists = true - continue - } - healthChecks = append(healthChecks, h) - } - if !exists { - return fmt.Errorf("Failed to find health check %v", name) - } - f.hc = healthChecks - return nil -} diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 3e199f9e5..25802570e 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -42,17 +42,3 @@ type BackendServices interface { ListBackendServices() (*compute.BackendServiceList, error) GetHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) } - -// SingleHealthCheck is an interface to manage a single GCE health check. -type SingleHealthCheck interface { - CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error - DeleteHttpHealthCheck(name string) error - GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) -} - -// HealthChecker is an interface to manage cloud HTTPHealthChecks. -type HealthChecker interface { - Add(port int64, path string) error - Delete(port int64) error - Get(port int64) (*compute.HttpHealthCheck, error) -} diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 65f1b6a92..b20299ada 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -74,10 +74,21 @@ type ClusterManager struct { backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool firewallPool firewalls.SingleFirewallPool + + // TODO: Refactor so we simply init a health check pool. + // Currently health checks are tied to backends because each backend needs + // the link of the associated health, but both the backend pool and + // loadbalancer pool manage backends, because the lifetime of the default + // backend is tied to the last/first loadbalancer not the life of the + // nodeport service or Ingress. + healthCheckers []healthchecks.HealthChecker } func (c *ClusterManager) Init(tr *GCETranslator) { c.instancePool.Init(tr) + for _, h := range c.healthCheckers { + h.Init(tr) + } // TODO: Initialize other members as needed. } @@ -221,7 +232,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud { // string passed to glbc via --gce-cluster-name. // - defaultBackendNodePort: is the node port of glbc's default backend. This is // the kubernetes Service that serves the 404 page if no urls match. -// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz" +// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz". func NewClusterManager( configFilePath string, name string, @@ -258,11 +269,14 @@ func NewClusterManager( // BackendPool creates GCE BackendServices and associated health checks. 
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer) + // Loadbalancer pool manages the default backend and its health check. + defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer) + + cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker} // TODO: This needs to change to a consolidated management of the default backend. cluster.backendPool = backends.NewBackendPool( cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true) - defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer) defaultBackendPool := backends.NewBackendPool( cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 1fb5d926a..8c03337cf 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -44,18 +44,25 @@ var ( // DefaultClusterUID is the uid to use for clusters resources created by an // L7 controller created without specifying the --cluster-uid flag. DefaultClusterUID = "" + + // Frequency to poll on local stores to sync. + storeSyncPollPeriod = 5 * time.Second ) // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { - client *client.Client - ingController *framework.Controller - nodeController *framework.Controller - svcController *framework.Controller - ingLister StoreToIngressLister - nodeLister cache.StoreToNodeLister - svcLister cache.StoreToServiceLister + client *client.Client + ingController *framework.Controller + nodeController *framework.Controller + svcController *framework.Controller + podController *framework.Controller + ingLister StoreToIngressLister + nodeLister cache.StoreToNodeLister + svcLister cache.StoreToServiceLister + // Health checks are the readiness probes of containers on pods. + podLister cache.StoreToPodLister + // TODO: Watch secrets CloudClusterManager *ClusterManager recorder record.EventRecorder nodeQueue *taskQueue @@ -69,6 +76,9 @@ type LoadBalancerController struct { shutdown bool // tlsLoader loads secrets from the Kubernetes apiserver for Ingresses. tlsLoader tlsLoader + // hasSynced returns true if all associated sub-controllers have synced. + // Abstracted into a func for testing. + hasSynced func() bool } // NewLoadBalancerController creates a controller for gce loadbalancers. 
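// The hasSynced gate added to the struct above (implemented by storesSynced
// further below) makes the controller wait until every informer cache has
// completed its first list before acting on an Ingress. A minimal standalone
// sketch of that all-caches-ready check, with HasSynced funcs standing in
// for the pod/service/node/ingress sub-controllers (hypothetical helper, not
// part of the patch):
func allSynced(hasSynced ...func() bool) bool {
	for _, synced := range hasSynced {
		if !synced() {
			return false
		}
	}
	return true
}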
@@ -90,6 +100,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste } lbc.nodeQueue = NewTaskQueue(lbc.syncNodes) lbc.ingQueue = NewTaskQueue(lbc.sync) + lbc.hasSynced = lbc.storesSynced // Ingress watch handlers pathHandlers := framework.ResourceEventHandlerFuncs{ @@ -130,12 +141,19 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste lbc.client, "services", namespace, fields.Everything()), &api.Service{}, resyncPeriod, svcHandlers) + lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer( + cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()), + &api.Pod{}, + resyncPeriod, + framework.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + nodeHandlers := framework.ResourceEventHandlerFuncs{ AddFunc: lbc.nodeQueue.enqueue, DeleteFunc: lbc.nodeQueue.enqueue, // Nodes are updated every 10s and we don't care, so no update handler. } - // Node watch handlers lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer( &cache.ListWatch{ @@ -194,6 +212,7 @@ func (lbc *LoadBalancerController) Run() { go lbc.ingController.Run(lbc.stopCh) go lbc.nodeController.Run(lbc.stopCh) go lbc.svcController.Run(lbc.stopCh) + go lbc.podController.Run(lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh) go lbc.nodeQueue.run(time.Second, lbc.stopCh) <-lbc.stopCh @@ -224,8 +243,29 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { return nil } +// storesSynced returns true if all the sub-controllers have finished their +// first sync with apiserver. +func (lbc *LoadBalancerController) storesSynced() bool { + return ( + // wait for pods to sync so we don't allocate a default health check when + // an endpoint has a readiness probe. + lbc.podController.HasSynced() && + // wait for services so we don't thrash on backend creation. + lbc.svcController.HasSynced() && + // wait for nodes so we don't disconnect a backend from an instance + // group just because we don't realize there are nodes in that zone. + lbc.nodeController.HasSynced() && + // Wait for ingresses as a safety measure. We don't really need this. + lbc.ingController.HasSynced()) +} + // sync manages Ingress create/updates/deletes. 
func (lbc *LoadBalancerController) sync(key string) { + if !lbc.hasSynced() { + time.Sleep(storeSyncPollPeriod) + lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync")) + return + } glog.V(3).Infof("Syncing %v", key) paths, err := lbc.ingLister.List() diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index 34d98b2d0..26aab545f 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -55,6 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s if err != nil { t.Fatalf("%v", err) } + lb.hasSynced = func() { return true } return lb } diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 8c2de8aa5..274ea02c6 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" @@ -357,3 +358,98 @@ func (t *GCETranslator) ListZones() ([]string, error) { } return zones.List(), nil } + +// isPortEqual compares the given IntOrString ports +func isPortEqual(port, targetPort intstr.IntOrString) bool { + if targetPort.Type == intstr.Int { + return port.IntVal == targetPort.IntVal + } + return port.StrVal == targetPort.StrVal +} + +// geHTTPProbe returns the http readiness probe from the first container +// that matches targetPort, from the set of pods matching the given labels. +func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntOrString) (*api.Probe, error) { + // Lookup any container with a matching targetPort from the set of pods + // with a matching label selector. + pl, err := t.podLister.List(labels.SelectorFromSet(labels.Set(l))) + if err != nil { + return nil, err + } + for _, pod := range pl { + logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort) + for _, c := range pod.Spec.Containers { + if c.ReadinessProbe == nil || c.ReadinessProbe.Handler.HTTPGet == nil { + continue + } + for _, p := range c.Ports { + cPort := intstr.IntOrString{IntVal: p.ContainerPort, StrVal: p.Name} + if isPortEqual(cPort, targetPort) { + if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, targetPort) { + return c.ReadinessProbe, nil + } else { + glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)", + logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port) + } + } + } + } + glog.V(4).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr) + } + return nil, nil +} + +// HealthCheck returns the http readiness probe for the endpoint backing the +// given nodePort. If no probe is found it returns a health check with "" as +// the request path, callers are responsible for swapping this out for the +// appropriate default. 
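// HealthCheck below copies the matching readiness probe's HTTP settings into
// a GCE health check. A minimal standalone sketch of that field mapping,
// using simplified stand-in structs instead of api.Probe and
// compute.HttpHealthCheck (hypothetical types, not part of the patch):
type probeSketch struct {
	Path, Host       string
	PeriodSeconds    int64
	TimeoutSeconds   int64
	SuccessThreshold int64
	FailureThreshold int64
}

type healthCheckSketch struct {
	Port               int64
	RequestPath, Host  string
	CheckIntervalSec   int64
	TimeoutSec         int64
	HealthyThreshold   int64
	UnhealthyThreshold int64
}

func healthCheckFromProbe(port int64, p probeSketch) healthCheckSketch {
	return healthCheckSketch{
		Port:               port,
		RequestPath:        p.Path,
		Host:               p.Host,
		CheckIntervalSec:   p.PeriodSeconds,
		TimeoutSec:         p.TimeoutSeconds,
		HealthyThreshold:   p.SuccessThreshold,
		UnhealthyThreshold: p.FailureThreshold,
	}
}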
+func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { + sl, err := t.svcLister.List() + if err != nil { + return nil, err + } + // Find the label and target port of the one service with the given nodePort + for _, s := range sl.Items { + for _, p := range s.Spec.Ports { + if int32(port) == p.NodePort { + rp, err := t.getHTTPProbe(s.Spec.Selector, p.TargetPort) + if err != nil { + return nil, err + } + if rp == nil { + glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port) + break + } + healthPath := rp.Handler.HTTPGet.Path + host := rp.Handler.HTTPGet.Host + glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath) + return &compute.HttpHealthCheck{ + Port: port, + RequestPath: healthPath, + Host: host, + Description: "kubernetes L7 health check from readiness probe.", + CheckIntervalSec: int64(rp.PeriodSeconds), + TimeoutSec: int64(rp.TimeoutSeconds), + HealthyThreshold: int64(rp.SuccessThreshold), + UnhealthyThreshold: int64(rp.FailureThreshold), + // TODO: include headers after updating compute godep. + }, nil + } + } + } + return &compute.HttpHealthCheck{ + Port: port, + // Empty string is used as a signal to the caller to use the appropriate + // default. + RequestPath: "", + Description: "Default kubernetes L7 Loadbalancing health check.", + // How often to health check. + CheckIntervalSec: 1, + // How long to wait before claiming failure of a health check. + TimeoutSec: 1, + // Number of healthchecks to pass for a vm to be deemed healthy. + HealthyThreshold: 1, + // Number of healthchecks to fail before the vm is deemed unhealthy. + UnhealthyThreshold: 10, + }, nil +} diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 7889b2b50..51fc07174 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -65,3 +65,5 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { f.hc = healthChecks return nil } + +func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { return nil } diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index 849899850..fdd97427b 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -29,43 +29,52 @@ type HealthChecks struct { cloud SingleHealthCheck defaultPath string namer *utils.Namer + healthCheckGetter +} + +type healthCheckGetter interface { + // HealthCheck returns the HTTP readiness check for a node port. + HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { - return &HealthChecks{cloud, defaultHealthCheckPath, namer} + return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil} +} + +// Init initializes the health checker. +func (h *HealthChecks) Init(r healthCheckGetter) { + h.healthCheckGetter = r } // Add adds a healthcheck if one for the same port doesn't already exist. 
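// The new Add below asks the healthCheckGetter for the desired check, falls
// back to the default path when the probe didn't supply one, then creates
// the check if it's missing or updates it when the request path drifted. A
// minimal standalone sketch of that create-or-update flow, with get/create/
// update as hypothetical stand-ins for the GCE calls and checks compared by
// request path only, as in this patch (not part of the patch):
func reconcileCheck(wantPath string,
	get func() (existingPath string, found bool),
	create func(path string) error,
	update func(path string) error) error {
	existingPath, found := get()
	if !found {
		return create(wantPath)
	}
	if existingPath != wantPath {
		return update(wantPath)
	}
	// Nothing to do: the existing check already matches.
	return nil
}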
-func (h *HealthChecks) Add(port int64, path string) error { - hc, _ := h.Get(port) - name := h.namer.BeName(port) - if path == "" { - path = h.defaultPath +func (h *HealthChecks) Add(port int64) error { + wantHC, err := h.healthCheckGetter.HealthCheck(port) + if err != nil { + return err } + if wantHC.RequestPath == "" { + wantHC.RequestPath = h.defaultPath + } + name := h.namer.BeName(port) + wantHC.Name = name + hc, _ := h.Get(port) if hc == nil { + // TODO: check if the readiness probe has changed and update the + // health check. glog.Infof("Creating health check %v", name) - if err := h.cloud.CreateHttpHealthCheck( - &compute.HttpHealthCheck{ - Name: name, - Port: port, - RequestPath: path, - Description: "Default kubernetes L7 Loadbalancing health check.", - // How often to health check. - CheckIntervalSec: 1, - // How long to wait before claiming failure of a health check. - TimeoutSec: 1, - // Number of healthchecks to pass for a vm to be deemed healthy. - HealthyThreshold: 1, - // Number of healthchecks to fail before the vm is deemed unhealthy. - UnhealthyThreshold: 10, - }); err != nil { + if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil { + return err + } + } else if wantHC.RequestPath != hc.RequestPath { + // TODO: also compare headers interval etc. + glog.Infof("Updating health check %v, path %v -> %v", name, hc.RequestPath, wantHC.RequestPath) + if err := h.cloud.UpdateHttpHealthCheck(wantHC); err != nil { return err } } else { - // TODO: Does this health check need an edge hop? glog.Infof("Health check %v already exists", hc.Name) } return nil diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go index efbea9669..6e04bafc6 100644 --- a/controllers/gce/healthchecks/interfaces.go +++ b/controllers/gce/healthchecks/interfaces.go @@ -23,13 +23,16 @@ import ( // SingleHealthCheck is an interface to manage a single GCE health check. type SingleHealthCheck interface { CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error + UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error DeleteHttpHealthCheck(name string) error GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) } // HealthChecker is an interface to manage cloud HTTPHealthChecks. type HealthChecker interface { - Add(port int64, path string) error + Init(h healthCheckGetter) + + Add(port int64) error Delete(port int64) error Get(port int64) (*compute.HttpHealthCheck, error) } diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index 45d2a76cc..0047db957 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -368,7 +368,7 @@ func (l *L7) checkSSLCert() (err error) { cert, _ := l.cloud.GetSslCertificate(certName) // PrivateKey is write only, so compare certs alone. We're assuming that - // no one will change just the key. We can remembe the key and compare, + // no one will change just the key. We can remember the key and compare, // but a bug could end up leaking it, which feels worse. if cert == nil || ingCert != cert.Certificate { diff --git a/controllers/gce/storage/pools.go b/controllers/gce/storage/pools.go index 3d1fd235e..a09dc6c95 100644 --- a/controllers/gce/storage/pools.go +++ b/controllers/gce/storage/pools.go @@ -78,16 +78,28 @@ type CloudListingPool struct { keyGetter keyFunc } -// ReplenishPool lists through the cloudLister and inserts into the pool. 
+// ReplenishPool lists through the cloudLister and inserts into the pool. This +// is especially useful in scenarios like deleting an Ingress while the +// controller is restarting. As long as the resource exists in the shared +// memory pool, it is visible to the caller and they can take corrective +// actions, eg: backend pool deletes backends with non-matching node ports +// in its sync method. func (c *CloudListingPool) ReplenishPool() { c.lock.Lock() defer c.lock.Unlock() glog.V(4).Infof("Replenishing pool") + + // We must list with the lock, because the controller also lists through + // Snapshot(). It's ok if the controller takes a snpshot, we list, we + // delete, because we have delete based on the most recent state. Worst + // case we thrash. It's not ok if we list, the controller lists and + // creates a backend, and we delete that backend based on stale state. items, err := c.lister.List() if err != nil { glog.Warningf("Failed to list: %v", err) return } + for i := range items { key, err := c.keyGetter(items[i]) if err != nil { From 22c6e5ddd76e291a42dbf43bd47a2fb944004328 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Sun, 29 May 2016 16:05:38 -0700 Subject: [PATCH 3/6] Unittests --- controllers/gce/backends/backends_test.go | 8 +- controllers/gce/controller/cluster_manager.go | 1 + controllers/gce/controller/controller_test.go | 2 +- controllers/gce/controller/fakes.go | 5 + controllers/gce/controller/util_test.go | 174 ++++++++++++++++++ controllers/gce/controller/utils.go | 39 ++-- controllers/gce/healthchecks/fakes.go | 34 +++- controllers/gce/healthchecks/healthchecks.go | 5 - controllers/gce/healthchecks/interfaces.go | 6 + controllers/gce/instances/fakes.go | 66 +++++-- controllers/gce/instances/instances.go | 13 +- controllers/gce/instances/instances_test.go | 12 +- .../gce/loadbalancers/loadbalancers_test.go | 5 +- controllers/gce/utils/utils.go | 19 ++ 14 files changed, 340 insertions(+), 49 deletions(-) create mode 100644 controllers/gce/controller/util_test.go diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 6c7c07c5b..db1991fef 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -29,10 +29,12 @@ import ( func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool { namer := &utils.Namer{} + nodePool := instances.NewNodePool(fakeIGs, "default-zone") + nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}}) + healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer) + healthChecks.Init(&healthchecks.FakeHealthCheckGetter{nil}) return NewBackendPool( - f, - healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer), - instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud) + f, healthChecks, nodePool, namer, []int64{}, syncWithCloud) } func TestBackendPoolAdd(t *testing.T) { diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index b20299ada..5e7c15f93 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -84,6 +84,7 @@ type ClusterManager struct { healthCheckers []healthchecks.HealthChecker } +// Init initializes the cluster manager. 
func (c *ClusterManager) Init(tr *GCETranslator) { c.instancePool.Init(tr) for _, h := range c.healthCheckers { diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index 26aab545f..8fa636dcc 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -55,7 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s if err != nil { t.Fatalf("%v", err) } - lb.hasSynced = func() { return true } + lb.hasSynced = func() bool { return true } return lb } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index fd649bc27..db783ca0d 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -49,8 +49,13 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager { fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) fakeHCs := healthchecks.NewFakeHealthChecks() namer := &utils.Namer{clusterName} + nodePool := instances.NewNodePool(fakeIGs, defaultZone) + nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}}) + healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) + healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil}) + backendPool := backends.NewBackendPool( fakeBackends, healthChecker, nodePool, namer, []int64{}, false) diff --git a/controllers/gce/controller/util_test.go b/controllers/gce/controller/util_test.go new file mode 100644 index 000000000..dc6c76bf1 --- /dev/null +++ b/controllers/gce/controller/util_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" +) + +func TestZoneListing(t *testing.T) { + cm := NewFakeClusterManager(DefaultClusterUID) + lbc := newLoadBalancerController(t, cm, "") + zoneToNode := map[string][]string{ + "zone-1": {"n1"}, + "zone-2": {"n2"}, + } + addNodes(lbc, zoneToNode) + zones, err := lbc.tr.ListZones() + if err != nil { + t.Errorf("Failed to list zones: %v", err) + } + for expectedZone := range zoneToNode { + found := false + for _, gotZone := range zones { + if gotZone == expectedZone { + found = true + } + } + if !found { + t.Fatalf("Expected zones %v; Got zones %v", zoneToNode, zones) + } + } +} + +func TestInstancesAddedToZones(t *testing.T) { + cm := NewFakeClusterManager(DefaultClusterUID) + lbc := newLoadBalancerController(t, cm, "") + zoneToNode := map[string][]string{ + "zone-1": {"n1", "n2"}, + "zone-2": {"n3"}, + } + addNodes(lbc, zoneToNode) + + // Create 2 igs, one per zone. + testIG := "test-ig" + testPort := int64(3001) + lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, testPort) + + // node pool syncs kube-nodes, this will add them to both igs. 
+ lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"}) + gotZonesToNode := cm.fakeIGs.GetInstancesByZone() + + i := 0 + for z, nodeNames := range zoneToNode { + if ig, err := cm.fakeIGs.GetInstanceGroup(testIG, z); err != nil { + t.Errorf("Failed to find ig %v in zone %v, found %+v: %v", testIG, z, ig, err) + } + if cm.fakeIGs.Ports[i] != testPort { + t.Errorf("Expected the same node port on all igs, got ports %+v", cm.fakeIGs.Ports) + } + expNodes := sets.NewString(nodeNames...) + gotNodes := sets.NewString(gotZonesToNode[z]...) + if !gotNodes.Equal(expNodes) { + t.Errorf("Nodes not added to zones, expected %+v got %+v", expNodes, gotNodes) + } + i++ + } +} + +func TestProbeGetter(t *testing.T) { + cm := NewFakeClusterManager(DefaultClusterUID) + lbc := newLoadBalancerController(t, cm, "") + nodePortToHealthCheck := map[int64]string{ + 3001: "/healthz", + 3002: "/foo", + } + addPods(lbc, nodePortToHealthCheck) + for p, exp := range nodePortToHealthCheck { + got, err := lbc.tr.HealthCheck(p) + if err != nil { + t.Errorf("Failed to get health check for node port %v: %v", p, err) + } else if got.RequestPath != exp { + t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + } + } +} + +func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string) { + for np, u := range nodePortToHealthCheck { + l := map[string]string{fmt.Sprintf("app-%d", np): "test"} + svc := &api.Service{ + Spec: api.ServiceSpec{ + Selector: l, + Ports: []api.ServicePort{ + { + NodePort: int32(np), + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + }, + } + svc.Name = fmt.Sprintf("%d", np) + lbc.svcLister.Store.Add(svc) + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Labels: l, + Name: fmt.Sprintf("%d", np), + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Ports: []api.ContainerPort{{ContainerPort: 80}}, + ReadinessProbe: &api.Probe{ + Handler: api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Path: u, + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + }, + }, + }, + }, + } + lbc.podLister.Indexer.Add(pod) + } +} + +func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { + for zone, nodes := range zoneToNode { + for _, node := range nodes { + n := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: node, + Labels: map[string]string{ + zoneKey: zone, + }, + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + {Type: api.NodeReady, Status: api.ConditionTrue}, + }, + }, + } + lbc.nodeLister.Store.Add(n) + } + } + lbc.CloudClusterManager.instancePool.Init(lbc.tr) +} diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 274ea02c6..68f158429 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -18,6 +18,7 @@ package controller import ( "fmt" + "sort" "strconv" "time" @@ -376,6 +377,10 @@ func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntO if err != nil { return nil, err } + + // If multiple endpoints have different health checks, take the first + sort.Sort(PodsByCreationTimestamp(pl)) + for _, pod := range pl { logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort) for _, c := range pod.Spec.Containers { @@ -387,10 +392,9 @@ func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntO if isPortEqual(cPort, targetPort) { if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, 
targetPort) { return c.ReadinessProbe, nil - } else { - glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)", - logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port) } + glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)", + logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port) } } } @@ -437,19 +441,18 @@ func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error } } } - return &compute.HttpHealthCheck{ - Port: port, - // Empty string is used as a signal to the caller to use the appropriate - // default. - RequestPath: "", - Description: "Default kubernetes L7 Loadbalancing health check.", - // How often to health check. - CheckIntervalSec: 1, - // How long to wait before claiming failure of a health check. - TimeoutSec: 1, - // Number of healthchecks to pass for a vm to be deemed healthy. - HealthyThreshold: 1, - // Number of healthchecks to fail before the vm is deemed unhealthy. - UnhealthyThreshold: 10, - }, nil + return utils.DefaultHealthCheckTemplate(port), nil +} + +// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker. +type PodsByCreationTimestamp []*api.Pod + +func (o PodsByCreationTimestamp) Len() int { return len(o) } +func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o PodsByCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) } diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 51fc07174..83083b048 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -20,6 +20,7 @@ import ( "fmt" compute "google.golang.org/api/compute/v1" + "k8s.io/contrib/ingress/controllers/gce/utils" ) // NewFakeHealthChecks returns a new FakeHealthChecks. @@ -27,6 +28,20 @@ func NewFakeHealthChecks() *FakeHealthChecks { return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}} } +// FakeHealthCheckGetter implements the healthCheckGetter interface for tests. +type FakeHealthCheckGetter struct { + DefaultHealthCheck *compute.HttpHealthCheck +} + +// HealthCheck returns the health check for the given port. If a health check +// isn't stored under the DefaultHealthCheck member, it constructs one. +func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { + if h.DefaultHealthCheck == nil { + return utils.DefaultHealthCheckTemplate(port), nil + } + return h.DefaultHealthCheck, nil +} + // FakeHealthChecks fakes out health checks. type FakeHealthChecks struct { hc []*compute.HttpHealthCheck @@ -66,4 +81,21 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { return nil } -func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { return nil } +// UpdateHttpHealthCheck sends the given health check as an update. 
+func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + healthChecks := []*compute.HttpHealthCheck{} + found := false + for _, h := range f.hc { + if h.Name == name { + healthChecks = append(healthChecks, hc) + found = true + } else { + healthChecks = append(healthChecks, h) + } + } + if !found { + return fmt.Errorf("Cannot update a non-existent health check %v", hc.Name) + } + f.hc = healthChecks + return nil +} diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index fdd97427b..a62eba8fa 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -32,11 +32,6 @@ type HealthChecks struct { healthCheckGetter } -type healthCheckGetter interface { - // HealthCheck returns the HTTP readiness check for a node port. - HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) -} - // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go index 6e04bafc6..dca9c6f7f 100644 --- a/controllers/gce/healthchecks/interfaces.go +++ b/controllers/gce/healthchecks/interfaces.go @@ -20,6 +20,12 @@ import ( compute "google.golang.org/api/compute/v1" ) +// healthCheckGetter retrieves health checks. +type healthCheckGetter interface { + // HealthCheck returns the HTTP readiness check for a node port. + HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) +} + // SingleHealthCheck is an interface to manage a single GCE health check. type SingleHealthCheck interface { CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error diff --git a/controllers/gce/instances/fakes.go b/controllers/gce/instances/fakes.go index 7c99f0519..f3a39178b 100644 --- a/controllers/gce/instances/fakes.go +++ b/controllers/gce/instances/fakes.go @@ -27,30 +27,50 @@ import ( // NewFakeInstanceGroups creates a new FakeInstanceGroups. func NewFakeInstanceGroups(nodes sets.String) *FakeInstanceGroups { return &FakeInstanceGroups{ - instances: nodes, - listResult: getInstanceList(nodes), - namer: utils.Namer{}, + instances: nodes, + listResult: getInstanceList(nodes), + namer: utils.Namer{}, + zonesToInstances: map[string][]string{}, } } // InstanceGroup fakes +// FakeZoneLister records zones for nodes. +type FakeZoneLister struct { + Zones []string +} + +// ListZones returns the list of zones. +func (z *FakeZoneLister) ListZones() ([]string, error) { + return z.Zones, nil +} + +// GetZoneForNode returns the only zone stored in the fake zone lister. +func (z *FakeZoneLister) GetZoneForNode(name string) (string, error) { + // TODO: evolve as required, it's currently needed just to satisfy the + // interface in unittests that don't care about zones. See unittests in + // controller/util_test for actual zoneLister testing. + return z.Zones[0], nil +} + // FakeInstanceGroups fakes out the instance groups api. 
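// The fake below now records membership per zone (zonesToInstances) so tests
// can assert which zone each node landed in. A minimal standalone sketch of
// the removal bookkeeping RemoveInstancesFromInstanceGroup performs on one
// zone's list (hypothetical helper, not part of the patch):
func removeFromZone(zoneInstances, toRemove []string) []string {
	drop := map[string]bool{}
	for _, name := range toRemove {
		drop[name] = true
	}
	kept := []string{}
	for _, name := range zoneInstances {
		if drop[name] {
			continue
		}
		kept = append(kept, name)
	}
	return kept
}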
type FakeInstanceGroups struct { - instances sets.String - instanceGroups []*compute.InstanceGroup - Ports []int64 - getResult *compute.InstanceGroup - listResult *compute.InstanceGroupsListInstances - calls []int - namer utils.Namer + instances sets.String + instanceGroups []*compute.InstanceGroup + Ports []int64 + getResult *compute.InstanceGroup + listResult *compute.InstanceGroupsListInstances + calls []int + namer utils.Namer + zonesToInstances map[string][]string } // GetInstanceGroup fakes getting an instance group from the cloud. func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.InstanceGroup, error) { f.calls = append(f.calls, utils.Get) for _, ig := range f.instanceGroups { - if ig.Name == name { + if ig.Name == name && ig.Zone == zone { return ig, nil } } @@ -60,7 +80,7 @@ func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.Insta // CreateInstanceGroup fakes instance group creation. func (f *FakeInstanceGroups) CreateInstanceGroup(name, zone string) (*compute.InstanceGroup, error) { - newGroup := &compute.InstanceGroup{Name: name, SelfLink: name} + newGroup := &compute.InstanceGroup{Name: name, SelfLink: name, Zone: zone} f.instanceGroups = append(f.instanceGroups, newGroup) return newGroup, nil } @@ -92,13 +112,35 @@ func (f *FakeInstanceGroups) ListInstancesInInstanceGroup(name, zone string, sta func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, instanceNames []string) error { f.calls = append(f.calls, utils.AddInstances) f.instances.Insert(instanceNames...) + if _, ok := f.zonesToInstances[zone]; !ok { + f.zonesToInstances[zone] = []string{} + } + f.zonesToInstances[zone] = append(f.zonesToInstances[zone], instanceNames...) return nil } +// GetInstancesByZone returns the zone to instances map. +func (f *FakeInstanceGroups) GetInstancesByZone() map[string][]string { + return f.zonesToInstances +} + // RemoveInstancesFromInstanceGroup fakes removing instances from an instance group. func (f *FakeInstanceGroups) RemoveInstancesFromInstanceGroup(name, zone string, instanceNames []string) error { f.calls = append(f.calls, utils.RemoveInstances) f.instances.Delete(instanceNames...) + l, ok := f.zonesToInstances[zone] + if !ok { + return nil + } + newIns := []string{} + delIns := sets.NewString(instanceNames...) + for _, oldIns := range l { + if delIns.Has(oldIns) { + continue + } + newIns = append(newIns, oldIns) + } + f.zonesToInstances[zone] = newIns return nil } diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go index 54f17c33f..7ca045e6e 100644 --- a/controllers/gce/instances/instances.go +++ b/controllers/gce/instances/instances.go @@ -50,6 +50,9 @@ func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool { return &Instances{cloud, storage.NewInMemoryPool(), nil} } +// Init initializes the instance pool. The given zoneLister is used to list +// all zones that require an instance group, and to lookup which zone a +// given Kubernetes node is in so we can add it to the right instance group. func (i *Instances) Init(zl zoneLister) { i.zoneLister = zl } @@ -191,7 +194,7 @@ func (i *Instances) Remove(groupName string, names []string) error { // Sync syncs kubernetes instances with the instances in the instance group. 
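// Sync below reconciles each instance group against the current Kubernetes
// node list: nodes present in Kubernetes but not in the cloud group are
// added, and vice versa. A minimal standalone sketch of that set difference,
// with pkg/util/sets replaced by plain maps (hypothetical helper, not part
// of the patch):
func diffMembers(kubeNodes, cloudNodes []string) (toAdd, toRemove []string) {
	inKube := map[string]bool{}
	for _, n := range kubeNodes {
		inKube[n] = true
	}
	inCloud := map[string]bool{}
	for _, n := range cloudNodes {
		inCloud[n] = true
	}
	for n := range inKube {
		if !inCloud[n] {
			toAdd = append(toAdd, n)
		}
	}
	for n := range inCloud {
		if !inKube[n] {
			toRemove = append(toRemove, n)
		}
	}
	return toAdd, toRemove
}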
func (i *Instances) Sync(nodes []string) (err error) { - glog.V(1).Infof("Syncing nodes %v", nodes) + glog.V(4).Infof("Syncing nodes %v", nodes) defer func() { // The node pool is only responsible for syncing nodes to instance @@ -207,9 +210,9 @@ func (i *Instances) Sync(nodes []string) (err error) { }() pool := i.snapshotter.Snapshot() - for name := range pool { + for igName := range pool { gceNodes := sets.NewString() - gceNodes, err = i.list(name) + gceNodes, err = i.list(igName) if err != nil { return err } @@ -223,14 +226,14 @@ func (i *Instances) Sync(nodes []string) (err error) { addNodes := kubeNodes.Difference(gceNodes).List() if len(removeNodes) != 0 { if err = i.Remove( - name, gceNodes.Difference(kubeNodes).List()); err != nil { + igName, gceNodes.Difference(kubeNodes).List()); err != nil { return err } } if len(addNodes) != 0 { if err = i.Add( - name, kubeNodes.Difference(gceNodes).List()); err != nil { + igName, kubeNodes.Difference(gceNodes).List()); err != nil { return err } } diff --git a/controllers/gce/instances/instances_test.go b/controllers/gce/instances/instances_test.go index 5e51c5c2d..a20caf8b5 100644 --- a/controllers/gce/instances/instances_test.go +++ b/controllers/gce/instances/instances_test.go @@ -24,10 +24,16 @@ import ( const defaultZone = "default-zone" +func newNodePool(f *FakeInstanceGroups, zone string) NodePool { + pool := NewNodePool(f, zone) + pool.Init(&FakeZoneLister{[]string{zone}}) + return pool +} + func TestNodePoolSync(t *testing.T) { f := NewFakeInstanceGroups(sets.NewString( []string{"n1", "n2"}...)) - pool := NewNodePool(f, defaultZone) + pool := newNodePool(f, defaultZone) pool.AddInstanceGroup("test", 80) // KubeNodes: n1 @@ -46,7 +52,7 @@ func TestNodePoolSync(t *testing.T) { // Try to add n2 to the instance group. f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...)) - pool = NewNodePool(f, defaultZone) + pool = newNodePool(f, defaultZone) pool.AddInstanceGroup("test", 80) f.calls = []int{} @@ -62,7 +68,7 @@ func TestNodePoolSync(t *testing.T) { // Do nothing. 
 	f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...))
-	pool = NewNodePool(f, defaultZone)
+	pool = newNodePool(f, defaultZone)
 	pool.AddInstanceGroup("test", 80)

 	f.calls = []int{}
diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go
index 795cf2c89..84f08c8cf 100644
--- a/controllers/gce/loadbalancers/loadbalancers_test.go
+++ b/controllers/gce/loadbalancers/loadbalancers_test.go
@@ -38,8 +38,11 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
 	fakeHCs := healthchecks.NewFakeHealthChecks()
 	namer := &utils.Namer{}
 	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
+	healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
+	nodePool := instances.NewNodePool(fakeIGs, defaultZone)
+	nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
 	backendPool := backends.NewBackendPool(
-		fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
+		fakeBackends, healthChecker, nodePool, namer, []int64{}, false)
 	return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)
 }

diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go
index b0a6c5145..930c2a2fd 100644
--- a/controllers/gce/utils/utils.go
+++ b/controllers/gce/utils/utils.go
@@ -238,3 +238,22 @@ func CompareLinks(l1, l2 string) bool {
 // FakeIngressRuleValueMap is a convenience type used by multiple submodules
 // that share the same testing methods.
 type FakeIngressRuleValueMap map[string]string
+
+// DefaultHealthCheckTemplate simply returns the default health check template.
+func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck {
+	return &compute.HttpHealthCheck{
+		Port: port,
+		// Empty string is used as a signal to the caller to use the appropriate
+		// default.
+		RequestPath: "",
+		Description: "Default kubernetes L7 Loadbalancing health check.",
+		// How often to health check.
+		CheckIntervalSec: 1,
+		// How long to wait before claiming failure of a health check.
+		TimeoutSec: 1,
+		// Number of healthchecks to pass for a vm to be deemed healthy.
+		HealthyThreshold: 1,
+		// Number of healthchecks to fail before the vm is deemed unhealthy.
+		UnhealthyThreshold: 10,
+	}
+}

From 9abd4e0ea6bb615938153c1ce8b451a5198a64e6 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Tue, 31 May 2016 19:47:37 -0700
Subject: [PATCH 4/6] append / to healthcheck url

---
 controllers/gce/controller/utils.go   | 4 ++++
 controllers/gce/healthchecks/fakes.go | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go
index 68f158429..9eb52a3ad 100644
--- a/controllers/gce/controller/utils.go
+++ b/controllers/gce/controller/utils.go
@@ -425,6 +425,10 @@ func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error
 			break
 		}
 		healthPath := rp.Handler.HTTPGet.Path
+		// GCE requires a leading "/" for health check urls.
+		if string(healthPath[0]) != "/" {
+			healthPath = fmt.Sprintf("/%v", healthPath)
+		}
 		host := rp.Handler.HTTPGet.Host
 		glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath)
 		return &compute.HttpHealthCheck{
diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go
index 83083b048..10a8c8791 100644
--- a/controllers/gce/healthchecks/fakes.go
+++ b/controllers/gce/healthchecks/fakes.go
@@ -86,7 +86,7 @@ func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) er
 	healthChecks := []*compute.HttpHealthCheck{}
 	found := false
 	for _, h := range f.hc {
-		if h.Name == name {
+		if h.Name == hc.Name {
 			healthChecks = append(healthChecks, hc)
 			found = true
 		} else {

From 3bed62f51e480c84028f2e849e0504b205451a28 Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Thu, 2 Jun 2016 10:30:13 -0700
Subject: [PATCH 5/6] Forget oldSSL cert after first cleanup

---
 controllers/gce/controller/cluster_manager.go  | 1 +
 controllers/gce/loadbalancers/loadbalancers.go | 1 +
 2 files changed, 2 insertions(+)

diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go
index 5e7c15f93..85ca39263 100644
--- a/controllers/gce/controller/cluster_manager.go
+++ b/controllers/gce/controller/cluster_manager.go
@@ -165,6 +165,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
 	if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil {
 		return err
 	}
+
 	return nil
 }

diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go
index 0047db957..fab810d89 100644
--- a/controllers/gce/loadbalancers/loadbalancers.go
+++ b/controllers/gce/loadbalancers/loadbalancers.go
@@ -344,6 +344,7 @@ func (l *L7) deleteOldSSLCert() (err error) {
 			return err
 		}
 	}
+	l.oldSSLCert = nil
 	return nil
 }

From 61558f4d19d977bf07b14873abc339ef271a6c0b Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Fri, 3 Jun 2016 11:22:56 -0700
Subject: [PATCH 6/6] Get rid of default-zone everywhere.
---
 controllers/gce/backends/backends_test.go     | 16 ++++++++++++----
 controllers/gce/controller/cluster_manager.go |  6 +-----
 controllers/gce/controller/fakes.go           |  2 +-
 controllers/gce/instances/instances.go        |  4 +++-
 controllers/gce/instances/instances_test.go   |  2 +-
 .../gce/loadbalancers/loadbalancers_test.go   |  6 +++---
 6 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go
index db1991fef..65ffbbb7e 100644
--- a/controllers/gce/backends/backends_test.go
+++ b/controllers/gce/backends/backends_test.go
@@ -27,10 +27,12 @@ import (
 	"k8s.io/kubernetes/pkg/util/sets"
 )

+const defaultZone = "zone-a"
+
 func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
 	namer := &utils.Namer{}
-	nodePool := instances.NewNodePool(fakeIGs, "default-zone")
-	nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
+	nodePool := instances.NewNodePool(fakeIGs)
+	nodePool.Init(&instances.FakeZoneLister{[]string{defaultZone}})
 	healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
 	healthChecks.Init(&healthchecks.FakeHealthCheckGetter{nil})
 	return NewBackendPool(
@@ -82,8 +84,14 @@ func TestBackendPoolAdd(t *testing.T) {
 			t.Fatalf("Unexpected create for existing backend service")
 		}
 	}
-	gotBackend, _ := f.GetBackendService(beName)
-	gotGroup, _ := fakeIGs.GetInstanceGroup(namer.IGName(), "default-zone")
+	gotBackend, err := f.GetBackendService(beName)
+	if err != nil {
+		t.Fatalf("Failed to find a backend with name %v: %v", beName, err)
+	}
+	gotGroup, err := fakeIGs.GetInstanceGroup(namer.IGName(), defaultZone)
+	if err != nil {
+		t.Fatalf("Failed to find instance group %v", namer.IGName())
+	}
 	if gotBackend.Backends[0].Group != gotGroup.SelfLink {
 		t.Fatalf(
 			"Broken instance group link: %v %v",
diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go
index 85ca39263..2e1057b39 100644
--- a/controllers/gce/controller/cluster_manager.go
+++ b/controllers/gce/controller/cluster_manager.go
@@ -261,13 +261,9 @@ func NewClusterManager(
 	// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
 	cluster := ClusterManager{ClusterNamer: &utils.Namer{name}}

-	zone, err := cloud.GetZone()
-	if err != nil {
-		return nil, err
-	}
 	// NodePool stores GCE vms that are in this Kubernetes cluster.
-	cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
+	cluster.instancePool = instances.NewNodePool(cloud)

 	// BackendPool creates GCE BackendServices and associated health checks.
 	healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go
index db783ca0d..a2d8bb632 100644
--- a/controllers/gce/controller/fakes.go
+++ b/controllers/gce/controller/fakes.go
@@ -50,7 +50,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
 	fakeHCs := healthchecks.NewFakeHealthChecks()
 	namer := &utils.Namer{clusterName}

-	nodePool := instances.NewNodePool(fakeIGs, defaultZone)
+	nodePool := instances.NewNodePool(fakeIGs)
 	nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})

 	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go
index 7ca045e6e..b49a6df79 100644
--- a/controllers/gce/instances/instances.go
+++ b/controllers/gce/instances/instances.go
@@ -46,7 +46,7 @@ type Instances struct {
 // NewNodePool creates a new node pool.
 // - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
 //   members of the cloud InstanceGroup.
-func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool {
+func NewNodePool(cloud InstanceGroups) NodePool {
 	return &Instances{cloud, storage.NewInMemoryPool(), nil}
 }

@@ -146,6 +146,8 @@ func (i *Instances) Get(name, zone string) (*compute.InstanceGroup, error) {
 	return ig, nil
 }

+// splitNodesByZone takes a list of node names and returns a map of zone:node names.
+// It figures out the zones by asking the zoneLister.
 func (i *Instances) splitNodesByZone(names []string) map[string][]string {
 	nodesByZone := map[string][]string{}
 	for _, name := range names {
diff --git a/controllers/gce/instances/instances_test.go b/controllers/gce/instances/instances_test.go
index a20caf8b5..515e22a6f 100644
--- a/controllers/gce/instances/instances_test.go
+++ b/controllers/gce/instances/instances_test.go
@@ -25,7 +25,7 @@ import (
 const defaultZone = "default-zone"

 func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
-	pool := NewNodePool(f, zone)
+	pool := NewNodePool(f)
 	pool.Init(&FakeZoneLister{[]string{zone}})
 	return pool
 }
diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go
index 84f08c8cf..ef5bcaae3 100644
--- a/controllers/gce/loadbalancers/loadbalancers_test.go
+++ b/controllers/gce/loadbalancers/loadbalancers_test.go
@@ -29,7 +29,7 @@ import (

 const (
 	testDefaultBeNodePort = int64(3000)
-	defaultZone           = "default-zone"
+	defaultZone           = "zone-a"
 )

 func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
@@ -39,8 +39,8 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
 	namer := &utils.Namer{}
 	healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
 	healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
-	nodePool := instances.NewNodePool(fakeIGs, defaultZone)
-	nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
+	nodePool := instances.NewNodePool(fakeIGs)
+	nodePool.Init(&instances.FakeZoneLister{[]string{defaultZone}})
 	backendPool := backends.NewBackendPool(
 		fakeBackends, healthChecker, nodePool, namer, []int64{}, false)
 	return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)