Merge pull request #1133 from bprashanth/ubernetes_healthchecks

Ubernetes multizone and custom healthchecks
Commit b395b714be, authored by Prashanth B on 2016-06-09 13:17:38 -07:00, committed by GitHub.
22 changed files with 753 additions and 189 deletions


@ -107,9 +107,9 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
    return be, nil
}

-func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
+func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
    // Create a new health check
-   if err := b.healthChecker.Add(namedPort.Port, ""); err != nil {
+   if err := b.healthChecker.Add(namedPort.Port); err != nil {
        return nil, err
    }
    hc, err := b.healthChecker.Get(namedPort.Port)
@ -120,11 +120,7 @@ func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPor
    backend := &compute.BackendService{
        Name:     name,
        Protocol: "HTTP",
-       Backends: []*compute.Backend{
-           {
-               Group: ig.SelfLink,
-           },
-       },
+       Backends: getBackendsForIGs(igs),
        // Api expects one, means little to kubernetes.
        HealthChecks: []string{hc.SelfLink},
        Port:         namedPort.Port,
@ -143,20 +139,24 @@ func (b *Backends) Add(port int64) error {
    be := &compute.BackendService{}
    defer func() { b.snapshotter.Add(portKey(port), be) }()

-   ig, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
+   igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
    if err != nil {
        return err
    }
    be, _ = b.Get(port)
    if be == nil {
-       glog.Infof("Creating backend for instance group %v port %v named port %v",
-           ig.Name, port, namedPort)
-       be, err = b.create(ig, namedPort, b.namer.BeName(port))
+       glog.Infof("Creating backend for %d instance groups, port %v named port %v",
+           len(igs), port, namedPort)
+       be, err = b.create(igs, namedPort, b.namer.BeName(port))
        if err != nil {
            return err
        }
    }
+   // we won't find any igs till the node pool syncs nodes.
+   if len(igs) == 0 {
+       return nil
+   }
-   if err := b.edgeHop(be, ig); err != nil {
+   if err := b.edgeHop(be, igs); err != nil {
        return err
    }
    return err
@ -201,18 +201,31 @@ func (b *Backends) List() ([]interface{}, error) {
    return interList, nil
}

+func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
+   backends := []*compute.Backend{}
+   for _, ig := range igs {
+       backends = append(backends, &compute.Backend{Group: ig.SelfLink})
+   }
+   return backends
+}
+
// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links.
-func (b *Backends) edgeHop(be *compute.BackendService, ig *compute.InstanceGroup) error {
-   if len(be.Backends) == 1 &&
-       utils.CompareLinks(be.Backends[0].Group, ig.SelfLink) {
+func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
+   beIGs := sets.String{}
+   for _, beToIG := range be.Backends {
+       beIGs.Insert(beToIG.Group)
+   }
+   igLinks := sets.String{}
+   for _, igToBE := range igs {
+       igLinks.Insert(igToBE.SelfLink)
+   }
+   if igLinks.Equal(beIGs) {
        return nil
    }
-   glog.Infof("Backend %v has a broken edge, adding link to %v",
-       be.Name, ig.Name)
-   be.Backends = []*compute.Backend{
-       {Group: ig.SelfLink},
-   }
+   glog.Infof("Backend %v has a broken edge, expected igs %+v, current igs %+v",
+       be.Name, igLinks.List(), beIGs.List())
+   be.Backends = getBackendsForIGs(igs)
    if err := b.cloud.UpdateBackendService(be); err != nil {
        return err
    }
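The edge hop above now reconciles the backend service against one instance group per zone rather than a single group, and only calls UpdateBackendService when the linked groups differ from the expected set. A minimal standalone sketch of the helper's behaviour, with made-up instance-group self-links (only the compute/v1 types are assumed):

package main

import (
    "fmt"

    compute "google.golang.org/api/compute/v1"
)

// Local copy of getBackendsForIGs from the diff above: one *compute.Backend
// per zonal instance group, keyed by the group's SelfLink.
func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
    backends := []*compute.Backend{}
    for _, ig := range igs {
        backends = append(backends, &compute.Backend{Group: ig.SelfLink})
    }
    return backends
}

func main() {
    // Hypothetical zonal instance groups; the self-links are illustrative only.
    igs := []*compute.InstanceGroup{
        {Name: "k8s-ig", Zone: "us-central1-a", SelfLink: "zones/us-central1-a/instanceGroups/k8s-ig"},
        {Name: "k8s-ig", Zone: "us-central1-b", SelfLink: "zones/us-central1-b/instanceGroups/k8s-ig"},
    }
    for _, be := range getBackendsForIGs(igs) {
        fmt.Println(be.Group) // one backend link per zone
    }
}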


@ -27,12 +27,16 @@ import (
    "k8s.io/kubernetes/pkg/util/sets"
)

+const defaultZone = "zone-a"
+
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
    namer := &utils.Namer{}
+   nodePool := instances.NewNodePool(fakeIGs)
+   nodePool.Init(&instances.FakeZoneLister{[]string{defaultZone}})
+   healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
+   healthChecks.Init(&healthchecks.FakeHealthCheckGetter{nil})
    return NewBackendPool(
-       f,
-       healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
-       instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
+       f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
}

func TestBackendPoolAdd(t *testing.T) {
@ -80,8 +84,14 @@ func TestBackendPoolAdd(t *testing.T) {
            t.Fatalf("Unexpected create for existing backend service")
        }
    }
-   gotBackend, _ := f.GetBackendService(beName)
-   gotGroup, _ := fakeIGs.GetInstanceGroup(namer.IGName(), "default-zone")
+   gotBackend, err := f.GetBackendService(beName)
+   if err != nil {
+       t.Fatalf("Failed to find a backend with name %v: %v", beName, err)
+   }
+   gotGroup, err := fakeIGs.GetInstanceGroup(namer.IGName(), defaultZone)
+   if err != nil {
+       t.Fatalf("Failed to find instance group %v", namer.IGName())
+   }
    if gotBackend.Backends[0].Group != gotGroup.SelfLink {
        t.Fatalf(
            "Broken instance group link: %v %v",


@ -99,47 +99,3 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
    return &compute.BackendServiceGroupHealth{
        HealthStatus: states}, nil
}
// NewFakeHealthChecks returns a health check fake.
func NewFakeHealthChecks() *FakeHealthChecks {
return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
}
// FakeHealthChecks fakes out health checks.
type FakeHealthChecks struct {
hc []*compute.HttpHealthCheck
}
// CreateHttpHealthCheck fakes health check creation.
func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
f.hc = append(f.hc, hc)
return nil
}
// GetHttpHealthCheck fakes getting a http health check.
func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
for _, h := range f.hc {
if h.Name == name {
return h, nil
}
}
return nil, fmt.Errorf("Health check %v not found.", name)
}
// DeleteHttpHealthCheck fakes deleting a http health check.
func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
healthChecks := []*compute.HttpHealthCheck{}
exists := false
for _, h := range f.hc {
if h.Name == name {
exists = true
continue
}
healthChecks = append(healthChecks, h)
}
if !exists {
return fmt.Errorf("Failed to find health check %v", name)
}
f.hc = healthChecks
return nil
}


@ -42,17 +42,3 @@ type BackendServices interface {
    ListBackendServices() (*compute.BackendServiceList, error)
    GetHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error)
}
// SingleHealthCheck is an interface to manage a single GCE health check.
type SingleHealthCheck interface {
CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
DeleteHttpHealthCheck(name string) error
GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
}
// HealthChecker is an interface to manage cloud HTTPHealthChecks.
type HealthChecker interface {
Add(port int64, path string) error
Delete(port int64) error
Get(port int64) (*compute.HttpHealthCheck, error)
}


@ -74,6 +74,23 @@ type ClusterManager struct {
    backendPool  backends.BackendPool
    l7Pool       loadbalancers.LoadBalancerPool
    firewallPool firewalls.SingleFirewallPool
+
+   // TODO: Refactor so we simply init a health check pool.
+   // Currently health checks are tied to backends because each backend needs
+   // the link of the associated health, but both the backend pool and
+   // loadbalancer pool manage backends, because the lifetime of the default
+   // backend is tied to the last/first loadbalancer not the life of the
+   // nodeport service or Ingress.
+   healthCheckers []healthchecks.HealthChecker
+}
+
+// Init initializes the cluster manager.
+func (c *ClusterManager) Init(tr *GCETranslator) {
+   c.instancePool.Init(tr)
+   for _, h := range c.healthCheckers {
+       h.Init(tr)
+   }
+   // TODO: Initialize other members as needed.
}

// IsHealthy returns an error if the cluster manager is unhealthy.
@ -217,7 +234,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud {
//   string passed to glbc via --gce-cluster-name.
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
//   the kubernetes Service that serves the 404 page if no urls match.
-// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz"
+// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
func NewClusterManager(
    configFilePath string,
    name string,
@ -244,21 +261,20 @@ func NewClusterManager(
    // Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
    cluster := ClusterManager{ClusterNamer: &utils.Namer{name}}

-   zone, err := cloud.GetZone()
-   if err != nil {
-       return nil, err
-   }
    // NodePool stores GCE vms that are in this Kubernetes cluster.
-   cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
+   cluster.instancePool = instances.NewNodePool(cloud)

    // BackendPool creates GCE BackendServices and associated health checks.
    healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
+   // Loadbalancer pool manages the default backend and its health check.
+   defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
+   cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
+
    // TODO: This needs to change to a consolidated management of the default backend.
    cluster.backendPool = backends.NewBackendPool(
        cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
-   defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
    defaultBackendPool := backends.NewBackendPool(
        cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
    cluster.defaultBackendNodePort = defaultBackendNodePort


@ -44,18 +44,25 @@ var (
    // DefaultClusterUID is the uid to use for clusters resources created by an
    // L7 controller created without specifying the --cluster-uid flag.
    DefaultClusterUID = ""
+
+   // Frequency to poll on local stores to sync.
+   storeSyncPollPeriod = 5 * time.Second
)

// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
    client         *client.Client
    ingController  *framework.Controller
    nodeController *framework.Controller
    svcController  *framework.Controller
+   podController  *framework.Controller
    ingLister      StoreToIngressLister
    nodeLister     cache.StoreToNodeLister
    svcLister      cache.StoreToServiceLister
+   // Health checks are the readiness probes of containers on pods.
+   podLister cache.StoreToPodLister
+   // TODO: Watch secrets
    CloudClusterManager *ClusterManager
    recorder            record.EventRecorder
    nodeQueue           *taskQueue
@ -69,6 +76,9 @@ type LoadBalancerController struct {
    shutdown bool
    // tlsLoader loads secrets from the Kubernetes apiserver for Ingresses.
    tlsLoader tlsLoader
+   // hasSynced returns true if all associated sub-controllers have synced.
+   // Abstracted into a func for testing.
+   hasSynced func() bool
}

// NewLoadBalancerController creates a controller for gce loadbalancers.
@ -90,6 +100,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
    }
    lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
    lbc.ingQueue = NewTaskQueue(lbc.sync)
+   lbc.hasSynced = lbc.storesSynced

    // Ingress watch handlers
    pathHandlers := framework.ResourceEventHandlerFuncs{
@ -130,12 +141,19 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
        lbc.client, "services", namespace, fields.Everything()),
        &api.Service{}, resyncPeriod, svcHandlers)

+   lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer(
+       cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()),
+       &api.Pod{},
+       resyncPeriod,
+       framework.ResourceEventHandlerFuncs{},
+       cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+   )
+
    nodeHandlers := framework.ResourceEventHandlerFuncs{
        AddFunc:    lbc.nodeQueue.enqueue,
        DeleteFunc: lbc.nodeQueue.enqueue,
        // Nodes are updated every 10s and we don't care, so no update handler.
    }
    // Node watch handlers
    lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
        &cache.ListWatch{
@ -194,6 +212,7 @@ func (lbc *LoadBalancerController) Run() {
    go lbc.ingController.Run(lbc.stopCh)
    go lbc.nodeController.Run(lbc.stopCh)
    go lbc.svcController.Run(lbc.stopCh)
+   go lbc.podController.Run(lbc.stopCh)
    go lbc.ingQueue.run(time.Second, lbc.stopCh)
    go lbc.nodeQueue.run(time.Second, lbc.stopCh)
    <-lbc.stopCh
@ -224,8 +243,29 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
    return nil
}

+// storesSynced returns true if all the sub-controllers have finished their
+// first sync with apiserver.
+func (lbc *LoadBalancerController) storesSynced() bool {
+   return (
+       // wait for pods to sync so we don't allocate a default health check when
+       // an endpoint has a readiness probe.
+       lbc.podController.HasSynced() &&
+           // wait for services so we don't thrash on backend creation.
+           lbc.svcController.HasSynced() &&
+           // wait for nodes so we don't disconnect a backend from an instance
+           // group just because we don't realize there are nodes in that zone.
+           lbc.nodeController.HasSynced() &&
+           // Wait for ingresses as a safety measure. We don't really need this.
+           lbc.ingController.HasSynced())
+}
+
// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) {
+   if !lbc.hasSynced() {
+       time.Sleep(storeSyncPollPeriod)
+       lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync"))
+       return
+   }
    glog.V(3).Infof("Syncing %v", key)
    paths, err := lbc.ingLister.List()


@ -55,6 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s
    if err != nil {
        t.Fatalf("%v", err)
    }
+   lb.hasSynced = func() bool { return true }
    return lb
}


@ -30,7 +30,6 @@ import (
const (
    testDefaultBeNodePort = int64(3000)
-   defaultZone           = "default-zone"
)

var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
@ -50,8 +49,13 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
    fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
    fakeHCs := healthchecks.NewFakeHealthChecks()
    namer := &utils.Namer{clusterName}
-   nodePool := instances.NewNodePool(fakeIGs, defaultZone)
+   nodePool := instances.NewNodePool(fakeIGs)
+   nodePool.Init(&instances.FakeZoneLister{[]string{"zone-a"}})
    healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
+   healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
    backendPool := backends.NewBackendPool(
        fakeBackends,
        healthChecker, nodePool, namer, []int64{}, false)


@ -0,0 +1,174 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
)
func TestZoneListing(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1"},
"zone-2": {"n2"},
}
addNodes(lbc, zoneToNode)
zones, err := lbc.tr.ListZones()
if err != nil {
t.Errorf("Failed to list zones: %v", err)
}
for expectedZone := range zoneToNode {
found := false
for _, gotZone := range zones {
if gotZone == expectedZone {
found = true
}
}
if !found {
t.Fatalf("Expected zones %v; Got zones %v", zoneToNode, zones)
}
}
}
func TestInstancesAddedToZones(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
zoneToNode := map[string][]string{
"zone-1": {"n1", "n2"},
"zone-2": {"n3"},
}
addNodes(lbc, zoneToNode)
// Create 2 igs, one per zone.
testIG := "test-ig"
testPort := int64(3001)
lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, testPort)
// node pool syncs kube-nodes, this will add them to both igs.
lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"})
gotZonesToNode := cm.fakeIGs.GetInstancesByZone()
i := 0
for z, nodeNames := range zoneToNode {
if ig, err := cm.fakeIGs.GetInstanceGroup(testIG, z); err != nil {
t.Errorf("Failed to find ig %v in zone %v, found %+v: %v", testIG, z, ig, err)
}
if cm.fakeIGs.Ports[i] != testPort {
t.Errorf("Expected the same node port on all igs, got ports %+v", cm.fakeIGs.Ports)
}
expNodes := sets.NewString(nodeNames...)
gotNodes := sets.NewString(gotZonesToNode[z]...)
if !gotNodes.Equal(expNodes) {
t.Errorf("Nodes not added to zones, expected %+v got %+v", expNodes, gotNodes)
}
i++
}
}
func TestProbeGetter(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID)
lbc := newLoadBalancerController(t, cm, "")
nodePortToHealthCheck := map[int64]string{
3001: "/healthz",
3002: "/foo",
}
addPods(lbc, nodePortToHealthCheck)
for p, exp := range nodePortToHealthCheck {
got, err := lbc.tr.HealthCheck(p)
if err != nil {
t.Errorf("Failed to get health check for node port %v: %v", p, err)
} else if got.RequestPath != exp {
t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp)
}
}
}
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string) {
for np, u := range nodePortToHealthCheck {
l := map[string]string{fmt.Sprintf("app-%d", np): "test"}
svc := &api.Service{
Spec: api.ServiceSpec{
Selector: l,
Ports: []api.ServicePort{
{
NodePort: int32(np),
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
}
svc.Name = fmt.Sprintf("%d", np)
lbc.svcLister.Store.Add(svc)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: l,
Name: fmt.Sprintf("%d", np),
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Ports: []api.ContainerPort{{ContainerPort: 80}},
ReadinessProbe: &api.Probe{
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: u,
Port: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
}
lbc.podLister.Indexer.Add(pod)
}
}
func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
for zone, nodes := range zoneToNode {
for _, node := range nodes {
n := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: node,
Labels: map[string]string{
zoneKey: zone,
},
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{Type: api.NodeReady, Status: api.ConditionTrue},
},
},
}
lbc.nodeLister.Store.Add(n)
}
}
lbc.CloudClusterManager.instancePool.Init(lbc.tr)
}


@ -18,6 +18,7 @@ package controller
import (
    "fmt"
+   "sort"
    "strconv"
    "time"
@ -27,7 +28,9 @@ import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/client/cache"
+   "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/pkg/util/intstr"
+   "k8s.io/kubernetes/pkg/util/sets"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/workqueue"
@ -37,6 +40,10 @@ import (
const (
    allowHTTPKey    = "kubernetes.io/ingress.allow-http"
    staticIPNameKey = "kubernetes.io/ingress.global-static-ip-name"
+
+   // Label key to denote which GCE zone a Kubernetes node is in.
+   zoneKey     = "failure-domain.beta.kubernetes.io/zone"
+   defaultZone = ""
)

// ingAnnotations represents Ingress annotations.
@ -315,3 +322,141 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
    }
    return knownPorts
}
func getZone(n api.Node) string {
zone, ok := n.Labels[zoneKey]
if !ok {
return defaultZone
}
return zone
}
// GetZoneForNode returns the zone for a given node by looking up its zone label.
func (t *GCETranslator) GetZoneForNode(name string) (string, error) {
nodes, err := t.nodeLister.NodeCondition(nodeReady).List()
if err != nil {
return "", err
}
for _, n := range nodes.Items {
if n.Name == name {
// TODO: Make this more resilient to label changes by listing
// cloud nodes and figuring out zone.
return getZone(n), nil
}
}
return "", fmt.Errorf("Node not found %v", name)
}
// ListZones returns a list of zones this Kubernetes cluster spans.
func (t *GCETranslator) ListZones() ([]string, error) {
zones := sets.String{}
readyNodes, err := t.nodeLister.NodeCondition(nodeReady).List()
if err != nil {
return zones.List(), err
}
for _, n := range readyNodes.Items {
zones.Insert(getZone(n))
}
return zones.List(), nil
}
// isPortEqual compares the given IntOrString ports
func isPortEqual(port, targetPort intstr.IntOrString) bool {
if targetPort.Type == intstr.Int {
return port.IntVal == targetPort.IntVal
}
return port.StrVal == targetPort.StrVal
}
// getHTTPProbe returns the http readiness probe from the first container
// that matches targetPort, from the set of pods matching the given labels.
func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntOrString) (*api.Probe, error) {
// Lookup any container with a matching targetPort from the set of pods
// with a matching label selector.
pl, err := t.podLister.List(labels.SelectorFromSet(labels.Set(l)))
if err != nil {
return nil, err
}
// If multiple endpoints have different health checks, take the first
sort.Sort(PodsByCreationTimestamp(pl))
for _, pod := range pl {
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
for _, c := range pod.Spec.Containers {
if c.ReadinessProbe == nil || c.ReadinessProbe.Handler.HTTPGet == nil {
continue
}
for _, p := range c.Ports {
cPort := intstr.IntOrString{IntVal: p.ContainerPort, StrVal: p.Name}
if isPortEqual(cPort, targetPort) {
if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, targetPort) {
return c.ReadinessProbe, nil
}
glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)",
logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port)
}
}
}
glog.V(4).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr)
}
return nil, nil
}
// HealthCheck returns the http readiness probe for the endpoint backing the
// given nodePort. If no probe is found it returns a health check with "" as
// the request path, callers are responsible for swapping this out for the
// appropriate default.
func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
sl, err := t.svcLister.List()
if err != nil {
return nil, err
}
// Find the label and target port of the one service with the given nodePort
for _, s := range sl.Items {
for _, p := range s.Spec.Ports {
if int32(port) == p.NodePort {
rp, err := t.getHTTPProbe(s.Spec.Selector, p.TargetPort)
if err != nil {
return nil, err
}
if rp == nil {
glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port)
break
}
healthPath := rp.Handler.HTTPGet.Path
// GCE requires a leading "/" for health check urls.
if string(healthPath[0]) != "/" {
healthPath = fmt.Sprintf("/%v", healthPath)
}
host := rp.Handler.HTTPGet.Host
glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath)
return &compute.HttpHealthCheck{
Port: port,
RequestPath: healthPath,
Host: host,
Description: "kubernetes L7 health check from readiness probe.",
CheckIntervalSec: int64(rp.PeriodSeconds),
TimeoutSec: int64(rp.TimeoutSeconds),
HealthyThreshold: int64(rp.SuccessThreshold),
UnhealthyThreshold: int64(rp.FailureThreshold),
// TODO: include headers after updating compute godep.
}, nil
}
}
}
return utils.DefaultHealthCheckTemplate(port), nil
}
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
type PodsByCreationTimestamp []*api.Pod
func (o PodsByCreationTimestamp) Len() int { return len(o) }
func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o PodsByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}
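For reference, the translation performed by HealthCheck above boils down to copying the readiness probe's path, host, and thresholds into a compute.HttpHealthCheck. A rough, self-contained sketch of just that mapping, with a made-up helper name and hypothetical probe values (this is not the controller's actual entry point):

package main

import (
    "fmt"

    compute "google.golang.org/api/compute/v1"
    "k8s.io/kubernetes/pkg/api"
)

// healthCheckFromProbe mirrors the field mapping done by GCETranslator.HealthCheck;
// the helper name and the probe values below are illustrative only.
func healthCheckFromProbe(port int64, rp *api.Probe) *compute.HttpHealthCheck {
    return &compute.HttpHealthCheck{
        Port:               port,
        RequestPath:        rp.Handler.HTTPGet.Path,
        Host:               rp.Handler.HTTPGet.Host,
        Description:        "kubernetes L7 health check from readiness probe.",
        CheckIntervalSec:   int64(rp.PeriodSeconds),
        TimeoutSec:         int64(rp.TimeoutSeconds),
        HealthyThreshold:   int64(rp.SuccessThreshold),
        UnhealthyThreshold: int64(rp.FailureThreshold),
    }
}

func main() {
    probe := &api.Probe{
        Handler:          api.Handler{HTTPGet: &api.HTTPGetAction{Path: "/healthz"}},
        PeriodSeconds:    10,
        TimeoutSeconds:   5,
        SuccessThreshold: 1,
        FailureThreshold: 3,
    }
    hc := healthCheckFromProbe(31001, probe)
    fmt.Printf("%v every %vs, timeout %vs\n", hc.RequestPath, hc.CheckIntervalSec, hc.TimeoutSec)
}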


@ -20,6 +20,7 @@ import (
    "fmt"

    compute "google.golang.org/api/compute/v1"
+   "k8s.io/contrib/ingress/controllers/gce/utils"
)

// NewFakeHealthChecks returns a new FakeHealthChecks.
@ -27,6 +28,20 @@ func NewFakeHealthChecks() *FakeHealthChecks {
    return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
}

+// FakeHealthCheckGetter implements the healthCheckGetter interface for tests.
+type FakeHealthCheckGetter struct {
+   DefaultHealthCheck *compute.HttpHealthCheck
+}
+
+// HealthCheck returns the health check for the given port. If a health check
+// isn't stored under the DefaultHealthCheck member, it constructs one.
+func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
+   if h.DefaultHealthCheck == nil {
+       return utils.DefaultHealthCheckTemplate(port), nil
+   }
+   return h.DefaultHealthCheck, nil
+}
+
// FakeHealthChecks fakes out health checks.
type FakeHealthChecks struct {
    hc []*compute.HttpHealthCheck
@ -65,3 +80,22 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
    f.hc = healthChecks
    return nil
}
// UpdateHttpHealthCheck sends the given health check as an update.
func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
healthChecks := []*compute.HttpHealthCheck{}
found := false
for _, h := range f.hc {
if h.Name == hc.Name {
healthChecks = append(healthChecks, hc)
found = true
} else {
healthChecks = append(healthChecks, h)
}
}
if !found {
return fmt.Errorf("Cannot update a non-existent health check %v", hc.Name)
}
f.hc = healthChecks
return nil
}


@ -29,43 +29,47 @@ type HealthChecks struct {
    cloud       SingleHealthCheck
    defaultPath string
    namer       *utils.Namer
+   healthCheckGetter
}

// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
-   return &HealthChecks{cloud, defaultHealthCheckPath, namer}
+   return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil}
+}
+
+// Init initializes the health checker.
+func (h *HealthChecks) Init(r healthCheckGetter) {
+   h.healthCheckGetter = r
}

// Add adds a healthcheck if one for the same port doesn't already exist.
-func (h *HealthChecks) Add(port int64, path string) error {
-   hc, _ := h.Get(port)
-   name := h.namer.BeName(port)
-   if path == "" {
-       path = h.defaultPath
-   }
+func (h *HealthChecks) Add(port int64) error {
+   wantHC, err := h.healthCheckGetter.HealthCheck(port)
+   if err != nil {
+       return err
+   }
+   if wantHC.RequestPath == "" {
+       wantHC.RequestPath = h.defaultPath
+   }
+   name := h.namer.BeName(port)
+   wantHC.Name = name
+   hc, _ := h.Get(port)
    if hc == nil {
+       // TODO: check if the readiness probe has changed and update the
+       // health check.
        glog.Infof("Creating health check %v", name)
-       if err := h.cloud.CreateHttpHealthCheck(
-           &compute.HttpHealthCheck{
-               Name:        name,
-               Port:        port,
-               RequestPath: path,
-               Description: "Default kubernetes L7 Loadbalancing health check.",
-               // How often to health check.
-               CheckIntervalSec: 1,
-               // How long to wait before claiming failure of a health check.
-               TimeoutSec: 1,
-               // Number of healthchecks to pass for a vm to be deemed healthy.
-               HealthyThreshold: 1,
-               // Number of healthchecks to fail before the vm is deemed unhealthy.
-               UnhealthyThreshold: 10,
-           }); err != nil {
+       if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil {
+           return err
+       }
+   } else if wantHC.RequestPath != hc.RequestPath {
+       // TODO: also compare headers interval etc.
+       glog.Infof("Updating health check %v, path %v -> %v", name, hc.RequestPath, wantHC.RequestPath)
+       if err := h.cloud.UpdateHttpHealthCheck(wantHC); err != nil {
            return err
        }
    } else {
+       // TODO: Does this health check need an edge hop?
        glog.Infof("Health check %v already exists", hc.Name)
    }
    return nil


@ -20,16 +20,25 @@ import (
    compute "google.golang.org/api/compute/v1"
)

+// healthCheckGetter retrieves health checks.
+type healthCheckGetter interface {
+   // HealthCheck returns the HTTP readiness check for a node port.
+   HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error)
+}
+
// SingleHealthCheck is an interface to manage a single GCE health check.
type SingleHealthCheck interface {
    CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
+   UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error
    DeleteHttpHealthCheck(name string) error
    GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
}

// HealthChecker is an interface to manage cloud HTTPHealthChecks.
type HealthChecker interface {
-   Add(port int64, path string) error
+   Init(h healthCheckGetter)
+   Add(port int64) error
    Delete(port int64) error
    Get(port int64) (*compute.HttpHealthCheck, error)
}


@ -27,30 +27,50 @@ import (
// NewFakeInstanceGroups creates a new FakeInstanceGroups.
func NewFakeInstanceGroups(nodes sets.String) *FakeInstanceGroups {
    return &FakeInstanceGroups{
        instances:        nodes,
        listResult:       getInstanceList(nodes),
        namer:            utils.Namer{},
+       zonesToInstances: map[string][]string{},
    }
}

// InstanceGroup fakes

+// FakeZoneLister records zones for nodes.
+type FakeZoneLister struct {
+   Zones []string
+}
+
+// ListZones returns the list of zones.
+func (z *FakeZoneLister) ListZones() ([]string, error) {
+   return z.Zones, nil
+}
+
+// GetZoneForNode returns the only zone stored in the fake zone lister.
+func (z *FakeZoneLister) GetZoneForNode(name string) (string, error) {
+   // TODO: evolve as required, it's currently needed just to satisfy the
+   // interface in unittests that don't care about zones. See unittests in
+   // controller/util_test for actual zoneLister testing.
+   return z.Zones[0], nil
+}
+
// FakeInstanceGroups fakes out the instance groups api.
type FakeInstanceGroups struct {
    instances        sets.String
    instanceGroups   []*compute.InstanceGroup
    Ports            []int64
    getResult        *compute.InstanceGroup
    listResult       *compute.InstanceGroupsListInstances
    calls            []int
    namer            utils.Namer
+   zonesToInstances map[string][]string
}

// GetInstanceGroup fakes getting an instance group from the cloud.
func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.InstanceGroup, error) {
    f.calls = append(f.calls, utils.Get)
    for _, ig := range f.instanceGroups {
-       if ig.Name == name {
+       if ig.Name == name && ig.Zone == zone {
            return ig, nil
        }
    }
@ -60,7 +80,7 @@ func (f *FakeInstanceGroups) GetInstanceGroup(name, zone string) (*compute.Insta
// CreateInstanceGroup fakes instance group creation.
func (f *FakeInstanceGroups) CreateInstanceGroup(name, zone string) (*compute.InstanceGroup, error) {
-   newGroup := &compute.InstanceGroup{Name: name, SelfLink: name}
+   newGroup := &compute.InstanceGroup{Name: name, SelfLink: name, Zone: zone}
    f.instanceGroups = append(f.instanceGroups, newGroup)
    return newGroup, nil
}
@ -92,13 +112,35 @@ func (f *FakeInstanceGroups) ListInstancesInInstanceGroup(name, zone string, sta
func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, instanceNames []string) error {
    f.calls = append(f.calls, utils.AddInstances)
    f.instances.Insert(instanceNames...)
+   if _, ok := f.zonesToInstances[zone]; !ok {
+       f.zonesToInstances[zone] = []string{}
+   }
+   f.zonesToInstances[zone] = append(f.zonesToInstances[zone], instanceNames...)
    return nil
}

+// GetInstancesByZone returns the zone to instances map.
+func (f *FakeInstanceGroups) GetInstancesByZone() map[string][]string {
+   return f.zonesToInstances
+}
+
// RemoveInstancesFromInstanceGroup fakes removing instances from an instance group.
func (f *FakeInstanceGroups) RemoveInstancesFromInstanceGroup(name, zone string, instanceNames []string) error {
    f.calls = append(f.calls, utils.RemoveInstances)
    f.instances.Delete(instanceNames...)
+   l, ok := f.zonesToInstances[zone]
+   if !ok {
+       return nil
+   }
+   newIns := []string{}
+   delIns := sets.NewString(instanceNames...)
+   for _, oldIns := range l {
+       if delIns.Has(oldIns) {
+           continue
+       }
+       newIns = append(newIns, oldIns)
+   }
+   f.zonesToInstances[zone] = newIns
    return nil
}


@ -17,6 +17,7 @@ limitations under the License.
package instances

import (
+   "fmt"
    "net/http"
    "strings"
@ -35,89 +36,167 @@ const (
// Instances implements NodePool.
type Instances struct {
    cloud InstanceGroups
-   zone  string
+   // zones is a list of zones seeded by Kubernetes node zones.
+   // TODO: we can figure this out.
    snapshotter storage.Snapshotter
+   zoneLister
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
//   members of the cloud InstanceGroup.
-func NewNodePool(cloud InstanceGroups, zone string) NodePool {
-   glog.V(3).Infof("NodePool is only aware of instances in zone %v", zone)
-   return &Instances{cloud, zone, storage.NewInMemoryPool()}
+func NewNodePool(cloud InstanceGroups) NodePool {
+   return &Instances{cloud, storage.NewInMemoryPool(), nil}
+}
+
+// Init initializes the instance pool. The given zoneLister is used to list
+// all zones that require an instance group, and to lookup which zone a
+// given Kubernetes node is in so we can add it to the right instance group.
+func (i *Instances) Init(zl zoneLister) {
+   i.zoneLister = zl
}

// AddInstanceGroup creates or gets an instance group if it doesn't exist
-// and adds the given port to it.
-func (i *Instances) AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error) {
-   ig, _ := i.Get(name)
-   if ig == nil {
-       glog.Infof("Creating instance group %v", name)
-       var err error
-       ig, err = i.cloud.CreateInstanceGroup(name, i.zone)
-       if err != nil {
-           return nil, nil, err
-       }
-   } else {
-       glog.V(3).Infof("Instance group already exists %v", name)
-   }
-   defer i.snapshotter.Add(name, ig)
-   namedPort, err := i.cloud.AddPortToInstanceGroup(ig, port)
-   if err != nil {
-       return nil, nil, err
-   }
-   return ig, namedPort, nil
+// and adds the given port to it. Returns a list of one instance group per zone,
+// all of which have the exact same named port.
+func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) {
+   igs := []*compute.InstanceGroup{}
+   namedPort := &compute.NamedPort{}
+
+   zones, err := i.ListZones()
+   if err != nil {
+       return igs, namedPort, err
+   }
+
+   for _, zone := range zones {
+       ig, _ := i.Get(name, zone)
+       var err error
+       if ig == nil {
+           glog.Infof("Creating instance group %v in zone %v", name, zone)
+           ig, err = i.cloud.CreateInstanceGroup(name, zone)
+           if err != nil {
+               return nil, nil, err
+           }
+       } else {
+           glog.V(3).Infof("Instance group %v already exists in zone %v, adding port %d to it", name, zone, port)
+       }
+       defer i.snapshotter.Add(name, struct{}{})
+       namedPort, err = i.cloud.AddPortToInstanceGroup(ig, port)
+       if err != nil {
+           return nil, nil, err
+       }
+       igs = append(igs, ig)
+   }
+   return igs, namedPort, nil
}

-// DeleteInstanceGroup deletes the given IG by name.
+// DeleteInstanceGroup deletes the given IG by name, from all zones.
func (i *Instances) DeleteInstanceGroup(name string) error {
    defer i.snapshotter.Delete(name)
-   return i.cloud.DeleteInstanceGroup(name, i.zone)
+   errs := []error{}
+
+   zones, err := i.ListZones()
+   if err != nil {
+       return err
+   }
+   for _, zone := range zones {
+       glog.Infof("deleting instance group %v in zone %v", name, zone)
+       if err := i.cloud.DeleteInstanceGroup(name, zone); err != nil {
+           errs = append(errs, err)
+       }
+   }
+   if len(errs) == 0 {
+       return nil
+   }
+   return fmt.Errorf("%v", errs)
}

+// list lists all instances in all zones.
func (i *Instances) list(name string) (sets.String, error) {
    nodeNames := sets.NewString()
-   instances, err := i.cloud.ListInstancesInInstanceGroup(
-       name, i.zone, allInstances)
+   zones, err := i.ListZones()
    if err != nil {
        return nodeNames, err
    }
-   for _, ins := range instances.Items {
-       // TODO: If round trips weren't so slow one would be inclined
-       // to GetInstance using this url and get the name.
-       parts := strings.Split(ins.Instance, "/")
-       nodeNames.Insert(parts[len(parts)-1])
+
+   for _, zone := range zones {
+       instances, err := i.cloud.ListInstancesInInstanceGroup(
+           name, zone, allInstances)
+       if err != nil {
+           return nodeNames, err
+       }
+       for _, ins := range instances.Items {
+           // TODO: If round trips weren't so slow one would be inclined
+           // to GetInstance using this url and get the name.
+           parts := strings.Split(ins.Instance, "/")
+           nodeNames.Insert(parts[len(parts)-1])
+       }
    }
    return nodeNames, nil
}

// Get returns the Instance Group by name.
-func (i *Instances) Get(name string) (*compute.InstanceGroup, error) {
-   ig, err := i.cloud.GetInstanceGroup(name, i.zone)
+func (i *Instances) Get(name, zone string) (*compute.InstanceGroup, error) {
+   ig, err := i.cloud.GetInstanceGroup(name, zone)
    if err != nil {
        return nil, err
    }
-   i.snapshotter.Add(name, ig)
+   i.snapshotter.Add(name, struct{}{})
    return ig, nil
}

+// splitNodesByZones takes a list of node names and returns a map of zone:node names.
+// It figures out the zones by asking the zoneLister.
+func (i *Instances) splitNodesByZone(names []string) map[string][]string {
+   nodesByZone := map[string][]string{}
+   for _, name := range names {
+       zone, err := i.GetZoneForNode(name)
+       if err != nil {
+           glog.Errorf("Failed to get zones for %v: %v, skipping", name, err)
+           continue
+       }
+       if _, ok := nodesByZone[zone]; !ok {
+           nodesByZone[zone] = []string{}
+       }
+       nodesByZone[zone] = append(nodesByZone[zone], name)
+   }
+   return nodesByZone
+}
+
-// Add adds the given instances to the Instance Group.
+// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
-   glog.V(3).Infof("Adding nodes %v to %v", names, groupName)
-   return i.cloud.AddInstancesToInstanceGroup(groupName, i.zone, names)
+   errs := []error{}
+   for zone, nodeNames := range i.splitNodesByZone(names) {
+       glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
+       if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, nodeNames); err != nil {
+           errs = append(errs, err)
+       }
+   }
+   if len(errs) == 0 {
+       return nil
+   }
+   return fmt.Errorf("%v", errs)
}

-// Remove removes the given instances from the Instance Group.
+// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
-   glog.V(3).Infof("Removing nodes %v from %v", names, groupName)
-   return i.cloud.RemoveInstancesFromInstanceGroup(groupName, i.zone, names)
+   errs := []error{}
+   for zone, nodeNames := range i.splitNodesByZone(names) {
+       glog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
+       if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, nodeNames); err != nil {
+           errs = append(errs, err)
+       }
+   }
+   if len(errs) == 0 {
+       return nil
+   }
+   return fmt.Errorf("%v", errs)
}

// Sync syncs kubernetes instances with the instances in the instance group.
func (i *Instances) Sync(nodes []string) (err error) {
-   glog.V(3).Infof("Syncing nodes %v", nodes)
+   glog.V(4).Infof("Syncing nodes %v", nodes)

    defer func() {
        // The node pool is only responsible for syncing nodes to instance
@ -133,9 +212,9 @@ func (i *Instances) Sync(nodes []string) (err error) {
    }()

    pool := i.snapshotter.Snapshot()
-   for name := range pool {
+   for igName := range pool {
        gceNodes := sets.NewString()
-       gceNodes, err = i.list(name)
+       gceNodes, err = i.list(igName)
        if err != nil {
            return err
        }
@ -149,14 +228,14 @@ func (i *Instances) Sync(nodes []string) (err error) {
        addNodes := kubeNodes.Difference(gceNodes).List()
        if len(removeNodes) != 0 {
            if err = i.Remove(
-               name, gceNodes.Difference(kubeNodes).List()); err != nil {
+               igName, gceNodes.Difference(kubeNodes).List()); err != nil {
                return err
            }
        }
        if len(addNodes) != 0 {
            if err = i.Add(
-               name, kubeNodes.Difference(gceNodes).List()); err != nil {
+               igName, kubeNodes.Difference(gceNodes).List()); err != nil {
                return err
            }
        }
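Add and Remove above fan out per zone through splitNodesByZone, which buckets node names by the zone their zoneLister reports. An illustrative standalone sketch of that grouping, with made-up zone assignments standing in for a real zoneLister:

package main

import "fmt"

// splitByZone is a simplified stand-in for splitNodesByZone: nodes whose zone
// lookup fails are skipped, everything else is bucketed by zone.
func splitByZone(nodes []string, zoneOf map[string]string) map[string][]string {
    nodesByZone := map[string][]string{}
    for _, n := range nodes {
        zone, ok := zoneOf[n]
        if !ok {
            continue // mirror the real code: skip nodes with no known zone
        }
        nodesByZone[zone] = append(nodesByZone[zone], n)
    }
    return nodesByZone
}

func main() {
    zoneOf := map[string]string{"n1": "zone-a", "n2": "zone-b", "n3": "zone-a"}
    fmt.Println(splitByZone([]string{"n1", "n2", "n3"}, zoneOf))
    // map[zone-a:[n1 n3] zone-b:[n2]]
}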


@ -24,10 +24,16 @@ import (
const defaultZone = "default-zone"

+func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
+   pool := NewNodePool(f)
+   pool.Init(&FakeZoneLister{[]string{zone}})
+   return pool
+}
+
func TestNodePoolSync(t *testing.T) {
    f := NewFakeInstanceGroups(sets.NewString(
        []string{"n1", "n2"}...))
-   pool := NewNodePool(f, defaultZone)
+   pool := newNodePool(f, defaultZone)
    pool.AddInstanceGroup("test", 80)

    // KubeNodes: n1
@ -46,7 +52,7 @@ func TestNodePoolSync(t *testing.T) {
    // Try to add n2 to the instance group.
    f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...))
-   pool = NewNodePool(f, defaultZone)
+   pool = newNodePool(f, defaultZone)
    pool.AddInstanceGroup("test", 80)

    f.calls = []int{}
@ -62,7 +68,7 @@ func TestNodePoolSync(t *testing.T) {
    // Do nothing.
    f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...))
-   pool = NewNodePool(f, defaultZone)
+   pool = newNodePool(f, defaultZone)
    pool.AddInstanceGroup("test", 80)

    f.calls = []int{}


@ -20,17 +20,26 @@ import (
    compute "google.golang.org/api/compute/v1"
)

+// zoneLister manages lookups for GCE instance groups/instances to zones.
+type zoneLister interface {
+   ListZones() ([]string, error)
+   GetZoneForNode(name string) (string, error)
+}
+
// NodePool is an interface to manage a pool of kubernetes nodes synced with vm instances in the cloud
-// through the InstanceGroups interface.
+// through the InstanceGroups interface. It handles zones opaquely using the zoneLister.
type NodePool interface {
-   AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error)
+   Init(zl zoneLister)
+
+   // The following 2 methods operate on instance groups.
+   AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error)
    DeleteInstanceGroup(name string) error

    // TODO: Refactor for modularity
    Add(groupName string, nodeNames []string) error
    Remove(groupName string, nodeNames []string) error
    Sync(nodeNames []string) error
-   Get(name string) (*compute.InstanceGroup, error)
+   Get(name, zone string) (*compute.InstanceGroup, error)
}

// InstanceGroups is an interface for managing gce instances groups, and the instances therein.


@ -344,6 +344,7 @@ func (l *L7) deleteOldSSLCert() (err error) {
            return err
        }
    }
+   l.oldSSLCert = nil
    return nil
}
@ -368,7 +369,7 @@ func (l *L7) checkSSLCert() (err error) {
    cert, _ := l.cloud.GetSslCertificate(certName)

    // PrivateKey is write only, so compare certs alone. We're assuming that
-   // no one will change just the key. We can remembe the key and compare,
+   // no one will change just the key. We can remember the key and compare,
    // but a bug could end up leaking it, which feels worse.
    if cert == nil || ingCert != cert.Certificate {


@ -29,7 +29,7 @@ import (
const (
    testDefaultBeNodePort = int64(3000)
-   defaultZone           = "default-zone"
+   defaultZone           = "zone-a"
)

func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
@ -38,8 +38,11 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
    fakeHCs := healthchecks.NewFakeHealthChecks()
    namer := &utils.Namer{}
    healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
+   healthChecker.Init(&healthchecks.FakeHealthCheckGetter{nil})
+   nodePool := instances.NewNodePool(fakeIGs)
+   nodePool.Init(&instances.FakeZoneLister{[]string{defaultZone}})
    backendPool := backends.NewBackendPool(
-       fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
+       fakeBackends, healthChecker, nodePool, namer, []int64{}, false)
    return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)
}


@ -220,6 +220,7 @@ func main() {
    if clusterManager.ClusterNamer.ClusterName != "" {
        glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.ClusterName)
    }
+   clusterManager.Init(&controller.GCETranslator{lbc})
    go registerHandlers(lbc)
    go handleSigterm(lbc, *deleteAllOnQuit)


@ -78,16 +78,28 @@ type CloudListingPool struct {
    keyGetter keyFunc
}

-// ReplenishPool lists through the cloudLister and inserts into the pool.
+// ReplenishPool lists through the cloudLister and inserts into the pool. This
+// is especially useful in scenarios like deleting an Ingress while the
+// controller is restarting. As long as the resource exists in the shared
+// memory pool, it is visible to the caller and they can take corrective
+// actions, eg: backend pool deletes backends with non-matching node ports
+// in its sync method.
func (c *CloudListingPool) ReplenishPool() {
    c.lock.Lock()
    defer c.lock.Unlock()
    glog.V(4).Infof("Replenishing pool")
+
+   // We must list with the lock, because the controller also lists through
+   // Snapshot(). It's ok if the controller takes a snapshot, we list, we
+   // delete, because we have delete based on the most recent state. Worst
+   // case we thrash. It's not ok if we list, the controller lists and
+   // creates a backend, and we delete that backend based on stale state.
    items, err := c.lister.List()
    if err != nil {
        glog.Warningf("Failed to list: %v", err)
        return
    }

    for i := range items {
        key, err := c.keyGetter(items[i])
        if err != nil {


@ -238,3 +238,22 @@ func CompareLinks(l1, l2 string) bool {
// FakeIngressRuleValueMap is a convenience type used by multiple submodules
// that share the same testing methods.
type FakeIngressRuleValueMap map[string]string
// DefaultHealthCheckTemplate simply returns the default health check template.
func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck {
return &compute.HttpHealthCheck{
Port: port,
// Empty string is used as a signal to the caller to use the appropriate
// default.
RequestPath: "",
Description: "Default kubernetes L7 Loadbalancing health check.",
// How often to health check.
CheckIntervalSec: 1,
// How long to wait before claiming failure of a health check.
TimeoutSec: 1,
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: 1,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: 10,
}
}