Readiness probe health check

Prashanth Balasubramanian 2016-05-28 22:02:39 -07:00
parent 32ac61e7e3
commit f84ca54831
12 changed files with 217 additions and 94 deletions


@@ -109,7 +109,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
// Create a new health check
if err := b.healthChecker.Add(namedPort.Port, ""); err != nil {
if err := b.healthChecker.Add(namedPort.Port); err != nil {
return nil, err
}
hc, err := b.healthChecker.Get(namedPort.Port)
@@ -152,6 +152,10 @@ func (b *Backends) Add(port int64) error {
return err
}
}
// we won't find any igs till the node pool syncs nodes.
if len(igs) == 0 {
return nil
}
if err := b.edgeHop(be, igs); err != nil {
return err
}


@@ -99,47 +99,3 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
return &compute.BackendServiceGroupHealth{
HealthStatus: states}, nil
}
// NewFakeHealthChecks returns a health check fake.
func NewFakeHealthChecks() *FakeHealthChecks {
return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}}
}
// FakeHealthChecks fakes out health checks.
type FakeHealthChecks struct {
hc []*compute.HttpHealthCheck
}
// CreateHttpHealthCheck fakes health check creation.
func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
f.hc = append(f.hc, hc)
return nil
}
// GetHttpHealthCheck fakes getting a http health check.
func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
for _, h := range f.hc {
if h.Name == name {
return h, nil
}
}
return nil, fmt.Errorf("Health check %v not found.", name)
}
// DeleteHttpHealthCheck fakes deleting a http health check.
func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
healthChecks := []*compute.HttpHealthCheck{}
exists := false
for _, h := range f.hc {
if h.Name == name {
exists = true
continue
}
healthChecks = append(healthChecks, h)
}
if !exists {
return fmt.Errorf("Failed to find health check %v", name)
}
f.hc = healthChecks
return nil
}


@@ -42,17 +42,3 @@ type BackendServices interface {
ListBackendServices() (*compute.BackendServiceList, error)
GetHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error)
}
// SingleHealthCheck is an interface to manage a single GCE health check.
type SingleHealthCheck interface {
CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
DeleteHttpHealthCheck(name string) error
GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
}
// HealthChecker is an interface to manage cloud HTTPHealthChecks.
type HealthChecker interface {
Add(port int64, path string) error
Delete(port int64) error
Get(port int64) (*compute.HttpHealthCheck, error)
}


@@ -74,10 +74,21 @@ type ClusterManager struct {
backendPool backends.BackendPool
l7Pool loadbalancers.LoadBalancerPool
firewallPool firewalls.SingleFirewallPool
// TODO: Refactor so we simply init a health check pool.
// Currently health checks are tied to backends because each backend needs
// the link of the associated health check, but both the backend pool and
// loadbalancer pool manage backends, because the lifetime of the default
// backend is tied to the last/first loadbalancer, not the life of the
// nodeport service or Ingress.
healthCheckers []healthchecks.HealthChecker
}
func (c *ClusterManager) Init(tr *GCETranslator) {
c.instancePool.Init(tr)
for _, h := range c.healthCheckers {
h.Init(tr)
}
// TODO: Initialize other members as needed.
}
@@ -221,7 +232,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud {
// string passed to glbc via --gce-cluster-name.
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
// the kubernetes Service that serves the 404 page if no urls match.
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz"
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
func NewClusterManager(
configFilePath string,
name string,
@@ -258,11 +269,14 @@ func NewClusterManager(
// BackendPool creates GCE BackendServices and associated health checks.
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
// Loadbalancer pool manages the default backend and its health check.
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
// TODO: This needs to change to a consolidated management of the default backend.
cluster.backendPool = backends.NewBackendPool(
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
defaultBackendPool := backends.NewBackendPool(
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort
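Only the first two parameters of the constructor are visible in this hunk. A hedged usage sketch, assuming the four-argument signature implied by the doc comment; every value below is hypothetical:

    // Sketch only: assumes NewClusterManager(configFilePath, name,
    // defaultBackendNodePort, defaultHealthCheckPath) (*ClusterManager, error),
    // matching the doc comment above, and that this sits next to ClusterManager.
    func newManagerSketch() (*ClusterManager, error) {
        cm, err := NewClusterManager(
            "",            // configFilePath (empty in this sketch)
            "cluster-uid", // name, as passed to glbc via --gce-cluster-name
            30301,         // defaultBackendNodePort: node port of the 404 default backend
            "/healthz",    // defaultHealthCheckPath for services without a readiness probe
        )
        if err != nil {
            return nil, err
        }
        // The controller later calls cm.Init(translator), which hands the
        // GCETranslator to the instance pool and to both health checkers.
        return cm, nil
    }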


@@ -44,18 +44,25 @@ var (
// DefaultClusterUID is the uid to use for cluster resources created by an
// L7 controller started without the --cluster-uid flag.
DefaultClusterUID = ""
// Frequency to poll on local stores to sync.
storeSyncPollPeriod = 5 * time.Second
)
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client *client.Client
ingController *framework.Controller
nodeController *framework.Controller
svcController *framework.Controller
ingLister StoreToIngressLister
nodeLister cache.StoreToNodeLister
svcLister cache.StoreToServiceLister
client *client.Client
ingController *framework.Controller
nodeController *framework.Controller
svcController *framework.Controller
podController *framework.Controller
ingLister StoreToIngressLister
nodeLister cache.StoreToNodeLister
svcLister cache.StoreToServiceLister
// Health checks are the readiness probes of containers on pods.
podLister cache.StoreToPodLister
// TODO: Watch secrets
CloudClusterManager *ClusterManager
recorder record.EventRecorder
nodeQueue *taskQueue
@@ -69,6 +76,9 @@ type LoadBalancerController struct {
shutdown bool
// tlsLoader loads secrets from the Kubernetes apiserver for Ingresses.
tlsLoader tlsLoader
// hasSynced returns true if all associated sub-controllers have synced.
// Abstracted into a func for testing.
hasSynced func() bool
}
// NewLoadBalancerController creates a controller for gce loadbalancers.
@@ -90,6 +100,7 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
lbc.hasSynced = lbc.storesSynced
// Ingress watch handlers
pathHandlers := framework.ResourceEventHandlerFuncs{
@@ -130,12 +141,19 @@ func NewLoadBalancerController(kubeClient *client.Client, clusterManager *Cluste
lbc.client, "services", namespace, fields.Everything()),
&api.Service{}, resyncPeriod, svcHandlers)
lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()),
&api.Pod{},
resyncPeriod,
framework.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nodeHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
// Node watch handlers
lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
&cache.ListWatch{
@@ -194,6 +212,7 @@ func (lbc *LoadBalancerController) Run() {
go lbc.ingController.Run(lbc.stopCh)
go lbc.nodeController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.podController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
@@ -224,8 +243,29 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
return nil
}
// storesSynced returns true if all the sub-controllers have finished their
// first sync with apiserver.
func (lbc *LoadBalancerController) storesSynced() bool {
return (
// wait for pods to sync so we don't allocate a default health check when
// an endpoint has a readiness probe.
lbc.podController.HasSynced() &&
// wait for services so we don't thrash on backend creation.
lbc.svcController.HasSynced() &&
// wait for nodes so we don't disconnect a backend from an instance
// group just because we don't realize there are nodes in that zone.
lbc.nodeController.HasSynced() &&
// Wait for ingresses as a safety measure. We don't really need this.
lbc.ingController.HasSynced())
}
// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) {
if !lbc.hasSynced() {
time.Sleep(storeSyncPollPeriod)
lbc.ingQueue.requeue(key, fmt.Errorf("Waiting for stores to sync"))
return
}
glog.V(3).Infof("Syncing %v", key)
paths, err := lbc.ingLister.List()
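The hasSynced gate added at the top of sync is a standard informer pattern: do no work until every store has listed once, and requeue the key so the sync is retried later. A standalone sketch of the pattern, not taken from this commit and with hypothetical names:

    package main

    import (
        "fmt"
        "time"
    )

    // guardedSync mirrors the controller's gate: until every store has finished
    // its first List, wait briefly, requeue the key, and report that no work was
    // done this round.
    func guardedSync(hasSynced func() bool, requeue func(string, error), key string) bool {
        if !hasSynced() {
            time.Sleep(50 * time.Millisecond) // the controller waits storeSyncPollPeriod (5s)
            requeue(key, fmt.Errorf("waiting for stores to sync"))
            return false
        }
        return true
    }

    func main() {
        synced := false
        requeue := func(key string, err error) { fmt.Println("requeue:", key, err) }
        fmt.Println(guardedSync(func() bool { return synced }, requeue, "default/my-ingress")) // false
        synced = true
        fmt.Println(guardedSync(func() bool { return synced }, requeue, "default/my-ingress")) // true
    }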


@@ -55,6 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager, masterUrl s
if err != nil {
t.Fatalf("%v", err)
}
lb.hasSynced = func() bool { return true }
return lb
}


@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@@ -357,3 +358,98 @@ func (t *GCETranslator) ListZones() ([]string, error) {
}
return zones.List(), nil
}
// isPortEqual compares the given IntOrString ports
func isPortEqual(port, targetPort intstr.IntOrString) bool {
if targetPort.Type == intstr.Int {
return port.IntVal == targetPort.IntVal
}
return port.StrVal == targetPort.StrVal
}
// getHTTPProbe returns the HTTP readiness probe from the first container
// that matches targetPort, from the set of pods matching the given labels.
func (t *GCETranslator) getHTTPProbe(l map[string]string, targetPort intstr.IntOrString) (*api.Probe, error) {
// Lookup any container with a matching targetPort from the set of pods
// with a matching label selector.
pl, err := t.podLister.List(labels.SelectorFromSet(labels.Set(l)))
if err != nil {
return nil, err
}
for _, pod := range pl {
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
for _, c := range pod.Spec.Containers {
if c.ReadinessProbe == nil || c.ReadinessProbe.Handler.HTTPGet == nil {
continue
}
for _, p := range c.Ports {
cPort := intstr.IntOrString{IntVal: p.ContainerPort, StrVal: p.Name}
if isPortEqual(cPort, targetPort) {
if isPortEqual(c.ReadinessProbe.Handler.HTTPGet.Port, targetPort) {
return c.ReadinessProbe, nil
} else {
glog.Infof("%v: found matching targetPort on container %v, but not on readinessProbe (%+v)",
logStr, c.Name, c.ReadinessProbe.Handler.HTTPGet.Port)
}
}
}
}
glog.V(4).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr)
}
return nil, nil
}
// HealthCheck returns an HTTP health check built from the readiness probe of
// the endpoints backing the given nodePort. If no probe is found it returns a
// health check with "" as the request path; callers are responsible for
// swapping this out for the appropriate default.
func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) {
sl, err := t.svcLister.List()
if err != nil {
return nil, err
}
// Find the label and target port of the one service with the given nodePort
for _, s := range sl.Items {
for _, p := range s.Spec.Ports {
if int32(port) == p.NodePort {
rp, err := t.getHTTPProbe(s.Spec.Selector, p.TargetPort)
if err != nil {
return nil, err
}
if rp == nil {
glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port)
break
}
healthPath := rp.Handler.HTTPGet.Path
host := rp.Handler.HTTPGet.Host
glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath)
return &compute.HttpHealthCheck{
Port: port,
RequestPath: healthPath,
Host: host,
Description: "kubernetes L7 health check from readiness probe.",
CheckIntervalSec: int64(rp.PeriodSeconds),
TimeoutSec: int64(rp.TimeoutSeconds),
HealthyThreshold: int64(rp.SuccessThreshold),
UnhealthyThreshold: int64(rp.FailureThreshold),
// TODO: include headers after updating compute godep.
}, nil
}
}
}
return &compute.HttpHealthCheck{
Port: port,
// Empty string is used as a signal to the caller to use the appropriate
// default.
RequestPath: "",
Description: "Default kubernetes L7 Loadbalancing health check.",
// How often to health check.
CheckIntervalSec: 1,
// How long to wait before claiming failure of a health check.
TimeoutSec: 1,
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: 1,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: 10,
}, nil
}
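To make the field mapping above concrete, here is a small self-contained sketch, not part of the commit, showing how a readiness probe's settings carry over into the GCE health check; the node port and probe values are invented:

    package main

    import (
        "fmt"

        compute "google.golang.org/api/compute/v1"
        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/util/intstr"
    )

    func main() {
        // A readiness probe as it might appear on a container backing the Service.
        probe := &api.Probe{
            Handler: api.Handler{
                HTTPGet: &api.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)},
            },
            PeriodSeconds:    10,
            TimeoutSeconds:   5,
            SuccessThreshold: 1,
            FailureThreshold: 3,
        }
        // HealthCheck copies the probe's path, host and timing onto the check it
        // returns for the service's node port.
        hc := &compute.HttpHealthCheck{
            Port:               30301, // hypothetical node port
            RequestPath:        probe.Handler.HTTPGet.Path,
            Host:               probe.Handler.HTTPGet.Host,
            CheckIntervalSec:   int64(probe.PeriodSeconds),
            TimeoutSec:         int64(probe.TimeoutSeconds),
            HealthyThreshold:   int64(probe.SuccessThreshold),
            UnhealthyThreshold: int64(probe.FailureThreshold),
        }
        fmt.Printf("%+v\n", hc)
    }

When no container declares a matching probe, the translator instead returns the conservative defaults shown above (1s interval, 1s timeout, unhealthy after 10 failures) with an empty request path as the signal to fall back to the checker's default.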


@@ -65,3 +65,5 @@ func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error {
f.hc = healthChecks
return nil
}
func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { return nil }


@@ -29,43 +29,52 @@ type HealthChecks struct {
cloud SingleHealthCheck
defaultPath string
namer *utils.Namer
healthCheckGetter
}
type healthCheckGetter interface {
// HealthCheck returns the HTTP readiness check for a node port.
HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error)
}
// NewHealthChecker creates a new health checker.
// cloud: the cloud object implementing SingleHealthCheck.
// defaultHealthCheckPath: is the HTTP path to use for health checks.
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
return &HealthChecks{cloud, defaultHealthCheckPath, namer}
return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil}
}
// Init initializes the health checker.
func (h *HealthChecks) Init(r healthCheckGetter) {
h.healthCheckGetter = r
}
// Add adds a healthcheck if one for the same port doesn't already exist.
func (h *HealthChecks) Add(port int64, path string) error {
hc, _ := h.Get(port)
name := h.namer.BeName(port)
if path == "" {
path = h.defaultPath
func (h *HealthChecks) Add(port int64) error {
wantHC, err := h.healthCheckGetter.HealthCheck(port)
if err != nil {
return err
}
if wantHC.RequestPath == "" {
wantHC.RequestPath = h.defaultPath
}
name := h.namer.BeName(port)
wantHC.Name = name
hc, _ := h.Get(port)
if hc == nil {
// TODO: check if the readiness probe has changed and update the
// health check.
glog.Infof("Creating health check %v", name)
if err := h.cloud.CreateHttpHealthCheck(
&compute.HttpHealthCheck{
Name: name,
Port: port,
RequestPath: path,
Description: "Default kubernetes L7 Loadbalancing health check.",
// How often to health check.
CheckIntervalSec: 1,
// How long to wait before claiming failure of a health check.
TimeoutSec: 1,
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: 1,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: 10,
}); err != nil {
if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil {
return err
}
} else if wantHC.RequestPath != hc.RequestPath {
// TODO: also compare headers interval etc.
glog.Infof("Updating health check %v, path %v -> %v", name, hc.RequestPath, wantHC.RequestPath)
if err := h.cloud.UpdateHttpHealthCheck(wantHC); err != nil {
return err
}
} else {
// TODO: Does this health check need an edge hop?
glog.Infof("Health check %v already exists", hc.Name)
}
return nil


@@ -23,13 +23,16 @@ import (
// SingleHealthCheck is an interface to manage a single GCE health check.
type SingleHealthCheck interface {
CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error
UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error
DeleteHttpHealthCheck(name string) error
GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
}
// HealthChecker is an interface to manage cloud HTTPHealthChecks.
type HealthChecker interface {
Add(port int64, path string) error
Init(h healthCheckGetter)
Add(port int64) error
Delete(port int64) error
Get(port int64) (*compute.HttpHealthCheck, error)
}
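Putting the reshaped interfaces together, a hedged usage sketch written as if it sat inside the healthchecks package; cloud, namer and getter stand in for the real dependencies wired up by the ClusterManager:

    // ensureHealthCheck is a sketch, not part of the commit: the checker is
    // built with a default path, handed a healthCheckGetter via Init (in
    // practice the GCETranslator), and then only needs the node port on Add.
    func ensureHealthCheck(cloud SingleHealthCheck, namer *utils.Namer, getter healthCheckGetter, port int64) error {
        checker := NewHealthChecker(cloud, "/healthz", namer)
        checker.Init(getter)
        // Add consults the getter: a readiness probe's path wins, otherwise the
        // default "/healthz" is used, and an existing check whose path differs
        // is updated in place rather than recreated.
        return checker.Add(port)
    }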


@@ -368,7 +368,7 @@ func (l *L7) checkSSLCert() (err error) {
cert, _ := l.cloud.GetSslCertificate(certName)
// PrivateKey is write only, so compare certs alone. We're assuming that
// no one will change just the key. We can remembe the key and compare,
// no one will change just the key. We can remember the key and compare,
// but a bug could end up leaking it, which feels worse.
if cert == nil || ingCert != cert.Certificate {


@@ -78,16 +78,28 @@ type CloudListingPool struct {
keyGetter keyFunc
}
// ReplenishPool lists through the cloudLister and inserts into the pool.
// ReplenishPool lists through the cloudLister and inserts into the pool. This
// is especially useful in scenarios like deleting an Ingress while the
// controller is restarting. As long as the resource exists in the shared
// memory pool, it is visible to the caller and they can take corrective
// actions, eg: backend pool deletes backends with non-matching node ports
// in its sync method.
func (c *CloudListingPool) ReplenishPool() {
c.lock.Lock()
defer c.lock.Unlock()
glog.V(4).Infof("Replenishing pool")
// We must list with the lock, because the controller also lists through
// Snapshot(). It's ok if the controller takes a snapshot, we list, we
// delete, because we delete based on the most recent state. Worst
// case we thrash. It's not ok if we list, the controller lists and
// creates a backend, and we delete that backend based on stale state.
items, err := c.lister.List()
if err != nil {
glog.Warningf("Failed to list: %v", err)
return
}
for i := range items {
key, err := c.keyGetter(items[i])
if err != nil {