Handle health check for NEG

This commit is contained in:
Minhan Xia 2017-10-02 15:33:46 -07:00
parent 8f07777bea
commit a6993bb449
6 changed files with 241 additions and 32 deletions

View file

@ -93,10 +93,12 @@ func portKey(port int64) string {
// ServicePort for tupling port and protocol
type ServicePort struct {
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
SvcTargetPort string
NEGEnabled bool
}
// Description returns a string describing the ServicePort.
@ -171,8 +173,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
}
func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
hc := b.healthChecker.New(sp.Port, sp.Protocol)
hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
@ -274,6 +275,11 @@ func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error {
return nil
}
// If NEG is enabled, do not link backend service to instance groups.
if p.NEGEnabled {
return nil
}
// Verify that backend service contains links to all backends/instance-groups
return b.edgeHop(be, igs)
}
@ -370,6 +376,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
newBackends := getBackendsForIGs(addIGs, bm)
be.Backends = append(originalBackends, newBackends...)
// TODO (mixia): make sure backend-service can switch between NEG and IG
if err := b.cloud.UpdateGlobalBackendService(be); err != nil {
if utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
glog.V(2).Infof("Updating backend service backends with balancing mode %v failed, will try another mode. err:%v", bm, err)

View file

@ -107,7 +107,7 @@ func TestBackendPoolAdd(t *testing.T) {
}
// Check the created healthcheck is the correct protocol
hc, err := pool.healthChecker.Get(nodePort.Port)
hc, err := pool.healthChecker.Get(nodePort.Port, false)
if err != nil {
t.Fatalf("Unexpected err when querying fake healthchecker: %v", err)
}
@ -146,7 +146,7 @@ func TestHealthCheckMigration(t *testing.T) {
pool.Add(p, nil)
// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(p.Port)
hc, _ := pool.healthChecker.Get(p.Port, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
@ -181,7 +181,7 @@ func TestBackendPoolUpdate(t *testing.T) {
}
// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(p.Port)
hc, _ := pool.healthChecker.Get(p.Port, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
@ -201,7 +201,7 @@ func TestBackendPoolUpdate(t *testing.T) {
}
// Assert the proper health check was created
hc, _ = pool.healthChecker.Get(p.Port)
hc, _ = pool.healthChecker.Get(p.Port, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}

View file

@ -17,6 +17,7 @@ limitations under the License.
package healthchecks
import (
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
)
@ -84,6 +85,14 @@ func (f *FakeHealthCheckProvider) CreateHealthCheck(hc *compute.HealthCheck) err
return nil
}
// CreateHealthCheck fakes out http health check creation.
func (f *FakeHealthCheckProvider) CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
v := *hc
v.SelfLink = "https://fake.google.com/compute/healthChecks/" + hc.Name
f.generic[hc.Name] = toV1HealthCheck(hc)
return nil
}
// GetHealthCheck fakes out getting a http health check from the cloud.
func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCheck, error) {
if hc, found := f.generic[name]; found {
@ -93,6 +102,15 @@ func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCh
return nil, fakeNotFoundErr()
}
// GetHealthCheck fakes out getting a http health check from the cloud.
func (f *FakeHealthCheckProvider) GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) {
if hc, found := f.generic[name]; found {
return toAlphaHealthCheck(&hc), nil
}
return nil, fakeNotFoundErr()
}
// DeleteHealthCheck fakes out deleting a http health check.
func (f *FakeHealthCheckProvider) DeleteHealthCheck(name string) error {
if _, exists := f.generic[name]; !exists {
@ -112,3 +130,68 @@ func (f *FakeHealthCheckProvider) UpdateHealthCheck(hc *compute.HealthCheck) err
f.generic[hc.Name] = *hc
return nil
}
func (f *FakeHealthCheckProvider) UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
if _, exists := f.generic[hc.Name]; !exists {
return fakeNotFoundErr()
}
f.generic[hc.Name] = toV1HealthCheck(hc)
return nil
}
func toV1HealthCheck(hc *computealpha.HealthCheck) compute.HealthCheck {
v1hc := compute.HealthCheck{
Name: hc.Name,
Description: hc.Description,
CheckIntervalSec: hc.CheckIntervalSec,
HealthyThreshold: hc.HealthyThreshold,
UnhealthyThreshold: hc.UnhealthyThreshold,
TimeoutSec: hc.TimeoutSec,
Type: hc.Type,
SelfLink: hc.SelfLink,
}
if hc.HttpHealthCheck != nil {
v1hc.HttpHealthCheck = &compute.HTTPHealthCheck{
Port: hc.HttpHealthCheck.Port,
RequestPath: hc.HttpHealthCheck.RequestPath,
}
}
if hc.HttpsHealthCheck != nil {
v1hc.HttpsHealthCheck = &compute.HTTPSHealthCheck{
Port: hc.HttpsHealthCheck.Port,
RequestPath: hc.HttpsHealthCheck.RequestPath,
}
}
return v1hc
}
func toAlphaHealthCheck(hc *compute.HealthCheck) *computealpha.HealthCheck {
alphahc := computealpha.HealthCheck{
Name: hc.Name,
Description: hc.Description,
CheckIntervalSec: hc.CheckIntervalSec,
HealthyThreshold: hc.HealthyThreshold,
UnhealthyThreshold: hc.UnhealthyThreshold,
TimeoutSec: hc.TimeoutSec,
Type: hc.Type,
SelfLink: hc.SelfLink,
}
if hc.HttpHealthCheck != nil {
alphahc.HttpHealthCheck = &computealpha.HTTPHealthCheck{
Port: hc.HttpHealthCheck.Port,
RequestPath: hc.HttpHealthCheck.RequestPath,
}
}
if hc.HttpsHealthCheck != nil {
alphahc.HttpsHealthCheck = &computealpha.HTTPSHealthCheck{
Port: hc.HttpsHealthCheck.Port,
RequestPath: hc.HttpsHealthCheck.RequestPath,
}
}
return &alphahc
}

View file

@ -20,6 +20,7 @@ import (
"net/http"
"time"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"github.com/golang/glog"
@ -32,14 +33,24 @@ const (
// We're just trying to detect if the node networking is
// borked, service level outages will get detected sooner
// by kube-proxy.
// DefaultHealthCheckInterval defines how frequently a probe runs
// DefaultHealthCheckInterval defines how frequently a probe runs with IG backends
DefaultHealthCheckInterval = 60 * time.Second
// DefaultNEGHealthCheckInterval defines how frequently a probe runs with NEG backends
DefaultNEGHealthCheckInterval = 15 * time.Second
// DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy"
DefaultHealthyThreshold = 1
// DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy"
// DefaultUnhealthyThreshold defines the threshold of failure probes that declare a instance "unhealthy"
DefaultUnhealthyThreshold = 10
// DefaultNEGUnhealthyThreshold defines the threshold of failure probes that declare a network endpoint "unhealthy"
DefaultNEGUnhealthyThreshold = 2
// DefaultTimeout defines the timeout of each probe
DefaultTimeout = 60 * time.Second
//USE_SERVING_PORT: For NetworkEndpointGroup, the port specified for
// each network endpoint is used for health checking. For other
// backends, the port or named port specified in the Backend Service is
// used for health checking.
UseServingPortSpecification = "USE_SERVING_PORT"
)
// HealthChecks manages health checks.
@ -57,9 +68,19 @@ func NewHealthChecker(cloud HealthCheckProvider, defaultHealthCheckPath string,
}
// New returns a *HealthCheck with default settings and specified port/protocol
func (h *HealthChecks) New(port int64, protocol utils.AppProtocol) *HealthCheck {
hc := DefaultHealthCheck(port, protocol)
hc.Name = h.namer.BeName(port)
func (h *HealthChecks) New(port int64, protocol utils.AppProtocol, enableNEG bool) *HealthCheck {
var hc *HealthCheck
name := h.namer.BeName(port)
if enableNEG {
hc = DefaultNEGHealthCheck(protocol)
hc.alphaHealthCheck.Name = name
} else {
hc = DefaultHealthCheck(port, protocol)
hc.Name = name
}
// port is the key for retriving existing health-check
// TODO: rename backend-service and health-check to not use port as key
hc.Port = port
return hc
}
@ -71,23 +92,22 @@ func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) {
hc.RequestPath = h.defaultPath
}
existingHC, err := h.Get(hc.Port)
existingHC, err := h.Get(hc.Port, hc.alphaHealthCheck != nil)
if err != nil {
if !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return "", err
}
glog.V(2).Infof("Creating health check for port %v with protocol %v", hc.Port, hc.Type)
if err = h.cloud.CreateHealthCheck(hc.ToComputeHealthCheck()); err != nil {
if err = h.create(hc); err != nil {
return "", err
}
return h.getHealthCheckLink(hc.Port)
}
if existingHC.Protocol() != hc.Protocol() {
glog.V(2).Infof("Updating health check %v because it has protocol %v but need %v", existingHC.Name, existingHC.Type, hc.Type)
err = h.cloud.UpdateHealthCheck(hc.ToComputeHealthCheck())
if needToUpdate(existingHC, hc) {
err = h.update(hc)
return existingHC.SelfLink, err
}
@ -103,8 +123,24 @@ func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) {
return existingHC.SelfLink, nil
}
func (h *HealthChecks) create(hc *HealthCheck) error {
if hc.alphaHealthCheck == nil {
return h.cloud.CreateHealthCheck(hc.ToComputeHealthCheck())
} else {
return h.cloud.CreateAlphaHealthCheck(hc.ToAlphaComputeHealthCheck())
}
}
func (h *HealthChecks) update(hc *HealthCheck) error {
if hc.alphaHealthCheck == nil {
return h.cloud.UpdateHealthCheck(hc.ToComputeHealthCheck())
} else {
return h.cloud.UpdateAlphaHealthCheck(hc.ToAlphaComputeHealthCheck())
}
}
func (h *HealthChecks) getHealthCheckLink(port int64) (string, error) {
hc, err := h.Get(port)
hc, err := h.Get(port, false)
if err != nil {
return "", err
}
@ -119,10 +155,21 @@ func (h *HealthChecks) Delete(port int64) error {
}
// Get returns the health check by port
func (h *HealthChecks) Get(port int64) (*HealthCheck, error) {
func (h *HealthChecks) Get(port int64, alpha bool) (*HealthCheck, error) {
name := h.namer.BeName(port)
hc, err := h.cloud.GetHealthCheck(name)
return NewHealthCheck(hc), err
if alpha {
var ret *HealthCheck
hc, err := h.cloud.GetAlphaHealthCheck(name)
if err == nil {
ret = &HealthCheck{alphaHealthCheck: hc}
// SelfLink is used in return value
ret.SelfLink = hc.SelfLink
}
return ret, err
} else {
hc, err := h.cloud.GetHealthCheck(name)
return NewHealthCheck(hc), err
}
}
// GetLegacy deletes legacy HTTP health checks
@ -166,6 +213,35 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck {
}
}
// DefaultHealthCheck simply returns the default health check.
func DefaultNEGHealthCheck(protocol utils.AppProtocol) *HealthCheck {
hc := computealpha.HealthCheck{
// How often to health check.
CheckIntervalSec: int64(DefaultNEGHealthCheckInterval.Seconds()),
// How long to wait before claiming failure of a health check.
TimeoutSec: int64(DefaultNEGHealthCheckInterval.Seconds()),
// Number of healthchecks to pass for a vm to be deemed healthy.
HealthyThreshold: DefaultHealthyThreshold,
// Number of healthchecks to fail before the vm is deemed unhealthy.
UnhealthyThreshold: DefaultNEGUnhealthyThreshold,
Description: "Default kubernetes L7 Loadbalancing health check for NEG.",
Type: string(protocol),
}
if protocol == utils.ProtocolHTTP {
hc.HttpHealthCheck = &computealpha.HTTPHealthCheck{
PortSpecification: UseServingPortSpecification,
}
}
if protocol == utils.ProtocolHTTPS {
hc.HttpsHealthCheck = &computealpha.HTTPSHealthCheck{
PortSpecification: UseServingPortSpecification,
}
}
return &HealthCheck{
alphaHealthCheck: &hc,
}
}
// HealthCheck embeds two types - the generic healthcheck compute.HealthCheck
// and the HTTP settings compute.HTTPHealthCheck. By embedding both, consumers can modify
// all relevant settings (HTTP specific and HealthCheck generic) regardless of Type
@ -174,6 +250,7 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck {
type HealthCheck struct {
compute.HTTPHealthCheck
compute.HealthCheck
alphaHealthCheck *computealpha.HealthCheck
}
// NewHealthCheck creates a HealthCheck which abstracts nested structs away
@ -221,3 +298,41 @@ func (hc *HealthCheck) ToComputeHealthCheck() *compute.HealthCheck {
return &hc.HealthCheck
}
// ToComputeHealthCheck returns a valid compute.HealthCheck object
func (hc *HealthCheck) ToAlphaComputeHealthCheck() *computealpha.HealthCheck {
return hc.alphaHealthCheck
}
func needToUpdate(old, new *HealthCheck) bool {
if old.alphaHealthCheck != nil && new.alphaHealthCheck != nil {
var oldPortSpec, newPortSpec string
if old.alphaHealthCheck.HttpHealthCheck != nil {
oldPortSpec = old.alphaHealthCheck.HttpHealthCheck.PortSpecification
}
if new.alphaHealthCheck.HttpHealthCheck != nil {
newPortSpec = new.alphaHealthCheck.HttpHealthCheck.PortSpecification
}
if oldPortSpec != newPortSpec {
glog.V(2).Infof("Updating health check %v because it has http port specification %q but need %q", old.Name, oldPortSpec, newPortSpec)
return true
}
if old.alphaHealthCheck.HttpsHealthCheck != nil {
oldPortSpec = old.alphaHealthCheck.HttpsHealthCheck.PortSpecification
}
if new.alphaHealthCheck.HttpsHealthCheck != nil {
newPortSpec = new.alphaHealthCheck.HttpsHealthCheck.PortSpecification
}
if oldPortSpec != newPortSpec {
glog.V(2).Infof("Updating health check %v because it has https port specification %q but need %q", old.Name, oldPortSpec, newPortSpec)
return true
}
} else {
if old.Protocol() != new.Protocol() {
glog.V(2).Infof("Updating health check %v because it has protocol %v but need %v", old.Name, old.Type, new.Type)
return true
}
}
return false
}

View file

@ -30,7 +30,7 @@ func TestHealthCheckAdd(t *testing.T) {
hcp := NewFakeHealthCheckProvider()
healthChecks := NewHealthChecker(hcp, "/", namer)
hc := healthChecks.New(80, utils.ProtocolHTTP)
hc := healthChecks.New(80, utils.ProtocolHTTP, false)
_, err := healthChecks.Sync(hc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -41,7 +41,7 @@ func TestHealthCheckAdd(t *testing.T) {
t.Fatalf("expected the health check to exist, err: %v", err)
}
hc = healthChecks.New(443, utils.ProtocolHTTPS)
hc = healthChecks.New(443, utils.ProtocolHTTPS, false)
_, err = healthChecks.Sync(hc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -66,7 +66,7 @@ func TestHealthCheckAddExisting(t *testing.T) {
hcp.CreateHealthCheck(httpHC.ToComputeHealthCheck())
// Should not fail adding the same type of health check
hc := healthChecks.New(3000, utils.ProtocolHTTP)
hc := healthChecks.New(3000, utils.ProtocolHTTP, false)
_, err := healthChecks.Sync(hc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -84,7 +84,7 @@ func TestHealthCheckAddExisting(t *testing.T) {
httpsHC.RequestPath = "/my-probes-health"
hcp.CreateHealthCheck(httpsHC.ToComputeHealthCheck())
hc = healthChecks.New(4000, utils.ProtocolHTTPS)
hc = healthChecks.New(4000, utils.ProtocolHTTPS, false)
_, err = healthChecks.Sync(hc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@ -142,7 +142,7 @@ func TestHealthCheckUpdate(t *testing.T) {
hcp.CreateHealthCheck(hc.ToComputeHealthCheck())
// Verify the health check exists
_, err := healthChecks.Get(3000)
_, err := healthChecks.Get(3000, false)
if err != nil {
t.Fatalf("expected the health check to exist, err: %v", err)
}
@ -155,7 +155,7 @@ func TestHealthCheckUpdate(t *testing.T) {
}
// Verify the health check exists
_, err = healthChecks.Get(3000)
_, err = healthChecks.Get(3000, false)
if err != nil {
t.Fatalf("expected the health check to exist, err: %v", err)
}

View file

@ -17,6 +17,7 @@ limitations under the License.
package healthchecks
import (
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"k8s.io/ingress/controllers/gce/utils"
@ -29,18 +30,21 @@ type HealthCheckProvider interface {
DeleteHttpHealthCheck(name string) error
GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error)
CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error
CreateHealthCheck(hc *compute.HealthCheck) error
UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error
UpdateHealthCheck(hc *compute.HealthCheck) error
DeleteHealthCheck(name string) error
GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error)
GetHealthCheck(name string) (*compute.HealthCheck, error)
}
// HealthChecker is an interface to manage cloud HTTPHealthChecks.
type HealthChecker interface {
New(port int64, protocol utils.AppProtocol) *HealthCheck
New(port int64, protocol utils.AppProtocol, enableNEG bool) *HealthCheck
Sync(hc *HealthCheck) (string, error)
Delete(port int64) error
Get(port int64) (*HealthCheck, error)
Get(port int64, alpha bool) (*HealthCheck, error)
GetLegacy(port int64) (*compute.HttpHealthCheck, error)
DeleteLegacy(port int64) error
}