From f84ca5483148ddd815fec3d55b9254c67a6e5444 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Sat, 28 May 2016 22:02:39 -0700 Subject: [PATCH] Readiness probe health check --- controllers/gce/backends/backends.go | 6 +- controllers/gce/backends/fakes.go | 44 --------- controllers/gce/backends/interfaces.go | 14 --- controllers/gce/controller/cluster_manager.go | 18 +++- controllers/gce/controller/controller.go | 56 +++++++++-- controllers/gce/controller/controller_test.go | 1 + controllers/gce/controller/utils.go | 96 +++++++++++++++++++ controllers/gce/healthchecks/fakes.go | 2 + controllers/gce/healthchecks/healthchecks.go | 53 +++++----- controllers/gce/healthchecks/interfaces.go | 5 +- .../gce/loadbalancers/loadbalancers.go | 2 +- controllers/gce/storage/pools.go | 14 ++- 12 files changed, 217 insertions(+), 94 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index a697748bd..84b029ea9 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -109,7 +109,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { // Create a new health check - if err := b.healthChecker.Add(namedPort.Port, ""); err != nil { + if err := b.healthChecker.Add(namedPort.Port); err != nil { return nil, err } hc, err := b.healthChecker.Get(namedPort.Port) @@ -152,6 +152,10 @@ func (b *Backends) Add(port int64) error { return err } } + // we won't find any igs till the node pool syncs nodes. + if len(igs) == 0 { + return nil + } if err := b.edgeHop(be, igs); err != nil { return err } diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go index ef8bfadaf..945e04847 100644 --- a/controllers/gce/backends/fakes.go +++ b/controllers/gce/backends/fakes.go @@ -99,47 +99,3 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput return &compute.BackendServiceGroupHealth{ HealthStatus: states}, nil } - -// NewFakeHealthChecks returns a health check fake. -func NewFakeHealthChecks() *FakeHealthChecks { - return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}} -} - -// FakeHealthChecks fakes out health checks. -type FakeHealthChecks struct { - hc []*compute.HttpHealthCheck -} - -// CreateHttpHealthCheck fakes health check creation. -func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - f.hc = append(f.hc, hc) - return nil -} - -// GetHttpHealthCheck fakes getting a http health check. -func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { - for _, h := range f.hc { - if h.Name == name { - return h, nil - } - } - return nil, fmt.Errorf("Health check %v not found.", name) -} - -// DeleteHttpHealthCheck fakes deleting a http health check. 
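
Taken together, the two backends.go hunks above mean a caller can drive the pool before any instance groups exist: Add still creates the health check and the backend service, and simply skips linking instance groups (the edge hop) until the node pool has synced. A minimal caller sketch under that assumption; the surrounding controller plumbing is elided and nodePorts is hypothetical:

    // Hypothetical caller of the backend pool after this change.
    func syncBackends(pool backends.BackendPool, nodePorts []int64) error {
        for _, port := range nodePorts {
            // Add derives the health check for the port (readiness probe or
            // default path) and returns nil even if no instance groups have
            // been created yet; a later sync links them.
            if err := pool.Add(port); err != nil {
                return err
            }
        }
        return nil
    }
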
-func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { - healthChecks := []*compute.HttpHealthCheck{} - exists := false - for _, h := range f.hc { - if h.Name == name { - exists = true - continue - } - healthChecks = append(healthChecks, h) - } - if !exists { - return fmt.Errorf("Failed to find health check %v", name) - } - f.hc = healthChecks - return nil -} diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 3e199f9e5..25802570e 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -42,17 +42,3 @@ type BackendServices interface { ListBackendServices() (*compute.BackendServiceList, error) GetHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) } - -// SingleHealthCheck is an interface to manage a single GCE health check. -type SingleHealthCheck interface { - CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error - DeleteHttpHealthCheck(name string) error - GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) -} - -// HealthChecker is an interface to manage cloud HTTPHealthChecks. -type HealthChecker interface { - Add(port int64, path string) error - Delete(port int64) error - Get(port int64) (*compute.HttpHealthCheck, error) -} diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 65f1b6a92..b20299ada 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -74,10 +74,21 @@ type ClusterManager struct { backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool firewallPool firewalls.SingleFirewallPool + + // TODO: Refactor so we simply init a health check pool. + // Currently health checks are tied to backends because each backend needs + // the link of the associated health, but both the backend pool and + // loadbalancer pool manage backends, because the lifetime of the default + // backend is tied to the last/first loadbalancer not the life of the + // nodeport service or Ingress. + healthCheckers []healthchecks.HealthChecker } func (c *ClusterManager) Init(tr *GCETranslator) { c.instancePool.Init(tr) + for _, h := range c.healthCheckers { + h.Init(tr) + } // TODO: Initialize other members as needed. } @@ -221,7 +232,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud { // string passed to glbc via --gce-cluster-name. // - defaultBackendNodePort: is the node port of glbc's default backend. This is // the kubernetes Service that serves the 404 page if no urls match. -// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz" +// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz". func NewClusterManager( configFilePath string, name string, @@ -258,11 +269,14 @@ func NewClusterManager( // BackendPool creates GCE BackendServices and associated health checks. healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer) + // Loadbalancer pool manages the default backend and its health check. + defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer) + + cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker} // TODO: This needs to change to a consolidated management of the default backend. 
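
The healthCheckers slice above only matters because of Init: whoever constructs the ClusterManager must later hand it a *GCETranslator so each checker can look up readiness probes before its first Add. A condensed sketch of that lifecycle, assuming NewClusterManager returns (*ClusterManager, error), that its argument order follows the doc comment above, and that the translator is available as tr:

    // Construction happens first, without any apiserver dependency...
    cm, err := NewClusterManager(configFilePath, name, defaultBackendNodePort, defaultHealthCheckPath)
    if err != nil {
        return err
    }
    // ...then the controller injects the translator; Init fans out to the
    // instance pool and to every registered health checker.
    cm.Init(tr)
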
cluster.backendPool = backends.NewBackendPool( cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true) - defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer) defaultBackendPool := backends.NewBackendPool( cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 1fb5d926a..8c03337cf 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -44,18 +44,25 @@ var ( // DefaultClusterUID is the uid to use for clusters resources created by an // L7 controller created without specifying the --cluster-uid flag. DefaultClusterUID = "" + + // Frequency to poll on local stores to sync. + storeSyncPollPeriod = 5 * time.Second ) // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { - client *client.Client - ingController *framework.Controller - nodeController *framework.Controller - svcController *framework.Controller - ingLister StoreToIngressLister - nodeLister cache.StoreToNodeLister - svcLister cache.StoreToServiceLister + client *client.Client + ingController *framework.Controller + nodeController *framework.Controller + svcController *framework.Controller + podController *framework.Controller + ingLister StoreToIngressLister + nodeLister cache.StoreToNodeLister + svcLister cache.StoreToServiceLister + // Health checks are the readiness probes of containers on pods. + podLister cache.StoreToPodLister + // TODO: Watch secrets CloudClusterManager *ClusterManager recorder record.EventRecorder nodeQueue *taskQueue @@ -69,6 +76,9 @@ type LoadBalancerController struct { shutdown bool // tlsLoader loads secrets from the Kubernetes apiserver for Ingresses. tlsLoader tlsLoader + // hasSynced returns true if all associated sub-controllers have synced. + // Abstracted into a func for testing. + hasSynced func() bool } // NewLoadBalancerController creates a controller for gce loadbalancers. @@ -90,6 +100,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste } lbc.nodeQueue = NewTaskQueue(lbc.syncNodes) lbc.ingQueue = NewTaskQueue(lbc.sync) + lbc.hasSynced = lbc.storesSynced // Ingress watch handlers pathHandlers := framework.ResourceEventHandlerFuncs{ @@ -130,12 +141,19 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste lbc.client, "services", namespace, fields.Everything()), &api.Service{}, resyncPeriod, svcHandlers) + lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer( + cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()), + &api.Pod{}, + resyncPeriod, + framework.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + nodeHandlers := framework.ResourceEventHandlerFuncs{ AddFunc: lbc.nodeQueue.enqueue, DeleteFunc: lbc.nodeQueue.enqueue, // Nodes are updated every 10s and we don't care, so no update handler. 
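
The pod informer registered above deliberately has empty event handlers: nothing reconciles on pod churn, and the store exists only so health-check construction can read readiness probes. The read path it enables looks like the following sketch; selector construction mirrors getHTTPProbe further down, and svc is a hypothetical api.Service:

    // Read-only use of the store kept warm by podController.
    sel := labels.SelectorFromSet(labels.Set(svc.Spec.Selector))
    pods, err := lbc.podLister.List(sel)
    if err != nil {
        return err
    }
    // pods now holds every pod matching the Service selector; the translator
    // walks their containers looking for an HTTP readiness probe.
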
 	}
-
 	// Node watch handlers
 	lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
 		&cache.ListWatch{
@@ -194,6 +212,7 @@ func (lbc *LoadBalancerController) Run() {
 	go lbc.ingController.Run(lbc.stopCh)
 	go lbc.nodeController.Run(lbc.stopCh)
 	go lbc.svcController.Run(lbc.stopCh)
+	go lbc.podController.Run(lbc.stopCh)
 	go lbc.ingQueue.run(time.Second, lbc.stopCh)
 	go lbc.nodeQueue.run(time.Second, lbc.stopCh)
 	<-lbc.stopCh
@@ -224,8 +243,29 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
 	return nil
 }
 
+// storesSynced returns true if all the sub-controllers have finished their
+// first sync with apiserver.
+func (lbc *LoadBalancerController) storesSynced() bool {
+	return (
+		// wait for pods to sync so we don't allocate a default health check when
+		// an endpoint has a readiness probe.
+		lbc.podController.HasSynced() &&
+		// wait for services so we don't thrash on backend creation.
+		lbc.svcController.HasSynced() &&
+		// wait for nodes so we don't disconnect a backend from an instance
+		// group just because we don't realize there are nodes in that zone.
+		lbc.nodeController.HasSynced() &&
+		// Wait for ingresses as a safety measure. We don't really need this.
+		lbc.ingController.HasSynced())
+}
+
 // sync manages Ingress create/updates/deletes.
 func (lbc *LoadBalancerController) sync(key string) {
+	if !lbc.hasSynced() {
+		time.Sleep(storeSyncPollPeriod)
+		lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync"))
+		return
+	}
 	glog.V(3).Infof("Syncing %v", key)
 
 	paths, err := lbc.ingLister.List()
diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go
index 34d98b2d0..26aab545f 100644
--- a/controllers/gce/controller/controller_test.go
+++ b/controllers/gce/controller/controller_test.go
@@ -55,6 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s
 	if err != nil {
 		t.Fatalf("%v", err)
 	}
+	lb.hasSynced = func() bool { return true }
 	return lb
 }
 
diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go
index 8c2de8aa5..274ea02c6 100644
--- a/controllers/gce/controller/utils.go
+++ b/controllers/gce/controller/utils.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util/intstr"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -357,3 +358,98 @@ func (t *GCETranslator) ListZones() ([]string, error) {
 	}
 	return zones.List(), nil
 }
+
+// isPortEqual compares the given IntOrString ports
+func isPortEqual(port, targetPort intstr.IntOrString) bool {
+	if targetPort.Type == intstr.Int {
+		return port.IntVal == targetPort.IntVal
+	}
+	return port.StrVal == targetPort.StrVal
+}
+
+// getHTTPProbe returns the http readiness probe from the first container
+// that matches targetPort, from the set of pods matching the given labels.
+func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntOrString) (*api.Probe, error) {
+	// Lookup any container with a matching targetPort from the set of pods
+	// with a matching label selector.
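
isPortEqual keys the comparison off the targetPort's type, so a numeric targetPort can only match the numeric ContainerPort, and a named targetPort can only match the port's name, never its number rendered as a string. A small illustration with invented values:

    func exampleIsPortEqual() {
        // A container declaring ports: [{name: "http-metrics", containerPort: 8080}]
        // is folded into one IntOrString, exactly as getHTTPProbe does below.
        cPort := intstr.IntOrString{IntVal: 8080, StrVal: "http-metrics"}

        fmt.Println(isPortEqual(cPort, intstr.FromInt(8080)))              // true: numeric targetPort matches ContainerPort
        fmt.Println(isPortEqual(cPort, intstr.FromString("http-metrics"))) // true: named targetPort matches the port name
        fmt.Println(isPortEqual(cPort, intstr.FromString("8080")))         // false: "8080" is not the port's name
    }
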
+ pl, err := t.podLister.List(labels.SelectorFromSet(labels.Set(l))) + if err != nil { + return nil, err + } + for _, pod := range pl { + logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort) + for _, c := range pod.Spec.Containers { + if c.ReadinessProbe == nil || c.ReadinessProbe.Handler.HTTPGet == nil { + continue + } + for _, p := range c.Ports { + cPort := intstr.IntOrString{IntVal: p.ContainerPort, StrVal: p.Name} + if isPortEqual(cPort, targetPort) { + if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, targetPort) { + return c.ReadinessProbe, nil + } else { + glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)", + logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port) + } + } + } + } + glog.V(4).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr) + } + return nil, nil +} + +// HealthCheck returns the http readiness probe for the endpoint backing the +// given nodePort. If no probe is found it returns a health check with "" as +// the request path, callers are responsible for swapping this out for the +// appropriate default. +func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { + sl, err := t.svcLister.List() + if err != nil { + return nil, err + } + // Find the label and target port of the one service with the given nodePort + for _, s := range sl.Items { + for _, p := range s.Spec.Ports { + if int32(port) == p.NodePort { + rp, err := t.getHTTPProbe(s.Spec.Selector, p.TargetPort) + if err != nil { + return nil, err + } + if rp == nil { + glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port) + break + } + healthPath := rp.Handler.HTTPGet.Path + host := rp.Handler.HTTPGet.Host + glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath) + return &compute.HttpHealthCheck{ + Port: port, + RequestPath: healthPath, + Host: host, + Description: "kubernetes L7 health check from readiness probe.", + CheckIntervalSec: int64(rp.PeriodSeconds), + TimeoutSec: int64(rp.TimeoutSeconds), + HealthyThreshold: int64(rp.SuccessThreshold), + UnhealthyThreshold: int64(rp.FailureThreshold), + // TODO: include headers after updating compute godep. + }, nil + } + } + } + return &compute.HttpHealthCheck{ + Port: port, + // Empty string is used as a signal to the caller to use the appropriate + // default. + RequestPath: "", + Description: "Default kubernetes L7 Loadbalancing health check.", + // How often to health check. + CheckIntervalSec: 1, + // How long to wait before claiming failure of a health check. + TimeoutSec: 1, + // Number of healthchecks to pass for a vm to be deemed healthy. + HealthyThreshold: 1, + // Number of healthchecks to fail before the vm is deemed unhealthy. 
+ UnhealthyThreshold: 10, + }, nil +} diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 7889b2b50..51fc07174 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -65,3 +65,5 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { f.hc = healthChecks return nil } + +func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { return nil } diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index 849899850..fdd97427b 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -29,43 +29,52 @@ type HealthChecks struct { cloud SingleHealthCheck defaultPath string namer *utils.Namer + healthCheckGetter +} + +type healthCheckGetter interface { + // HealthCheck returns the HTTP readiness check for a node port. + HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { - return &HealthChecks{cloud, defaultHealthCheckPath, namer} + return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil} +} + +// Init initializes the health checker. +func (h *HealthChecks) Init(r healthCheckGetter) { + h.healthCheckGetter = r } // Add adds a healthcheck if one for the same port doesn't already exist. -func (h *HealthChecks) Add(port int64, path string) error { - hc, _ := h.Get(port) - name := h.namer.BeName(port) - if path == "" { - path = h.defaultPath +func (h *HealthChecks) Add(port int64) error { + wantHC, err := h.healthCheckGetter.HealthCheck(port) + if err != nil { + return err } + if wantHC.RequestPath == "" { + wantHC.RequestPath = h.defaultPath + } + name := h.namer.BeName(port) + wantHC.Name = name + hc, _ := h.Get(port) if hc == nil { + // TODO: check if the readiness probe has changed and update the + // health check. glog.Infof("Creating health check %v", name) - if err := h.cloud.CreateHttpHealthCheck( - &compute.HttpHealthCheck{ - Name: name, - Port: port, - RequestPath: path, - Description: "Default kubernetes L7 Loadbalancing health check.", - // How often to health check. - CheckIntervalSec: 1, - // How long to wait before claiming failure of a health check. - TimeoutSec: 1, - // Number of healthchecks to pass for a vm to be deemed healthy. - HealthyThreshold: 1, - // Number of healthchecks to fail before the vm is deemed unhealthy. - UnhealthyThreshold: 10, - }); err != nil { + if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil { + return err + } + } else if wantHC.RequestPath != hc.RequestPath { + // TODO: also compare headers interval etc. + glog.Infof("Updating health check %v, path %v -> %v", name, hc.RequestPath, wantHC.RequestPath) + if err := h.cloud.UpdateHttpHealthCheck(wantHC); err != nil { return err } } else { - // TODO: Does this health check need an edge hop? 
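
Putting the translator and the pool together: a readiness probe found by getHTTPProbe maps field-for-field onto the HttpHealthCheck that Add reconciles toward, with only the request path compared for updates so far. A hypothetical example of that mapping; nodePort (an int64 node port) and namer are assumed to be in scope:

    // Readiness probe declared on a backend pod (hypothetical values).
    probe := &api.Probe{
        Handler: api.Handler{
            HTTPGet: &api.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)},
        },
        PeriodSeconds:    10,
        TimeoutSeconds:   5,
        SuccessThreshold: 1,
        FailureThreshold: 3,
    }

    // Health check HealthChecks.Add(nodePort) would create or update from it.
    want := &compute.HttpHealthCheck{
        Name:               namer.BeName(nodePort),
        Port:               nodePort,
        RequestPath:        probe.Handler.HTTPGet.Path,    // "/healthz"
        Host:               probe.Handler.HTTPGet.Host,    // "" when the probe sets no Host
        CheckIntervalSec:   int64(probe.PeriodSeconds),    // 10
        TimeoutSec:         int64(probe.TimeoutSeconds),   // 5
        HealthyThreshold:   int64(probe.SuccessThreshold), // 1
        UnhealthyThreshold: int64(probe.FailureThreshold), // 3
    }
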
 		glog.Infof("Health check %v already exists", hc.Name)
 	}
 	return nil
diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go
index efbea9669..6e04bafc6 100644
--- a/controllers/gce/healthchecks/interfaces.go
+++ b/controllers/gce/healthchecks/interfaces.go
@@ -23,13 +23,16 @@ import (
 // SingleHealthCheck is an interface to manage a single GCE health check.
 type SingleHealthCheck interface {
 	CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
+	UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error
 	DeleteHttpHealthCheck(name string) error
 	GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
 }
 
 // HealthChecker is an interface to manage cloud HTTPHealthChecks.
 type HealthChecker interface {
-	Add(port int64, path string) error
+	Init(h healthCheckGetter)
+
+	Add(port int64) error
 	Delete(port int64) error
 	Get(port int64) (*compute.HttpHealthCheck, error)
 }
diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go
index 45d2a76cc..0047db957 100644
--- a/controllers/gce/loadbalancers/loadbalancers.go
+++ b/controllers/gce/loadbalancers/loadbalancers.go
@@ -368,7 +368,7 @@ func (l *L7) checkSSLCert() (err error) {
 	cert, _ := l.cloud.GetSslCertificate(certName)
 
 	// PrivateKey is write only, so compare certs alone. We're assuming that
-	// no one will change just the key. We can remembe the key and compare,
+	// no one will change just the key. We can remember the key and compare,
 	// but a bug could end up leaking it, which feels worse.
 	if cert == nil || ingCert != cert.Certificate {
 
diff --git a/controllers/gce/storage/pools.go b/controllers/gce/storage/pools.go
index 3d1fd235e..a09dc6c95 100644
--- a/controllers/gce/storage/pools.go
+++ b/controllers/gce/storage/pools.go
@@ -78,16 +78,28 @@ type CloudListingPool struct {
 	keyGetter keyFunc
 }
 
-// ReplenishPool lists through the cloudLister and inserts into the pool.
+// ReplenishPool lists through the cloudLister and inserts into the pool. This
+// is especially useful in scenarios like deleting an Ingress while the
+// controller is restarting. As long as the resource exists in the shared
+// memory pool, it is visible to the caller and they can take corrective
+// actions, eg: backend pool deletes backends with non-matching node ports
+// in its sync method.
 func (c *CloudListingPool) ReplenishPool() {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	glog.V(4).Infof("Replenishing pool")
+
+	// We must list with the lock, because the controller also lists through
+	// Snapshot(). It's ok if the controller takes a snapshot, we list, we
+	// delete, because we delete based on the most recent state. Worst
+	// case we thrash. It's not ok if we list, the controller lists and
+	// creates a backend, and we delete that backend based on stale state.
 	items, err := c.lister.List()
 	if err != nil {
 		glog.Warningf("Failed to list: %v", err)
 		return
 	}
+
 	for i := range items {
 		key, err := c.keyGetter(items[i])
 		if err != nil {
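
Because Init accepts anything satisfying healthCheckGetter, tests and callers that have no GCETranslator can plug in a stub. A minimal sketch of such a stub; the type and the wiring below are illustrative, not part of this patch:

    // fakeHealthCheckGetter hands back a canned health check for every port;
    // only the fields Add inspects today (RequestPath) need to be meaningful.
    type fakeHealthCheckGetter struct {
        hc *compute.HttpHealthCheck
    }

    func (f *fakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
        hc := *f.hc // copy, so Add can set Name without mutating the shared template
        hc.Port = port
        return &hc, nil
    }

    // Wiring, e.g. in a test:
    //   checker := NewHealthChecker(NewFakeHealthChecks(), "/", namer)
    //   checker.Init(&fakeHealthCheckGetter{hc: &compute.HttpHealthCheck{RequestPath: ""}})
    //   _ = checker.Add(30001) // empty RequestPath falls back to the default "/"
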