From a6993bb449be4668a260e38bf59f9693cfa4b990 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 2 Oct 2017 15:33:46 -0700 Subject: [PATCH] Handle health check for NEG --- controllers/gce/backends/backends.go | 19 ++- controllers/gce/backends/backends_test.go | 8 +- controllers/gce/healthchecks/fakes.go | 83 ++++++++++ controllers/gce/healthchecks/healthchecks.go | 143 ++++++++++++++++-- .../gce/healthchecks/healthchecks_test.go | 12 +- controllers/gce/healthchecks/interfaces.go | 8 +- 6 files changed, 241 insertions(+), 32 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 49b632efe..6effeae97 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -93,10 +93,12 @@ func portKey(port int64) string { // ServicePort for tupling port and protocol type ServicePort struct { - Port int64 - Protocol utils.AppProtocol - SvcName types.NamespacedName - SvcPort intstr.IntOrString + Port int64 + Protocol utils.AppProtocol + SvcName types.NamespacedName + SvcPort intstr.IntOrString + SvcTargetPort string + NEGEnabled bool } // Description returns a string describing the ServicePort. @@ -171,8 +173,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { } func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) { - hc := b.healthChecker.New(sp.Port, sp.Protocol) - + hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled) existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port) if err != nil && !utils.IsNotFoundError(err) { return "", err @@ -274,6 +275,11 @@ func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error { return nil } + // If NEG is enabled, do not link backend service to instance groups. 
+ if p.NEGEnabled { + return nil + } + // Verify that backend service contains links to all backends/instance-groups return b.edgeHop(be, igs) } @@ -370,6 +376,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr newBackends := getBackendsForIGs(addIGs, bm) be.Backends = append(originalBackends, newBackends...) + // TODO (mixia): make sure backend-service can switch between NEG and IG if err := b.cloud.UpdateGlobalBackendService(be); err != nil { if utils.IsHTTPErrorCode(err, http.StatusBadRequest) { glog.V(2).Infof("Updating backend service backends with balancing mode %v failed, will try another mode. err:%v", bm, err) diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 8d116ac6e..075804a92 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -107,7 +107,7 @@ func TestBackendPoolAdd(t *testing.T) { } // Check the created healthcheck is the correct protocol - hc, err := pool.healthChecker.Get(nodePort.Port) + hc, err := pool.healthChecker.Get(nodePort.Port, false) if err != nil { t.Fatalf("Unexpected err when querying fake healthchecker: %v", err) } @@ -146,7 +146,7 @@ func TestHealthCheckMigration(t *testing.T) { pool.Add(p, nil) // Assert the proper health check was created - hc, _ := pool.healthChecker.Get(p.Port) + hc, _ := pool.healthChecker.Get(p.Port, false) if hc == nil || hc.Protocol() != p.Protocol { t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } @@ -181,7 +181,7 @@ func TestBackendPoolUpdate(t *testing.T) { } // Assert the proper health check was created - hc, _ := pool.healthChecker.Get(p.Port) + hc, _ := pool.healthChecker.Get(p.Port, false) if hc == nil || hc.Protocol() != p.Protocol { t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } @@ -201,7 +201,7 @@ func TestBackendPoolUpdate(t *testing.T) { } // Assert the proper health check was created - hc, _ = 
pool.healthChecker.Get(p.Port) + hc, _ = pool.healthChecker.Get(p.Port, false) if hc == nil || hc.Protocol() != p.Protocol { t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 013e3bf26..826962240 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -17,6 +17,7 @@ limitations under the License. package healthchecks import ( + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" ) @@ -84,6 +85,14 @@ func (f *FakeHealthCheckProvider) CreateHealthCheck(hc *compute.HealthCheck) err return nil } +// CreateAlphaHealthCheck fakes out alpha health check creation. NOTE(review): SelfLink is set on the local copy v, but hc is what gets stored - confirm intent. +func (f *FakeHealthCheckProvider) CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error { + v := *hc + v.SelfLink = "https://fake.google.com/compute/healthChecks/" + hc.Name + f.generic[hc.Name] = toV1HealthCheck(hc) + return nil +} + // GetHealthCheck fakes out getting a http health check from the cloud. func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCheck, error) { if hc, found := f.generic[name]; found { @@ -93,6 +102,15 @@ func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCh return nil, fakeNotFoundErr() } +// GetAlphaHealthCheck fakes out getting an alpha health check from the cloud. +func (f *FakeHealthCheckProvider) GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) { + if hc, found := f.generic[name]; found { + return toAlphaHealthCheck(&hc), nil + } + + return nil, fakeNotFoundErr() +} + // DeleteHealthCheck fakes out deleting a http health check. 
func (f *FakeHealthCheckProvider) DeleteHealthCheck(name string) error { if _, exists := f.generic[name]; !exists { @@ -112,3 +130,68 @@ func (f *FakeHealthCheckProvider) UpdateHealthCheck(hc *compute.HealthCheck) err f.generic[hc.Name] = *hc return nil } + +func (f *FakeHealthCheckProvider) UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error { + if _, exists := f.generic[hc.Name]; !exists { + return fakeNotFoundErr() + } + + f.generic[hc.Name] = toV1HealthCheck(hc) + return nil +} + +func toV1HealthCheck(hc *computealpha.HealthCheck) compute.HealthCheck { + v1hc := compute.HealthCheck{ + Name: hc.Name, + Description: hc.Description, + CheckIntervalSec: hc.CheckIntervalSec, + HealthyThreshold: hc.HealthyThreshold, + UnhealthyThreshold: hc.UnhealthyThreshold, + TimeoutSec: hc.TimeoutSec, + Type: hc.Type, + SelfLink: hc.SelfLink, + } + + if hc.HttpHealthCheck != nil { + v1hc.HttpHealthCheck = &compute.HTTPHealthCheck{ + Port: hc.HttpHealthCheck.Port, + RequestPath: hc.HttpHealthCheck.RequestPath, + } + } + + if hc.HttpsHealthCheck != nil { + v1hc.HttpsHealthCheck = &compute.HTTPSHealthCheck{ + Port: hc.HttpsHealthCheck.Port, + RequestPath: hc.HttpsHealthCheck.RequestPath, + } + } + return v1hc +} + +func toAlphaHealthCheck(hc *compute.HealthCheck) *computealpha.HealthCheck { + alphahc := computealpha.HealthCheck{ + Name: hc.Name, + Description: hc.Description, + CheckIntervalSec: hc.CheckIntervalSec, + HealthyThreshold: hc.HealthyThreshold, + UnhealthyThreshold: hc.UnhealthyThreshold, + TimeoutSec: hc.TimeoutSec, + Type: hc.Type, + SelfLink: hc.SelfLink, + } + + if hc.HttpHealthCheck != nil { + alphahc.HttpHealthCheck = &computealpha.HTTPHealthCheck{ + Port: hc.HttpHealthCheck.Port, + RequestPath: hc.HttpHealthCheck.RequestPath, + } + } + + if hc.HttpsHealthCheck != nil { + alphahc.HttpsHealthCheck = &computealpha.HTTPSHealthCheck{ + Port: hc.HttpsHealthCheck.Port, + RequestPath: hc.HttpsHealthCheck.RequestPath, + } + } + return &alphahc +} diff --git 
a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index b150f2f2a..d2bbd2434 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -20,6 +20,7 @@ import ( "net/http" "time" + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" "github.com/golang/glog" @@ -32,14 +33,24 @@ const ( // We're just trying to detect if the node networking is // borked, service level outages will get detected sooner // by kube-proxy. - // DefaultHealthCheckInterval defines how frequently a probe runs + // DefaultHealthCheckInterval defines how frequently a probe runs with IG backends DefaultHealthCheckInterval = 60 * time.Second + // DefaultNEGHealthCheckInterval defines how frequently a probe runs with NEG backends + DefaultNEGHealthCheckInterval = 15 * time.Second // DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy" DefaultHealthyThreshold = 1 - // DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy" + // DefaultUnhealthyThreshold defines the threshold of failure probes that declare an instance "unhealthy" DefaultUnhealthyThreshold = 10 + // DefaultNEGUnhealthyThreshold defines the threshold of failure probes that declare a network endpoint "unhealthy" + DefaultNEGUnhealthyThreshold = 2 // DefaultTimeout defines the timeout of each probe DefaultTimeout = 60 * time.Second + + // UseServingPortSpecification (USE_SERVING_PORT): for NetworkEndpointGroup, the port specified for + // each network endpoint is used for health checking. For other + // backends, the port or named port specified in the Backend Service is + // used for health checking. + UseServingPortSpecification = "USE_SERVING_PORT" ) // HealthChecks manages health checks. 
@@ -57,9 +68,19 @@ func NewHealthChecker(cloud HealthCheckProvider, defaultHealthCheckPath string, } // New returns a *HealthCheck with default settings and specified port/protocol -func (h *HealthChecks) New(port int64, protocol utils.AppProtocol) *HealthCheck { - hc := DefaultHealthCheck(port, protocol) - hc.Name = h.namer.BeName(port) +func (h *HealthChecks) New(port int64, protocol utils.AppProtocol, enableNEG bool) *HealthCheck { + var hc *HealthCheck + name := h.namer.BeName(port) + if enableNEG { + hc = DefaultNEGHealthCheck(protocol) + hc.alphaHealthCheck.Name = name + } else { + hc = DefaultHealthCheck(port, protocol) + hc.Name = name + } + // port is the key for retrieving the existing health-check + // TODO: rename backend-service and health-check to not use port as key + hc.Port = port return hc } @@ -71,23 +92,22 @@ func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) { hc.RequestPath = h.defaultPath } - existingHC, err := h.Get(hc.Port) + existingHC, err := h.Get(hc.Port, hc.alphaHealthCheck != nil) if err != nil { if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { return "", err } glog.V(2).Infof("Creating health check for port %v with protocol %v", hc.Port, hc.Type) - if err = h.cloud.CreateHealthCheck(hc.ToComputeHealthCheck()); err != nil { + if err = h.create(hc); err != nil { return "", err } return h.getHealthCheckLink(hc.Port) } - if existingHC.Protocol() != hc.Protocol() { - glog.V(2).Infof("Updating health check %v because it has protocol %v but need %v", existingHC.Name, existingHC.Type, hc.Type) - err = h.cloud.UpdateHealthCheck(hc.ToComputeHealthCheck()) + if needToUpdate(existingHC, hc) { + err = h.update(hc) return existingHC.SelfLink, err } @@ -103,8 +123,24 @@ func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) { return existingHC.SelfLink, nil } +func (h *HealthChecks) create(hc *HealthCheck) error { + if hc.alphaHealthCheck == nil { + return h.cloud.CreateHealthCheck(hc.ToComputeHealthCheck()) + } else { + return 
h.cloud.CreateAlphaHealthCheck(hc.ToAlphaComputeHealthCheck()) + } +} + +func (h *HealthChecks) update(hc *HealthCheck) error { + if hc.alphaHealthCheck == nil { + return h.cloud.UpdateHealthCheck(hc.ToComputeHealthCheck()) + } else { + return h.cloud.UpdateAlphaHealthCheck(hc.ToAlphaComputeHealthCheck()) + } +} + func (h *HealthChecks) getHealthCheckLink(port int64) (string, error) { - hc, err := h.Get(port) + hc, err := h.Get(port, false) if err != nil { return "", err } @@ -119,10 +155,21 @@ func (h *HealthChecks) Delete(port int64) error { } // Get returns the health check by port -func (h *HealthChecks) Get(port int64) (*HealthCheck, error) { +func (h *HealthChecks) Get(port int64, alpha bool) (*HealthCheck, error) { name := h.namer.BeName(port) - hc, err := h.cloud.GetHealthCheck(name) - return NewHealthCheck(hc), err + if alpha { + var ret *HealthCheck + hc, err := h.cloud.GetAlphaHealthCheck(name) + if err == nil { + ret = &HealthCheck{alphaHealthCheck: hc} + // SelfLink is used in return value + ret.SelfLink = hc.SelfLink + } + return ret, err + } else { + hc, err := h.cloud.GetHealthCheck(name) + return NewHealthCheck(hc), err + } } // GetLegacy deletes legacy HTTP health checks @@ -166,6 +213,35 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck { } } +// DefaultNEGHealthCheck returns the default health check for NEG backends. +func DefaultNEGHealthCheck(protocol utils.AppProtocol) *HealthCheck { + hc := computealpha.HealthCheck{ + // How often to health check. + CheckIntervalSec: int64(DefaultNEGHealthCheckInterval.Seconds()), + // How long to wait before claiming failure of a health check. + TimeoutSec: int64(DefaultNEGHealthCheckInterval.Seconds()), + // Number of healthchecks to pass for a vm to be deemed healthy. + HealthyThreshold: DefaultHealthyThreshold, + // Number of healthchecks to fail before the vm is deemed unhealthy. 
+ UnhealthyThreshold: DefaultNEGUnhealthyThreshold, + Description: "Default kubernetes L7 Loadbalancing health check for NEG.", + Type: string(protocol), + } + if protocol == utils.ProtocolHTTP { + hc.HttpHealthCheck = &computealpha.HTTPHealthCheck{ + PortSpecification: UseServingPortSpecification, + } + } + if protocol == utils.ProtocolHTTPS { + hc.HttpsHealthCheck = &computealpha.HTTPSHealthCheck{ + PortSpecification: UseServingPortSpecification, + } + } + return &HealthCheck{ + alphaHealthCheck: &hc, + } +} + // HealthCheck embeds two types - the generic healthcheck compute.HealthCheck // and the HTTP settings compute.HTTPHealthCheck. By embedding both, consumers can modify // all relevant settings (HTTP specific and HealthCheck generic) regardless of Type @@ -174,6 +250,7 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck { type HealthCheck struct { compute.HTTPHealthCheck compute.HealthCheck + alphaHealthCheck *computealpha.HealthCheck } // NewHealthCheck creates a HealthCheck which abstracts nested structs away @@ -221,3 +298,41 @@ func (hc *HealthCheck) ToComputeHealthCheck() *compute.HealthCheck { return &hc.HealthCheck } + +// ToAlphaComputeHealthCheck returns the wrapped computealpha.HealthCheck object (may be nil) +func (hc *HealthCheck) ToAlphaComputeHealthCheck() *computealpha.HealthCheck { + return hc.alphaHealthCheck +} + +func needToUpdate(old, new *HealthCheck) bool { + if old.alphaHealthCheck != nil && new.alphaHealthCheck != nil { + var oldPortSpec, newPortSpec string + if old.alphaHealthCheck.HttpHealthCheck != nil { + oldPortSpec = old.alphaHealthCheck.HttpHealthCheck.PortSpecification + } + if new.alphaHealthCheck.HttpHealthCheck != nil { + newPortSpec = new.alphaHealthCheck.HttpHealthCheck.PortSpecification + } + if oldPortSpec != newPortSpec { + glog.V(2).Infof("Updating health check %v because it has http port specification %q but need %q", old.Name, oldPortSpec, newPortSpec) + return true + } + if 
old.alphaHealthCheck.HttpsHealthCheck != 
nil { + oldPortSpec = old.alphaHealthCheck.HttpsHealthCheck.PortSpecification + } + if new.alphaHealthCheck.HttpsHealthCheck != nil { + newPortSpec = new.alphaHealthCheck.HttpsHealthCheck.PortSpecification + } + if oldPortSpec != newPortSpec { + glog.V(2).Infof("Updating health check %v because it has https port specification %q but need %q", old.Name, oldPortSpec, newPortSpec) + return true + } + + } else { + if old.Protocol() != new.Protocol() { + glog.V(2).Infof("Updating health check %v because it has protocol %v but need %v", old.Name, old.Type, new.Type) + return true + } + } + return false +} diff --git a/controllers/gce/healthchecks/healthchecks_test.go b/controllers/gce/healthchecks/healthchecks_test.go index 0f6abc358..72ec34783 100644 --- a/controllers/gce/healthchecks/healthchecks_test.go +++ b/controllers/gce/healthchecks/healthchecks_test.go @@ -30,7 +30,7 @@ func TestHealthCheckAdd(t *testing.T) { hcp := NewFakeHealthCheckProvider() healthChecks := NewHealthChecker(hcp, "/", namer) - hc := healthChecks.New(80, utils.ProtocolHTTP) + hc := healthChecks.New(80, utils.ProtocolHTTP, false) _, err := healthChecks.Sync(hc) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -41,7 +41,7 @@ func TestHealthCheckAdd(t *testing.T) { t.Fatalf("expected the health check to exist, err: %v", err) } - hc = healthChecks.New(443, utils.ProtocolHTTPS) + hc = healthChecks.New(443, utils.ProtocolHTTPS, false) _, err = healthChecks.Sync(hc) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -66,7 +66,7 @@ func TestHealthCheckAddExisting(t *testing.T) { hcp.CreateHealthCheck(httpHC.ToComputeHealthCheck()) // Should not fail adding the same type of health check - hc := healthChecks.New(3000, utils.ProtocolHTTP) + hc := healthChecks.New(3000, utils.ProtocolHTTP, false) _, err := healthChecks.Sync(hc) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -84,7 +84,7 @@ func TestHealthCheckAddExisting(t *testing.T) { httpsHC.RequestPath = "/my-probes-health" 
hcp.CreateHealthCheck(httpsHC.ToComputeHealthCheck()) - hc = healthChecks.New(4000, utils.ProtocolHTTPS) + hc = healthChecks.New(4000, utils.ProtocolHTTPS, false) _, err = healthChecks.Sync(hc) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -142,7 +142,7 @@ func TestHealthCheckUpdate(t *testing.T) { hcp.CreateHealthCheck(hc.ToComputeHealthCheck()) // Verify the health check exists - _, err := healthChecks.Get(3000) + _, err := healthChecks.Get(3000, false) if err != nil { t.Fatalf("expected the health check to exist, err: %v", err) } @@ -155,7 +155,7 @@ func TestHealthCheckUpdate(t *testing.T) { } // Verify the health check exists - _, err = healthChecks.Get(3000) + _, err = healthChecks.Get(3000, false) if err != nil { t.Fatalf("expected the health check to exist, err: %v", err) } diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go index e63f1f491..da950d708 100644 --- a/controllers/gce/healthchecks/interfaces.go +++ b/controllers/gce/healthchecks/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package healthchecks import ( + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" "k8s.io/ingress/controllers/gce/utils" @@ -29,18 +30,21 @@ type HealthCheckProvider interface { DeleteHttpHealthCheck(name string) error GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) + CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error CreateHealthCheck(hc *compute.HealthCheck) error + UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error UpdateHealthCheck(hc *compute.HealthCheck) error DeleteHealthCheck(name string) error + GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) GetHealthCheck(name string) (*compute.HealthCheck, error) } // HealthChecker is an interface to manage cloud HTTPHealthChecks. 
type HealthChecker interface { - New(port int64, protocol utils.AppProtocol) *HealthCheck + New(port int64, protocol utils.AppProtocol, enableNEG bool) *HealthCheck Sync(hc *HealthCheck) (string, error) Delete(port int64) error - Get(port int64) (*HealthCheck, error) + Get(port int64, alpha bool) (*HealthCheck, error) GetLegacy(port int64) (*compute.HttpHealthCheck, error) DeleteLegacy(port int64) error }