Fix: only take readinessProbe settings if protocols match; address review comments; rename protocol constants
This commit is contained in:
parent
52bc74d315
commit
6af52e7195
12 changed files with 83 additions and 78 deletions
|
@ -157,15 +157,15 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
|
||||||
return be, nil
|
return be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backends) ensureHealthCheck(port int64, protocol utils.AppProtocol) (string, error) {
|
func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
|
||||||
hc := b.healthChecker.New(port, protocol)
|
hc := b.healthChecker.New(sp.Port, sp.Protocol)
|
||||||
if b.prober != nil {
|
if b.prober != nil {
|
||||||
probe, err := b.prober.GetProbe(port)
|
probe, err := b.prober.GetProbe(sp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if probe != nil {
|
if probe != nil {
|
||||||
glog.Infof("Applying httpGet settings of readinessProbe to health check on port %v", port)
|
glog.V(1).Infof("Applying httpGet settings of readinessProbe to health check on port %+v", sp)
|
||||||
applyProbeSettingsToHC(probe, hc)
|
applyProbeSettingsToHC(probe, hc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -237,7 +237,7 @@ func (b *Backends) Add(p ServicePort) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure health check for backend service exists
|
// Ensure health check for backend service exists
|
||||||
hcLink, err := b.ensureHealthCheck(p.Port, p.Protocol)
|
hcLink, err := b.ensureHealthCheck(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -245,7 +245,7 @@ func (b *Backends) Add(p ServicePort) error {
|
||||||
pName := b.namer.BeName(p.Port)
|
pName := b.namer.BeName(p.Port)
|
||||||
be, _ = b.Get(p.Port)
|
be, _ = b.Get(p.Port)
|
||||||
if be == nil {
|
if be == nil {
|
||||||
glog.Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort)
|
glog.V(1).Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort)
|
||||||
be, err = b.create(igs, namedPort, hcLink, p.Protocol, pName)
|
be, err = b.create(igs, namedPort, hcLink, p.Protocol, pName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -421,20 +421,12 @@ func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) {
|
||||||
healthPath := p.Handler.HTTPGet.Path
|
healthPath := p.Handler.HTTPGet.Path
|
||||||
// GCE requires a leading "/" for health check urls.
|
// GCE requires a leading "/" for health check urls.
|
||||||
if !strings.HasPrefix(healthPath, "/") {
|
if !strings.HasPrefix(healthPath, "/") {
|
||||||
healthPath = fmt.Sprintf("/%v", healthPath)
|
healthPath = "/" + healthPath
|
||||||
}
|
}
|
||||||
|
|
||||||
host := p.Handler.HTTPGet.Host
|
|
||||||
// remember the ingresses that use this Service so we can send
|
|
||||||
// the right events
|
|
||||||
|
|
||||||
hc.RequestPath = healthPath
|
hc.RequestPath = healthPath
|
||||||
hc.Host = host
|
hc.Host = p.Handler.HTTPGet.Host
|
||||||
hc.Description = "Kubernetes L7 health check generated with readiness probe settings."
|
hc.Description = "Kubernetes L7 health check generated with readiness probe settings."
|
||||||
// set a low health threshold and a high failure threshold.
|
|
||||||
// We're just trying to detect if the node networking is
|
|
||||||
// borked, service level outages will get detected sooner
|
|
||||||
// by kube-proxy.
|
|
||||||
hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval)
|
hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval)
|
||||||
hc.TimeoutSec = int64(p.TimeoutSeconds)
|
hc.TimeoutSec = int64(p.TimeoutSeconds)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,11 +40,11 @@ var noOpErrFunc = func(op int, be *compute.BackendService) error { return nil }
|
||||||
var existingProbe = &api_v1.Probe{
|
var existingProbe = &api_v1.Probe{
|
||||||
Handler: api_v1.Handler{
|
Handler: api_v1.Handler{
|
||||||
HTTPGet: &api_v1.HTTPGetAction{
|
HTTPGet: &api_v1.HTTPGetAction{
|
||||||
Scheme: api_v1.URISchemeHTTP,
|
Scheme: api_v1.URISchemeHTTPS,
|
||||||
Path: "/my-special-path",
|
Path: "/my-special-path",
|
||||||
Port: intstr.IntOrString{
|
Port: intstr.IntOrString{
|
||||||
Type: intstr.Int,
|
Type: intstr.Int,
|
||||||
IntVal: 80,
|
IntVal: 443,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -56,7 +56,7 @@ func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWit
|
||||||
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
|
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
|
||||||
healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer)
|
healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer)
|
||||||
bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
|
bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud)
|
||||||
probes := map[int64]*api_v1.Probe{80: existingProbe}
|
probes := map[ServicePort]*api_v1.Probe{{Port: 443, Protocol: utils.ProtocolHTTPS}: existingProbe}
|
||||||
bp.Init(NewFakeProbeProvider(probes))
|
bp.Init(NewFakeProbeProvider(probes))
|
||||||
return bp
|
return bp
|
||||||
}
|
}
|
||||||
|
@ -68,11 +68,12 @@ func TestBackendPoolAdd(t *testing.T) {
|
||||||
namer := utils.Namer{}
|
namer := utils.Namer{}
|
||||||
|
|
||||||
testCases := []ServicePort{
|
testCases := []ServicePort{
|
||||||
{80, utils.HTTP},
|
{80, utils.ProtocolHTTP},
|
||||||
{443, utils.HTTPS},
|
{443, utils.ProtocolHTTPS},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, nodePort := range testCases {
|
for _, nodePort := range testCases {
|
||||||
|
// For simplicity, these tests use 80/443 as nodeports
|
||||||
t.Run(fmt.Sprintf("Port:%v Protocol:%v", nodePort.Port, nodePort.Protocol), func(t *testing.T) {
|
t.Run(fmt.Sprintf("Port:%v Protocol:%v", nodePort.Port, nodePort.Protocol), func(t *testing.T) {
|
||||||
// Add a backend for a port, then re-add the same port and
|
// Add a backend for a port, then re-add the same port and
|
||||||
// make sure it corrects a broken link from the backend to
|
// make sure it corrects a broken link from the backend to
|
||||||
|
@ -113,6 +114,10 @@ func TestBackendPoolAdd(t *testing.T) {
|
||||||
if hc.Protocol() != nodePort.Protocol {
|
if hc.Protocol() != nodePort.Protocol {
|
||||||
t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), nodePort.Protocol)
|
t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), nodePort.Protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if nodePort.Port == 443 && hc.RequestPath != "/my-special-path" {
|
||||||
|
t.Fatalf("Healthcheck for 443 should have special request path from probe")
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,7 +128,7 @@ func TestBackendPoolUpdate(t *testing.T) {
|
||||||
pool := newBackendPool(f, fakeIGs, false)
|
pool := newBackendPool(f, fakeIGs, false)
|
||||||
namer := utils.Namer{}
|
namer := utils.Namer{}
|
||||||
|
|
||||||
p := ServicePort{Port: 3000, Protocol: utils.HTTP}
|
p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP}
|
||||||
pool.Add(p)
|
pool.Add(p)
|
||||||
beName := namer.BeName(p.Port)
|
beName := namer.BeName(p.Port)
|
||||||
|
|
||||||
|
@ -143,7 +148,7 @@ func TestBackendPoolUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update service port to encrypted
|
// Update service port to encrypted
|
||||||
p.Protocol = utils.HTTPS
|
p.Protocol = utils.ProtocolHTTPS
|
||||||
pool.Sync([]ServicePort{p})
|
pool.Sync([]ServicePort{p})
|
||||||
|
|
||||||
be, err = f.GetBackendService(beName)
|
be, err = f.GetBackendService(beName)
|
||||||
|
@ -169,7 +174,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
|
||||||
pool := newBackendPool(f, fakeIGs, false)
|
pool := newBackendPool(f, fakeIGs, false)
|
||||||
namer := utils.Namer{}
|
namer := utils.Namer{}
|
||||||
|
|
||||||
nodePort := ServicePort{Port: 8080, Protocol: utils.HTTP}
|
nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP}
|
||||||
pool.Add(nodePort)
|
pool.Add(nodePort)
|
||||||
beName := namer.BeName(nodePort.Port)
|
beName := namer.BeName(nodePort.Port)
|
||||||
|
|
||||||
|
@ -212,7 +217,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
|
||||||
func TestBackendPoolSync(t *testing.T) {
|
func TestBackendPoolSync(t *testing.T) {
|
||||||
// Call sync on a backend pool with a list of ports, make sure the pool
|
// Call sync on a backend pool with a list of ports, make sure the pool
|
||||||
// creates/deletes required ports.
|
// creates/deletes required ports.
|
||||||
svcNodePorts := []ServicePort{{Port: 81, Protocol: utils.HTTP}, {Port: 82, Protocol: utils.HTTPS}, {Port: 83, Protocol: utils.HTTP}}
|
svcNodePorts := []ServicePort{{Port: 81, Protocol: utils.ProtocolHTTP}, {Port: 82, Protocol: utils.ProtocolHTTPS}, {Port: 83, Protocol: utils.ProtocolHTTP}}
|
||||||
f := NewFakeBackendServices(noOpErrFunc)
|
f := NewFakeBackendServices(noOpErrFunc)
|
||||||
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
||||||
pool := newBackendPool(f, fakeIGs, true)
|
pool := newBackendPool(f, fakeIGs, true)
|
||||||
|
@ -292,7 +297,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
|
||||||
hcp := healthchecks.NewFakeHealthCheckProvider()
|
hcp := healthchecks.NewFakeHealthCheckProvider()
|
||||||
healthChecks := healthchecks.NewHealthChecker(hcp, "/", namer)
|
healthChecks := healthchecks.NewHealthChecker(hcp, "/", namer)
|
||||||
bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, false)
|
bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, false)
|
||||||
probes := map[int64]*api_v1.Probe{}
|
probes := map[ServicePort]*api_v1.Probe{}
|
||||||
bp.Init(NewFakeProbeProvider(probes))
|
bp.Init(NewFakeProbeProvider(probes))
|
||||||
|
|
||||||
// Create a legacy HTTP health check
|
// Create a legacy HTTP health check
|
||||||
|
@ -317,7 +322,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Have pool sync the above backend service
|
// Have pool sync the above backend service
|
||||||
bp.Add(ServicePort{Port: 80, Protocol: utils.HTTPS})
|
bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS})
|
||||||
|
|
||||||
// Verify the legacy health check has been deleted
|
// Verify the legacy health check has been deleted
|
||||||
_, err = hcp.GetHttpHealthCheck(beName)
|
_, err = hcp.GetHttpHealthCheck(beName)
|
||||||
|
@ -332,8 +337,8 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the newer health check is of type HTTPS
|
// Verify the newer health check is of type HTTPS
|
||||||
if hcNew.Type != string(utils.HTTPS) {
|
if hcNew.Type != string(utils.ProtocolHTTPS) {
|
||||||
t.Fatalf("expected health check type to be %v, actual %v", string(utils.HTTPS), hcNew.Type)
|
t.Fatalf("expected health check type to be %v, actual %v", string(utils.ProtocolHTTPS), hcNew.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -435,7 +440,7 @@ func TestBackendCreateBalancingMode(t *testing.T) {
|
||||||
|
|
||||||
func TestApplyProbeSettingsToHC(t *testing.T) {
|
func TestApplyProbeSettingsToHC(t *testing.T) {
|
||||||
p := "healthz"
|
p := "healthz"
|
||||||
hc := healthchecks.DefaultHealthCheck(8080, utils.HTTPS)
|
hc := healthchecks.DefaultHealthCheck(8080, utils.ProtocolHTTPS)
|
||||||
probe := &api_v1.Probe{
|
probe := &api_v1.Probe{
|
||||||
Handler: api_v1.Handler{
|
Handler: api_v1.Handler{
|
||||||
HTTPGet: &api_v1.HTTPGetAction{
|
HTTPGet: &api_v1.HTTPGetAction{
|
||||||
|
@ -451,7 +456,7 @@ func TestApplyProbeSettingsToHC(t *testing.T) {
|
||||||
|
|
||||||
applyProbeSettingsToHC(probe, hc)
|
applyProbeSettingsToHC(probe, hc)
|
||||||
|
|
||||||
if hc.Protocol() != utils.HTTPS || hc.Port != 8080 {
|
if hc.Protocol() != utils.ProtocolHTTPS || hc.Port != 8080 {
|
||||||
t.Errorf("Basic HC settings changed")
|
t.Errorf("Basic HC settings changed")
|
||||||
}
|
}
|
||||||
if hc.RequestPath != "/"+p {
|
if hc.RequestPath != "/"+p {
|
||||||
|
|
|
@ -122,17 +122,17 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput
|
||||||
|
|
||||||
// FakeProbeProvider implements the probeProvider interface for tests.
|
// FakeProbeProvider implements the probeProvider interface for tests.
|
||||||
type FakeProbeProvider struct {
|
type FakeProbeProvider struct {
|
||||||
probes map[int64]*api_v1.Probe
|
probes map[ServicePort]*api_v1.Probe
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFakeProbeProvider returns a struct which satifies probeProvider interface
|
// NewFakeProbeProvider returns a struct which satifies probeProvider interface
|
||||||
func NewFakeProbeProvider(probes map[int64]*api_v1.Probe) *FakeProbeProvider {
|
func NewFakeProbeProvider(probes map[ServicePort]*api_v1.Probe) *FakeProbeProvider {
|
||||||
return &FakeProbeProvider{probes}
|
return &FakeProbeProvider{probes}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProbe returns the probe for a given nodePort
|
// GetProbe returns the probe for a given nodePort
|
||||||
func (pp *FakeProbeProvider) GetProbe(port int64) (*api_v1.Probe, error) {
|
func (pp *FakeProbeProvider) GetProbe(port ServicePort) (*api_v1.Probe, error) {
|
||||||
if probe, exists := pp.probes[port]; exists {
|
if probe, exists := pp.probes[port]; exists && probe.HTTPGet != nil {
|
||||||
return probe, nil
|
return probe, nil
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
|
|
||||||
// ProbeProvider retrieves a probe struct given a nodePort
|
// ProbeProvider retrieves a probe struct given a nodePort
|
||||||
type probeProvider interface {
|
type probeProvider interface {
|
||||||
GetProbe(nodePort int64) (*api_v1.Probe, error)
|
GetProbe(sp ServicePort) (*api_v1.Probe, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackendPool is an interface to manage a pool of kubernetes nodePort services
|
// BackendPool is an interface to manage a pool of kubernetes nodePort services
|
||||||
|
|
|
@ -30,7 +30,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.HTTP}
|
testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP}
|
||||||
testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
|
testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,8 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
api_v1 "k8s.io/client-go/pkg/api/v1"
|
api_v1 "k8s.io/client-go/pkg/api/v1"
|
||||||
|
"k8s.io/ingress/controllers/gce/backends"
|
||||||
|
"k8s.io/ingress/controllers/gce/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Pods created in loops start from this time, for routines that
|
// Pods created in loops start from this time, for routines that
|
||||||
|
@ -94,9 +96,10 @@ func TestInstancesAddedToZones(t *testing.T) {
|
||||||
func TestProbeGetter(t *testing.T) {
|
func TestProbeGetter(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm)
|
lbc := newLoadBalancerController(t, cm)
|
||||||
nodePortToHealthCheck := map[int64]string{
|
|
||||||
3001: "/healthz",
|
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||||
3002: "/foo",
|
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||||
|
{Port: 3002, Protocol: utils.ProtocolHTTPS}: "/foo",
|
||||||
}
|
}
|
||||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||||
for p, exp := range nodePortToHealthCheck {
|
for p, exp := range nodePortToHealthCheck {
|
||||||
|
@ -112,8 +115,8 @@ func TestProbeGetter(t *testing.T) {
|
||||||
func TestProbeGetterNamedPort(t *testing.T) {
|
func TestProbeGetterNamedPort(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm)
|
lbc := newLoadBalancerController(t, cm)
|
||||||
nodePortToHealthCheck := map[int64]string{
|
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||||
3001: "/healthz",
|
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||||
}
|
}
|
||||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||||
for _, p := range lbc.podLister.Indexer.List() {
|
for _, p := range lbc.podLister.Indexer.List() {
|
||||||
|
@ -167,8 +170,8 @@ func TestProbeGetterCrossNamespace(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lbc.podLister.Indexer.Add(firstPod)
|
lbc.podLister.Indexer.Add(firstPod)
|
||||||
nodePortToHealthCheck := map[int64]string{
|
nodePortToHealthCheck := map[backends.ServicePort]string{
|
||||||
3001: "/healthz",
|
{Port: 3001, Protocol: utils.ProtocolHTTP}: "/healthz",
|
||||||
}
|
}
|
||||||
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault)
|
||||||
|
|
||||||
|
@ -182,16 +185,16 @@ func TestProbeGetterCrossNamespace(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string, ns string) {
|
func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.ServicePort]string, ns string) {
|
||||||
delay := time.Minute
|
delay := time.Minute
|
||||||
for np, u := range nodePortToHealthCheck {
|
for np, u := range nodePortToHealthCheck {
|
||||||
l := map[string]string{fmt.Sprintf("app-%d", np): "test"}
|
l := map[string]string{fmt.Sprintf("app-%d", np.Port): "test"}
|
||||||
svc := &api_v1.Service{
|
svc := &api_v1.Service{
|
||||||
Spec: api_v1.ServiceSpec{
|
Spec: api_v1.ServiceSpec{
|
||||||
Selector: l,
|
Selector: l,
|
||||||
Ports: []api_v1.ServicePort{
|
Ports: []api_v1.ServicePort{
|
||||||
{
|
{
|
||||||
NodePort: int32(np),
|
NodePort: int32(np.Port),
|
||||||
TargetPort: intstr.IntOrString{
|
TargetPort: intstr.IntOrString{
|
||||||
Type: intstr.Int,
|
Type: intstr.Int,
|
||||||
IntVal: 80,
|
IntVal: 80,
|
||||||
|
@ -200,14 +203,14 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
svc.Name = fmt.Sprintf("%d", np)
|
svc.Name = fmt.Sprintf("%d", np.Port)
|
||||||
svc.Namespace = ns
|
svc.Namespace = ns
|
||||||
lbc.svcLister.Indexer.Add(svc)
|
lbc.svcLister.Indexer.Add(svc)
|
||||||
|
|
||||||
pod := &api_v1.Pod{
|
pod := &api_v1.Pod{
|
||||||
ObjectMeta: meta_v1.ObjectMeta{
|
ObjectMeta: meta_v1.ObjectMeta{
|
||||||
Labels: l,
|
Labels: l,
|
||||||
Name: fmt.Sprintf("%d", np),
|
Name: fmt.Sprintf("%d", np.Port),
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(delay)),
|
CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(delay)),
|
||||||
},
|
},
|
||||||
|
@ -218,7 +221,7 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[int64]string
|
||||||
ReadinessProbe: &api_v1.Probe{
|
ReadinessProbe: &api_v1.Probe{
|
||||||
Handler: api_v1.Handler{
|
Handler: api_v1.Handler{
|
||||||
HTTPGet: &api_v1.HTTPGetAction{
|
HTTPGet: &api_v1.HTTPGetAction{
|
||||||
Scheme: api_v1.URISchemeHTTP,
|
Scheme: api_v1.URIScheme(string(np.Protocol)),
|
||||||
Path: u,
|
Path: u,
|
||||||
Port: intstr.IntOrString{
|
Port: intstr.IntOrString{
|
||||||
Type: intstr.Int,
|
Type: intstr.Int,
|
||||||
|
|
|
@ -139,7 +139,7 @@ func (svc svcAnnotations) ApplicationProtocols() (map[string]utils.AppProtocol,
|
||||||
// Verify protocol is an accepted value
|
// Verify protocol is an accepted value
|
||||||
for _, proto := range portToProtos {
|
for _, proto := range portToProtos {
|
||||||
switch proto {
|
switch proto {
|
||||||
case utils.HTTP, utils.HTTPS:
|
case utils.ProtocolHTTP, utils.ProtocolHTTPS:
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unexpected port application protocol: %v", proto)
|
return nil, fmt.Errorf("unexpected port application protocol: %v", proto)
|
||||||
}
|
}
|
||||||
|
@ -457,7 +457,7 @@ PortLoop:
|
||||||
return invalidPort, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")}
|
return invalidPort, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")}
|
||||||
}
|
}
|
||||||
|
|
||||||
proto := utils.HTTP
|
proto := utils.ProtocolHTTP
|
||||||
if protoStr, exists := appProtocols[port.Name]; exists {
|
if protoStr, exists := appProtocols[port.Name]; exists {
|
||||||
proto = utils.AppProtocol(protoStr)
|
proto = utils.AppProtocol(protoStr)
|
||||||
}
|
}
|
||||||
|
@ -536,7 +536,7 @@ func (t *GCETranslator) ListZones() ([]string, error) {
|
||||||
|
|
||||||
// geHTTPProbe returns the http readiness probe from the first container
|
// geHTTPProbe returns the http readiness probe from the first container
|
||||||
// that matches targetPort, from the set of pods matching the given labels.
|
// that matches targetPort, from the set of pods matching the given labels.
|
||||||
func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString) (*api_v1.Probe, error) {
|
func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString, protocol utils.AppProtocol) (*api_v1.Probe, error) {
|
||||||
l := svc.Spec.Selector
|
l := svc.Spec.Selector
|
||||||
|
|
||||||
// Lookup any container with a matching targetPort from the set of pods
|
// Lookup any container with a matching targetPort from the set of pods
|
||||||
|
@ -555,9 +555,10 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr
|
||||||
}
|
}
|
||||||
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
|
logStr := fmt.Sprintf("Pod %v matching service selectors %v (targetport %+v)", pod.Name, l, targetPort)
|
||||||
for _, c := range pod.Spec.Containers {
|
for _, c := range pod.Spec.Containers {
|
||||||
if !isSimpleHTTPProbe(c.ReadinessProbe) {
|
if !isSimpleHTTPProbe(c.ReadinessProbe) || string(protocol) != string(c.ReadinessProbe.HTTPGet.Scheme) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, p := range c.Ports {
|
for _, p := range c.Ports {
|
||||||
if (targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort) ||
|
if (targetPort.Type == intstr.Int && targetPort.IntVal == p.ContainerPort) ||
|
||||||
(targetPort.Type == intstr.String && targetPort.StrVal == p.Name) {
|
(targetPort.Type == intstr.String && targetPort.StrVal == p.Name) {
|
||||||
|
@ -593,7 +594,7 @@ func isSimpleHTTPProbe(probe *api_v1.Probe) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProbe returns a probe that's used for the given nodeport
|
// GetProbe returns a probe that's used for the given nodeport
|
||||||
func (t *GCETranslator) GetProbe(port int64) (*api_v1.Probe, error) {
|
func (t *GCETranslator) GetProbe(port backends.ServicePort) (*api_v1.Probe, error) {
|
||||||
sl := t.svcLister.List()
|
sl := t.svcLister.List()
|
||||||
|
|
||||||
// Find the label and target port of the one service with the given nodePort
|
// Find the label and target port of the one service with the given nodePort
|
||||||
|
@ -607,7 +608,7 @@ OuterLoop:
|
||||||
svcPort = sp
|
svcPort = sp
|
||||||
// only one Service can match this nodePort, try and look up
|
// only one Service can match this nodePort, try and look up
|
||||||
// the readiness probe of the pods behind it
|
// the readiness probe of the pods behind it
|
||||||
if int32(port) == sp.NodePort {
|
if int32(port.Port) == sp.NodePort {
|
||||||
found = true
|
found = true
|
||||||
break OuterLoop
|
break OuterLoop
|
||||||
}
|
}
|
||||||
|
@ -618,7 +619,7 @@ OuterLoop:
|
||||||
return nil, fmt.Errorf("unable to find nodeport %v in any service", port)
|
return nil, fmt.Errorf("unable to find nodeport %v in any service", port)
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.getHTTPProbe(service, svcPort.TargetPort)
|
return t.getHTTPProbe(service, svcPort.TargetPort, port.Protocol)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
|
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
|
||||||
|
|
|
@ -27,6 +27,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// These values set a low health threshold and a high failure threshold.
|
||||||
|
// We're just trying to detect if the node networking is
|
||||||
|
// borked, service level outages will get detected sooner
|
||||||
|
// by kube-proxy.
|
||||||
// DefaultHealthCheckInterval defines how frequently a probe runs
|
// DefaultHealthCheckInterval defines how frequently a probe runs
|
||||||
DefaultHealthCheckInterval = 60
|
DefaultHealthCheckInterval = 60
|
||||||
// DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy"
|
// DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy"
|
||||||
|
@ -173,9 +177,9 @@ func NewHealthCheck(hc *compute.HealthCheck) *HealthCheck {
|
||||||
|
|
||||||
v := &HealthCheck{HealthCheck: *hc}
|
v := &HealthCheck{HealthCheck: *hc}
|
||||||
switch utils.AppProtocol(hc.Type) {
|
switch utils.AppProtocol(hc.Type) {
|
||||||
case utils.HTTP:
|
case utils.ProtocolHTTP:
|
||||||
v.HTTPHealthCheck = *hc.HttpHealthCheck
|
v.HTTPHealthCheck = *hc.HttpHealthCheck
|
||||||
case utils.HTTPS:
|
case utils.ProtocolHTTPS:
|
||||||
// HTTPHealthCheck and HTTPSHealthChecks have identical fields
|
// HTTPHealthCheck and HTTPSHealthChecks have identical fields
|
||||||
v.HTTPHealthCheck = compute.HTTPHealthCheck(*hc.HttpsHealthCheck)
|
v.HTTPHealthCheck = compute.HTTPHealthCheck(*hc.HttpsHealthCheck)
|
||||||
}
|
}
|
||||||
|
@ -201,9 +205,9 @@ func (hc *HealthCheck) Out() *compute.HealthCheck {
|
||||||
hc.HealthCheck.HttpHealthCheck = nil
|
hc.HealthCheck.HttpHealthCheck = nil
|
||||||
|
|
||||||
switch hc.Protocol() {
|
switch hc.Protocol() {
|
||||||
case utils.HTTP:
|
case utils.ProtocolHTTP:
|
||||||
hc.HealthCheck.HttpHealthCheck = &hc.HTTPHealthCheck
|
hc.HealthCheck.HttpHealthCheck = &hc.HTTPHealthCheck
|
||||||
case utils.HTTPS:
|
case utils.ProtocolHTTPS:
|
||||||
https := compute.HTTPSHealthCheck(hc.HTTPHealthCheck)
|
https := compute.HTTPSHealthCheck(hc.HTTPHealthCheck)
|
||||||
hc.HealthCheck.HttpsHealthCheck = &https
|
hc.HealthCheck.HttpsHealthCheck = &https
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ func TestHealthCheckAdd(t *testing.T) {
|
||||||
hcp := NewFakeHealthCheckProvider()
|
hcp := NewFakeHealthCheckProvider()
|
||||||
healthChecks := NewHealthChecker(hcp, "/", namer)
|
healthChecks := NewHealthChecker(hcp, "/", namer)
|
||||||
|
|
||||||
hc := healthChecks.New(80, utils.HTTP)
|
hc := healthChecks.New(80, utils.ProtocolHTTP)
|
||||||
_, err := healthChecks.Sync(hc)
|
_, err := healthChecks.Sync(hc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
@ -41,7 +41,7 @@ func TestHealthCheckAdd(t *testing.T) {
|
||||||
t.Fatalf("expected the health check to exist, err: %v", err)
|
t.Fatalf("expected the health check to exist, err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hc = healthChecks.New(443, utils.HTTPS)
|
hc = healthChecks.New(443, utils.ProtocolHTTPS)
|
||||||
_, err = healthChecks.Sync(hc)
|
_, err = healthChecks.Sync(hc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
@ -60,13 +60,13 @@ func TestHealthCheckAddExisting(t *testing.T) {
|
||||||
|
|
||||||
// HTTP
|
// HTTP
|
||||||
// Manually insert a health check
|
// Manually insert a health check
|
||||||
httpHC := DefaultHealthCheck(3000, utils.HTTP)
|
httpHC := DefaultHealthCheck(3000, utils.ProtocolHTTP)
|
||||||
httpHC.Name = namer.BeName(3000)
|
httpHC.Name = namer.BeName(3000)
|
||||||
httpHC.RequestPath = "/my-probes-health"
|
httpHC.RequestPath = "/my-probes-health"
|
||||||
hcp.CreateHealthCheck(httpHC.Out())
|
hcp.CreateHealthCheck(httpHC.Out())
|
||||||
|
|
||||||
// Should not fail adding the same type of health check
|
// Should not fail adding the same type of health check
|
||||||
hc := healthChecks.New(3000, utils.HTTP)
|
hc := healthChecks.New(3000, utils.ProtocolHTTP)
|
||||||
_, err := healthChecks.Sync(hc)
|
_, err := healthChecks.Sync(hc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
@ -79,12 +79,12 @@ func TestHealthCheckAddExisting(t *testing.T) {
|
||||||
|
|
||||||
// HTTPS
|
// HTTPS
|
||||||
// Manually insert a health check
|
// Manually insert a health check
|
||||||
httpsHC := DefaultHealthCheck(4000, utils.HTTPS)
|
httpsHC := DefaultHealthCheck(4000, utils.ProtocolHTTPS)
|
||||||
httpsHC.Name = namer.BeName(4000)
|
httpsHC.Name = namer.BeName(4000)
|
||||||
httpsHC.RequestPath = "/my-probes-health"
|
httpsHC.RequestPath = "/my-probes-health"
|
||||||
hcp.CreateHealthCheck(httpsHC.Out())
|
hcp.CreateHealthCheck(httpsHC.Out())
|
||||||
|
|
||||||
hc = healthChecks.New(4000, utils.HTTPS)
|
hc = healthChecks.New(4000, utils.ProtocolHTTPS)
|
||||||
_, err = healthChecks.Sync(hc)
|
_, err = healthChecks.Sync(hc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
@ -102,12 +102,12 @@ func TestHealthCheckDelete(t *testing.T) {
|
||||||
healthChecks := NewHealthChecker(hcp, "/", namer)
|
healthChecks := NewHealthChecker(hcp, "/", namer)
|
||||||
|
|
||||||
// Create HTTP HC for 1234
|
// Create HTTP HC for 1234
|
||||||
hc := DefaultHealthCheck(1234, utils.HTTP)
|
hc := DefaultHealthCheck(1234, utils.ProtocolHTTP)
|
||||||
hc.Name = namer.BeName(1234)
|
hc.Name = namer.BeName(1234)
|
||||||
hcp.CreateHealthCheck(hc.Out())
|
hcp.CreateHealthCheck(hc.Out())
|
||||||
|
|
||||||
// Create HTTPS HC for 1234)
|
// Create HTTPS HC for 1234)
|
||||||
hc.Type = string(utils.HTTPS)
|
hc.Type = string(utils.ProtocolHTTPS)
|
||||||
hcp.CreateHealthCheck(hc.Out())
|
hcp.CreateHealthCheck(hc.Out())
|
||||||
|
|
||||||
// Delete only HTTP 1234
|
// Delete only HTTP 1234
|
||||||
|
@ -136,7 +136,7 @@ func TestHealthCheckUpdate(t *testing.T) {
|
||||||
|
|
||||||
// HTTP
|
// HTTP
|
||||||
// Manually insert a health check
|
// Manually insert a health check
|
||||||
hc := DefaultHealthCheck(3000, utils.HTTP)
|
hc := DefaultHealthCheck(3000, utils.ProtocolHTTP)
|
||||||
hc.Name = namer.BeName(3000)
|
hc.Name = namer.BeName(3000)
|
||||||
hc.RequestPath = "/my-probes-health"
|
hc.RequestPath = "/my-probes-health"
|
||||||
hcp.CreateHealthCheck(hc.Out())
|
hcp.CreateHealthCheck(hc.Out())
|
||||||
|
@ -148,7 +148,7 @@ func TestHealthCheckUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Change to HTTPS
|
// Change to HTTPS
|
||||||
hc.Type = string(utils.HTTPS)
|
hc.Type = string(utils.ProtocolHTTPS)
|
||||||
_, err = healthChecks.Sync(hc)
|
_, err = healthChecks.Sync(hc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err while syncing healthcheck, err %v", err)
|
t.Fatalf("unexpected err while syncing healthcheck, err %v", err)
|
||||||
|
@ -161,7 +161,7 @@ func TestHealthCheckUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the check is now HTTPS
|
// Verify the check is now HTTPS
|
||||||
if hc.Protocol() != utils.HTTPS {
|
if hc.Protocol() != utils.ProtocolHTTPS {
|
||||||
t.Fatalf("expected check to be of type HTTPS")
|
t.Fatalf("expected check to be of type HTTPS")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.HTTP}
|
testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP}
|
||||||
)
|
)
|
||||||
|
|
||||||
func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
|
func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
|
||||||
|
|
|
@ -233,7 +233,7 @@ func main() {
|
||||||
*defaultSvc, err)
|
*defaultSvc, err)
|
||||||
}
|
}
|
||||||
// The default backend is known to be HTTP
|
// The default backend is known to be HTTP
|
||||||
defaultBackendNodePort := backends.ServicePort{Port: nodePort, Protocol: utils.HTTP}
|
defaultBackendNodePort := backends.ServicePort{Port: nodePort, Protocol: utils.ProtocolHTTP}
|
||||||
|
|
||||||
if *inCluster || *useRealCloud {
|
if *inCluster || *useRealCloud {
|
||||||
// Create cluster manager
|
// Create cluster manager
|
||||||
|
|
|
@ -80,10 +80,10 @@ const (
|
||||||
// debug information in the Ingress annotations.
|
// debug information in the Ingress annotations.
|
||||||
K8sAnnotationPrefix = "ingress.kubernetes.io"
|
K8sAnnotationPrefix = "ingress.kubernetes.io"
|
||||||
|
|
||||||
// HTTP protocol for a service
|
// ProtocolHTTP protocol for a service
|
||||||
HTTP AppProtocol = "HTTP"
|
ProtocolHTTP AppProtocol = "HTTP"
|
||||||
// HTTPS protocol for a service
|
// ProtocolHTTPS protocol for a service
|
||||||
HTTPS AppProtocol = "HTTPS"
|
ProtocolHTTPS AppProtocol = "HTTPS"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AppProtocol string
|
type AppProtocol string
|
||||||
|
|
Loading…
Reference in a new issue