From 6c4fe22df468af5606a2d516f9cbad48051255a5 Mon Sep 17 00:00:00 2001 From: Nick Sardo Date: Mon, 20 Mar 2017 18:16:25 -0700 Subject: [PATCH] Support backside re-encryption --- controllers/gce/backends/backends.go | 180 ++++++++---- controllers/gce/backends/backends_test.go | 274 +++++++++++++++--- controllers/gce/backends/fakes.go | 19 ++ controllers/gce/backends/interfaces.go | 13 +- controllers/gce/controller/cluster_manager.go | 38 +-- controllers/gce/controller/fakes.go | 12 +- controllers/gce/controller/util_test.go | 34 ++- controllers/gce/controller/utils.go | 178 ++++++------ controllers/gce/healthchecks/fakes.go | 124 ++++---- controllers/gce/healthchecks/healthchecks.go | 194 ++++++++++--- .../gce/healthchecks/healthchecks_test.go | 185 ++++++++++-- controllers/gce/healthchecks/interfaces.go | 25 +- .../gce/loadbalancers/loadbalancers.go | 8 +- .../gce/loadbalancers/loadbalancers_test.go | 12 +- controllers/gce/main.go | 5 +- controllers/gce/utils/utils.go | 33 +-- 16 files changed, 940 insertions(+), 394 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 115ad52bc..a600364d2 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -27,6 +27,7 @@ import ( compute "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/util/sets" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/ingress/controllers/gce/healthchecks" "k8s.io/ingress/controllers/gce/instances" @@ -75,6 +76,7 @@ type Backends struct { nodePool instances.NodePool healthChecker healthchecks.HealthChecker snapshotter storage.Snapshotter + prober probeProvider // ignoredPorts are a set of ports excluded from GC, even // after the Ingress has been deleted. Note that invoking // a Delete() on these ports will still delete the backend. 
@@ -86,6 +88,12 @@ func portKey(port int64) string { return fmt.Sprintf("%d", port) } +// ServicePort for tupling port and protocol +type ServicePort struct { + Port int64 + Protocol utils.AppProtocol +} + // NewBackendPool returns a new backend pool. // - cloud: implements BackendServices and syncs backends with a cloud provider // - healthChecker: is capable of producing health checks for backends. @@ -134,6 +142,11 @@ func NewBackendPool( return backendPool } +// Init sets the probeProvider interface value +func (b *Backends) Init(pp probeProvider) { + b.prober = pp +} + // Get returns a single backend. func (b *Backends) Get(port int64) (*compute.BackendService, error) { be, err := b.cloud.GetBackendService(b.namer.BeName(port)) @@ -144,16 +157,24 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) { return be, nil } -func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) { - // Create a new health check - if err := b.healthChecker.Add(namedPort.Port); err != nil { - return nil, err +func (b *Backends) ensureHealthCheck(port int64, protocol utils.AppProtocol) (string, error) { + hc := b.healthChecker.New(port, protocol) + if b.prober != nil { + probe, err := b.prober.GetProbe(port) + if err != nil { + return "", err + } + if probe != nil { + glog.Infof("Applying httpGet settings of readinessProbe to health check on port %v", port) + applyProbeSettingsToHC(probe, hc) + } } - hc, err := b.healthChecker.Get(namedPort.Port) - if err != nil { - return nil, err - } - errs := []string{} + + return b.healthChecker.Sync(hc) +} + +func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, hcLink string, protocol utils.AppProtocol, name string) (*compute.BackendService, error) { + var errs []string // We first try to create the backend with balancingMode=RATE. 
If this // fails, it's mostly likely because there are existing backends with // balancingMode=UTILIZATION. This failure mode throws a googleapi error @@ -161,27 +182,9 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named // and come around to retry with the right balancing mode. The goal is to // switch everyone to using RATE. for _, bm := range []BalancingMode{Rate, Utilization} { - backends := getBackendsForIGs(igs) - for _, b := range backends { - switch bm { - case Rate: - b.MaxRate = maxRPS - default: - // TODO: Set utilization and connection limits when we accept them - // as valid fields. - } - b.BalancingMode = string(bm) - } // Create a new backend - backend := &compute.BackendService{ - Name: name, - Protocol: "HTTP", - Backends: backends, - HealthChecks: []string{hc.SelfLink}, - Port: namedPort.Port, - PortName: namedPort.Name, - } - if err := b.cloud.CreateBackendService(backend); err != nil { + bs := newBackendService(igs, bm, namedPort, []string{hcLink}, protocol, name) + if err := b.cloud.CreateBackendService(bs); err != nil { // This is probably a failure because we tried to create the backend // with balancingMode=RATE when there are already backends with // balancingMode=UTILIZATION. Just ignore it and retry setting @@ -198,31 +201,88 @@ func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.Named return nil, fmt.Errorf("%v", strings.Join(errs, "\n")) } +func newBackendService(igs []*compute.InstanceGroup, bm BalancingMode, namedPort *compute.NamedPort, healthCheckLinks []string, protocol utils.AppProtocol, name string) *compute.BackendService { + backends := getBackendsForIGs(igs) + for _, b := range backends { + switch bm { + case Rate: + b.MaxRatePerInstance = maxRPS + default: + // TODO: Set utilization and connection limits when we accept them + // as valid fields. 
+ } + b.BalancingMode = string(bm) + } + + return &compute.BackendService{ + Name: name, + Protocol: string(protocol), + Backends: backends, + HealthChecks: healthCheckLinks, + Port: namedPort.Port, + PortName: namedPort.Name, + } +} + +func (b *Backends) updateProtocol(bs *compute.BackendService, hcLink string, protocol utils.AppProtocol) (*compute.BackendService, error) { + + return bs, nil +} + // Add will get or create a Backend for the given port. -func (b *Backends) Add(port int64) error { +func (b *Backends) Add(p ServicePort) error { // We must track the port even if creating the backend failed, because // we might've created a health-check for it. be := &compute.BackendService{} - defer func() { b.snapshotter.Add(portKey(port), be) }() + defer func() { b.snapshotter.Add(portKey(p.Port), be) }() - igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port) + igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port) if err != nil { return err } - be, _ = b.Get(port) + + // Ensure health check for backend service exists + hcLink, err := b.ensureHealthCheck(p.Port, p.Protocol) + if err != nil { + return err + } + + pName := b.namer.BeName(p.Port) + be, _ = b.Get(p.Port) if be == nil { - glog.Infof("Creating backend for %d instance groups, port %v named port %v", - len(igs), port, namedPort) - be, err = b.create(igs, namedPort, b.namer.BeName(port)) + glog.Infof("Creating backend for %d instance groups, port %v named port %v", len(igs), p.Port, namedPort) + be, err = b.create(igs, namedPort, hcLink, p.Protocol, pName) if err != nil { return err } } + + existingHCLink := "" + if len(be.HealthChecks) == 1 { + existingHCLink = be.HealthChecks[0] + } + + if be.Protocol != string(p.Protocol) || existingHCLink != hcLink { + glog.Infof("Updating backend protocol %v (%v) for change in protocol (%v) or health check", pName, be.Protocol, string(p.Protocol)) + be.Protocol = string(p.Protocol) + be.HealthChecks = []string{hcLink} + if err 
= b.cloud.UpdateBackendService(be); err != nil { + return err + } + } + + // If previous health check was legacy type, we need to delete it. + if existingHCLink != hcLink && strings.Contains(existingHCLink, "/httpHealthChecks/") { + if err = b.healthChecker.DeleteLegacy(p.Port); err != nil { + return err + } + } + // we won't find any igs till the node pool syncs nodes. if len(igs) == 0 { return nil } - if err := b.edgeHop(be, igs); err != nil { + if err = b.edgeHop(be, igs); err != nil { return err } return err @@ -231,7 +291,7 @@ func (b *Backends) Add(port int64) error { // Delete deletes the Backend for the given port. func (b *Backends) Delete(port int64) (err error) { name := b.namer.BeName(port) - glog.Infof("Deleting backend %v", name) + glog.Infof("Deleting backend service %v", name) defer func() { if utils.IsHTTPErrorCode(err, http.StatusNotFound) { err = nil @@ -241,15 +301,11 @@ func (b *Backends) Delete(port int64) (err error) { } }() // Try deleting health checks even if a backend is not found. - if err = b.cloud.DeleteBackendService(name); err != nil && - !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + if err = b.cloud.DeleteBackendService(name); err != nil && !utils.IsHTTPErrorCode(err, http.StatusNotFound) { return err } - if err = b.healthChecker.Delete(port); err != nil && - !utils.IsHTTPErrorCode(err, http.StatusNotFound) { - return err - } - return nil + + return b.healthChecker.Delete(port) } // List lists all backends. @@ -306,7 +362,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr } // Sync syncs backend services corresponding to ports in the given list. 
-func (b *Backends) Sync(svcNodePorts []int64) error { +func (b *Backends) Sync(svcNodePorts []ServicePort) error { glog.V(3).Infof("Sync: backends %v", svcNodePorts) // create backends for new ports, perform an edge hop for existing ports @@ -319,14 +375,14 @@ func (b *Backends) Sync(svcNodePorts []int64) error { } // GC garbage collects services corresponding to ports in the given list. -func (b *Backends) GC(svcNodePorts []int64) error { +func (b *Backends) GC(svcNodePorts []ServicePort) error { knownPorts := sets.NewString() - for _, port := range svcNodePorts { - knownPorts.Insert(portKey(port)) + for _, p := range svcNodePorts { + knownPorts.Insert(portKey(p.Port)) } pool := b.snapshotter.Snapshot() for port := range pool { - p, err := strconv.Atoi(port) + p, err := strconv.ParseInt(port, 10, 64) if err != nil { return err } @@ -345,7 +401,7 @@ func (b *Backends) GC(svcNodePorts []int64) error { // Shutdown deletes all backends and the default backend. // This will fail if one of the backends is being used by another resource. func (b *Backends) Shutdown() error { - if err := b.GC([]int64{}); err != nil { + if err := b.GC([]ServicePort{}); err != nil { return err } return nil @@ -365,3 +421,25 @@ func (b *Backends) Status(name string) string { // TODO: State transition are important, not just the latest. return hs.HealthStatus[0].HealthState } + +func applyProbeSettingsToHC(p *api_v1.Probe, hc *healthchecks.HealthCheck) { + healthPath := p.Handler.HTTPGet.Path + // GCE requires a leading "/" for health check urls. + if !strings.HasPrefix(healthPath, "/") { + healthPath = fmt.Sprintf("/%v", healthPath) + } + + host := p.Handler.HTTPGet.Host + // remember the ingresses that use this Service so we can send + // the right events + + hc.RequestPath = healthPath + hc.Host = host + hc.Description = "Kubernetes L7 health check generated with readiness probe settings." + // set a low health threshold and a high failure threshold. 
+ // We're just trying to detect if the node networking is + // borked, service level outages will get detected sooner + // by kube-proxy. + hc.CheckIntervalSec = int64(p.PeriodSeconds + healthchecks.DefaultHealthCheckInterval) + hc.TimeoutSec = int64(p.TimeoutSeconds) +} diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 5766f6919..6322511c2 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -17,12 +17,15 @@ limitations under the License. package backends import ( + "fmt" "net/http" "testing" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/ingress/controllers/gce/healthchecks" "k8s.io/ingress/controllers/gce/instances" @@ -34,14 +37,28 @@ const defaultZone = "zone-a" var noOpErrFunc = func(op int, be *compute.BackendService) error { return nil } -func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool { +var existingProbe = &api_v1.Probe{ + Handler: api_v1.Handler{ + HTTPGet: &api_v1.HTTPGetAction{ + Scheme: api_v1.URISchemeHTTP, + Path: "/my-special-path", + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, +} + +func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) *Backends { namer := &utils.Namer{} nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) - healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer) - healthChecks.Init(&healthchecks.FakeHealthCheckGetter{DefaultHealthCheck: nil}) - return NewBackendPool( - f, healthChecks, nodePool, namer, []int64{}, syncWithCloud) + healthChecks := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthCheckProvider(), "/", namer) + bp := 
NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, syncWithCloud) + probes := map[int64]*api_v1.Probe{80: existingProbe} + bp.Init(NewFakeProbeProvider(probes)) + return bp } func TestBackendPoolAdd(t *testing.T) { @@ -50,39 +67,120 @@ func TestBackendPoolAdd(t *testing.T) { pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - // Add a backend for a port, then re-add the same port and - // make sure it corrects a broken link from the backend to - // the instance group. - nodePort := int64(8080) - pool.Add(nodePort) - beName := namer.BeName(nodePort) + testCases := []ServicePort{ + {80, utils.HTTP}, + {443, utils.HTTPS}, + } + + for _, nodePort := range testCases { + t.Run(fmt.Sprintf("Port:%v Protocol:%v", nodePort.Port, nodePort.Protocol), func(t *testing.T) { + // Add a backend for a port, then re-add the same port and + // make sure it corrects a broken link from the backend to + // the instance group. + err := pool.Add(nodePort) + if err != nil { + t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err) + } + beName := namer.BeName(nodePort.Port) + + // Check that the new backend has the right port + be, err := f.GetBackendService(beName) + if err != nil { + t.Fatalf("Did not find expected backend %v", beName) + } + if be.Port != nodePort.Port { + t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort) + } + + // Check that the instance group has the new port + var found bool + for _, port := range fakeIGs.Ports { + if port == nodePort.Port { + found = true + } + } + if !found { + t.Fatalf("Port %v not added to instance group", nodePort) + } + + // Check the created healthcheck is the correct protocol + // pool.healthChecker. 
+ hc, err := pool.healthChecker.Get(nodePort.Port) + if err != nil { + t.Fatalf("Unexpected err when querying fake healthchecker: %v", err) + } + + if hc.Protocol() != nodePort.Protocol { + t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), nodePort.Protocol) + } + }) + } +} + +func TestBackendPoolUpdate(t *testing.T) { + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + pool := newBackendPool(f, fakeIGs, false) + namer := utils.Namer{} + + p := ServicePort{Port: 3000, Protocol: utils.HTTP} + pool.Add(p) + beName := namer.BeName(p.Port) - // Check that the new backend has the right port be, err := f.GetBackendService(beName) if err != nil { - t.Fatalf("Did not find expected backend %v", beName) + t.Fatalf("Unexpected err: %v", err) } - if be.Port != nodePort { - t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort) + + if utils.AppProtocol(be.Protocol) != p.Protocol { + t.Fatalf("Expected scheme %v but got %v", p.Protocol, be.Protocol) } - // Check that the instance group has the new port - var found bool - for _, port := range fakeIGs.Ports { - if port == nodePort { - found = true - } + + // Assert the proper health check was created + hc, _ := pool.healthChecker.Get(p.Port) + if hc == nil || hc.Protocol() != p.Protocol { + t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) } - if !found { - t.Fatalf("Port %v not added to instance group", nodePort) + + // Update service port to encrypted + p.Protocol = utils.HTTPS + pool.Sync([]ServicePort{p}) + + be, err = f.GetBackendService(beName) + if err != nil { + t.Fatalf("Unexpected err retrieving backend service after update: %v", err) } + // Assert the backend has the correct protocol + if utils.AppProtocol(be.Protocol) != p.Protocol { + t.Fatalf("Expected scheme %v but got %v", p.Protocol, utils.AppProtocol(be.Protocol)) + } + + // Assert the proper health check was created + hc, _ 
= pool.healthChecker.Get(p.Port) + if hc == nil || hc.Protocol() != p.Protocol { + t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc) + } +} + +func TestBackendPoolChaosMonkey(t *testing.T) { + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + pool := newBackendPool(f, fakeIGs, false) + namer := utils.Namer{} + + nodePort := ServicePort{Port: 8080, Protocol: utils.HTTP} + pool.Add(nodePort) + beName := namer.BeName(nodePort.Port) + + be, _ := f.GetBackendService(beName) + // Mess up the link between backend service and instance group. // This simulates a user doing foolish things through the UI. - f.calls = []int{} - be, err = f.GetBackendService(beName) be.Backends = []*compute.Backend{ {Group: "test edge hop"}, } + f.calls = []int{} f.UpdateBackendService(be) pool.Add(nodePort) @@ -114,28 +212,35 @@ func TestBackendPoolAdd(t *testing.T) { func TestBackendPoolSync(t *testing.T) { // Call sync on a backend pool with a list of ports, make sure the pool // creates/deletes required ports. 
- svcNodePorts := []int64{81, 82, 83} + svcNodePorts := []ServicePort{{Port: 81, Protocol: utils.HTTP}, {Port: 82, Protocol: utils.HTTPS}, {Port: 83, Protocol: utils.HTTP}} f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, true) - pool.Add(81) - pool.Add(90) - pool.Sync(svcNodePorts) - pool.GC(svcNodePorts) + pool.Add(ServicePort{Port: 81}) + pool.Add(ServicePort{Port: 90}) + if err := pool.Sync(svcNodePorts); err != nil { + t.Errorf("Expected backend pool to sync, err: %v", err) + } + if err := pool.GC(svcNodePorts); err != nil { + t.Errorf("Expected backend pool to GC, err: %v", err) + } if _, err := pool.Get(90); err == nil { t.Fatalf("Did not expect to find port 90") } for _, port := range svcNodePorts { - if _, err := pool.Get(port); err != nil { + if _, err := pool.Get(port.Port); err != nil { t.Fatalf("Expected to find port %v", port) } } - svcNodePorts = []int64{81} - deletedPorts := []int64{82, 83} - pool.GC(svcNodePorts) + svcNodePorts = []ServicePort{{Port: 81}} + deletedPorts := []ServicePort{{Port: 82}, {Port: 83}} + if err := pool.GC(svcNodePorts); err != nil { + t.Fatalf("Expected backend pool to GC, err: %v", err) + } + for _, port := range deletedPorts { - if _, err := pool.Get(port); err == nil { + if _, err := pool.Get(port.Port); err == nil { t.Fatalf("Pool contains %v after deletion", port) } } @@ -152,13 +257,13 @@ func TestBackendPoolSync(t *testing.T) { namer := &utils.Namer{} // This backend should get deleted again since it is managed by this cluster. - f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])}) + f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0].Port)}) // TODO: Avoid casting. // Repopulate the pool with a cloud list, which now includes the 82 port // backend. This would happen if, say, an ingress backend is removed // while the controller is restarting. 
- pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplenishPool() + pool.snapshotter.(*storage.CloudListingPool).ReplenishPool() pool.GC(svcNodePorts) @@ -178,13 +283,68 @@ func TestBackendPoolSync(t *testing.T) { } } +func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { + namer := &utils.Namer{} + f := NewFakeBackendServices(noOpErrFunc) + fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) + nodePool := instances.NewNodePool(fakeIGs) + nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) + hcp := healthchecks.NewFakeHealthCheckProvider() + healthChecks := healthchecks.NewHealthChecker(hcp, "/", namer) + bp := NewBackendPool(f, healthChecks, nodePool, namer, []int64{}, false) + probes := map[int64]*api_v1.Probe{} + bp.Init(NewFakeProbeProvider(probes)) + + // Create a legacy HTTP health check + beName := namer.BeName(80) + if err := hcp.CreateHttpHealthCheck(&compute.HttpHealthCheck{ + Name: beName, + Port: 80, + }); err != nil { + t.Fatalf("unexpected error creating http health check %v", err) + } + + // Verify health check exists + hc, err := hcp.GetHttpHealthCheck(beName) + if err != nil { + t.Fatalf("unexpected error getting http health check %v", err) + } + + // Create backend service with expected name and link to legacy health check + f.CreateBackendService(&compute.BackendService{ + Name: beName, + HealthChecks: []string{hc.SelfLink}, + }) + + // Have pool sync the above backend service + bp.Add(ServicePort{Port: 80, Protocol: utils.HTTPS}) + + // Verify the legacy health check has been deleted + _, err = hcp.GetHttpHealthCheck(beName) + if err == nil { + t.Fatalf("expected error getting http health check %v", err) + } + + // Verify a newer health check exists + hcNew, err := hcp.GetHealthCheck(beName) + if err != nil { + t.Fatalf("unexpected error getting http health check %v", err) + } + + // Verify the newer health check is of type HTTPS + if hcNew.Type != string(utils.HTTPS) { + t.Fatalf("expected health 
check type to be %v, actual %v", string(utils.HTTPS), hcNew.Type) + } +} + func TestBackendPoolShutdown(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - pool.Add(80) + // Add a backend-service and verify that it doesn't exist after Shutdown() + pool.Add(ServicePort{Port: 80}) pool.Shutdown() if _, err := f.GetBackendService(namer.BeName(80)); err == nil { t.Fatalf("%v", err) @@ -198,7 +358,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { namer := utils.Namer{} // This will add the instance group k8s-ig to the instance pool - pool.Add(80) + pool.Add(ServicePort{Port: 80}) be, err := f.GetBackendService(namer.BeName(80)) if err != nil { @@ -211,12 +371,12 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { {Group: "k8s-ig-foo"}, } be.Backends = append(be.Backends, newGroups...) - if err := f.UpdateBackendService(be); err != nil { + if err = f.UpdateBackendService(be); err != nil { t.Fatalf("Failed to update backend service %v", be.Name) } // Make sure repeated adds don't clobber the inserted instance group - pool.Add(80) + pool.Add(ServicePort{Port: 80}) be, err = f.GetBackendService(namer.BeName(80)) if err != nil { t.Fatalf("%v", err) @@ -242,7 +402,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} - nodePort := int64(8080) + nodePort := ServicePort{Port: 8080} modes := []BalancingMode{Rate, Utilization} // block the creation of Backends with the given balancingMode @@ -259,7 +419,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { } pool.Add(nodePort) - be, err := f.GetBackendService(namer.BeName(nodePort)) + be, err := f.GetBackendService(namer.BeName(nodePort.Port)) if err != nil { t.Fatalf("%v", err) } @@ -269,6 +429,32 @@ func TestBackendCreateBalancingMode(t *testing.T) { 
t.Fatalf("Wrong balancing mode, expected %v got %v", modes[(i+1)%len(modes)], b.BalancingMode) } } - pool.GC([]int64{}) + pool.GC([]ServicePort{}) + } +} + +func TestApplyProbeSettingsToHC(t *testing.T) { + p := "healthz" + hc := healthchecks.DefaultHealthCheck(8080, utils.HTTPS) + probe := &api_v1.Probe{ + Handler: api_v1.Handler{ + HTTPGet: &api_v1.HTTPGetAction{ + Scheme: api_v1.URISchemeHTTP, + Path: p, + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + } + + applyProbeSettingsToHC(probe, hc) + + if hc.Protocol() != utils.HTTPS || hc.Port != 8080 { + t.Errorf("Basic HC settings changed") + } + if hc.RequestPath != "/"+p { + t.Errorf("Failed to apply probe's requestpath") } } diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go index 2fe73cf86..a9eab62c7 100644 --- a/controllers/gce/backends/fakes.go +++ b/controllers/gce/backends/fakes.go @@ -20,6 +20,7 @@ import ( "fmt" compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress/controllers/gce/utils" @@ -118,3 +119,21 @@ func (f *FakeBackendServices) GetHealth(name, instanceGroupLink string) (*comput return &compute.BackendServiceGroupHealth{ HealthStatus: states}, nil } + +// FakeProbeProvider implements the probeProvider interface for tests. 
+type FakeProbeProvider struct { + probes map[int64]*api_v1.Probe +} + +// NewFakeProbeProvider returns a struct which satifies probeProvider interface +func NewFakeProbeProvider(probes map[int64]*api_v1.Probe) *FakeProbeProvider { + return &FakeProbeProvider{probes} +} + +// GetProbe returns the probe for a given nodePort +func (pp *FakeProbeProvider) GetProbe(port int64) (*api_v1.Probe, error) { + if probe, exists := pp.probes[port]; exists { + return probe, nil + } + return nil, nil +} diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 1e53368fc..5a5bed4de 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -18,16 +18,23 @@ package backends import ( compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/client-go/pkg/api/v1" ) +// ProbeProvider retrieves a probe struct given a nodePort +type probeProvider interface { + GetProbe(nodePort int64) (*api_v1.Probe, error) +} + // BackendPool is an interface to manage a pool of kubernetes nodePort services // as gce backendServices, and sync them through the BackendServices interface. type BackendPool interface { - Add(port int64) error + Init(p probeProvider) + Add(port ServicePort) error Get(port int64) (*compute.BackendService, error) Delete(port int64) error - Sync(ports []int64) error - GC(ports []int64) error + Sync(ports []ServicePort) error + GC(ports []ServicePort) error Shutdown() error Status(name string) string List() ([]interface{}, error) diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 79d21bc17..2f2a0836f 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -65,7 +65,7 @@ const ( // ClusterManager manages cluster resource pools. 
type ClusterManager struct { ClusterNamer *utils.Namer - defaultBackendNodePort int64 + defaultBackendNodePort backends.ServicePort instancePool instances.NodePool backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool @@ -83,9 +83,7 @@ type ClusterManager struct { // Init initializes the cluster manager. func (c *ClusterManager) Init(tr *GCETranslator) { c.instancePool.Init(tr) - for _, h := range c.healthCheckers { - h.Init(tr) - } + c.backendPool.Init(tr) // TODO: Initialize other members as needed. } @@ -126,17 +124,17 @@ func (c *ClusterManager) shutdown() error { // these ports must also be opened on the corresponding Instance Group. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error { // Multiple ingress paths can point to the same service (and hence nodePort) // but each nodePort can only have one set of cloud resources behind it. So // don't waste time double validating GCE BackendServices. - portMap := map[int64]struct{}{} + portMap := map[int64]backends.ServicePort{} for _, p := range nodePorts { - portMap[p] = struct{}{} + portMap[p.Port] = p } - nodePorts = []int64{} - for p := range portMap { - nodePorts = append(nodePorts, p) + nodePorts = []backends.ServicePort{} + for _, sp := range portMap { + nodePorts = append(nodePorts, sp) } if err := c.backendPool.Sync(nodePorts); err != nil { return err @@ -158,7 +156,12 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName // we shouldn't leak the firewall rule. 
fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort) } - if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil { + + var np []int64 + for _, p := range fwNodePorts { + np = append(np, p.Port) + } + if err := c.firewallPool.Sync(np, nodeNames); err != nil { return err } @@ -171,7 +174,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName // - nodePorts are the ports for which we want BackendServies. BackendServices // for ports not in this list are deleted. // This method ignores googleapi 404 errors (StatusNotFound). -func (c *ClusterManager) GC(lbNames []string, nodePorts []int64) error { +func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort) error { // On GC: // * Loadbalancers need to get deleted before backends. @@ -240,7 +243,7 @@ func getGCEClient(config io.Reader) *gce.GCECloud { func NewClusterManager( configFilePath string, namer *utils.Namer, - defaultBackendNodePort int64, + defaultBackendNodePort backends.ServicePort, defaultHealthCheckPath string) (*ClusterManager, error) { // TODO: Make this more resilient. Currently we create the cloud client @@ -279,15 +282,12 @@ func NewClusterManager( cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker} // TODO: This needs to change to a consolidated management of the default backend. 
- cluster.backendPool = backends.NewBackendPool( - cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true) - defaultBackendPool := backends.NewBackendPool( - cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) + cluster.backendPool = backends.NewBackendPool(cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.Port}, true) + defaultBackendPool := backends.NewBackendPool(cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. - cluster.l7Pool = loadbalancers.NewLoadBalancerPool( - cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) + cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer) return &cluster, nil } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index 11866b368..800dc3ed7 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -29,12 +29,11 @@ import ( "k8s.io/ingress/controllers/gce/utils" ) -const ( - testDefaultBeNodePort = int64(3000) +var ( + testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.HTTP} + testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} ) -var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} - // ClusterManager fake type fakeClusterManager struct { *ClusterManager @@ -48,14 +47,13 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName) fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs 
:= instances.NewFakeInstanceGroups(sets.NewString()) - fakeHCs := healthchecks.NewFakeHealthChecks() + fakeHCP := healthchecks.NewFakeHealthCheckProvider() namer := utils.NewNamer(clusterName, firewallName) nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) - healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) - healthChecker.Init(&healthchecks.FakeHealthCheckGetter{}) + healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer) backendPool := backends.NewBackendPool( fakeBackends, diff --git a/controllers/gce/controller/util_test.go b/controllers/gce/controller/util_test.go index 774750f55..3b6cfb4bb 100644 --- a/controllers/gce/controller/util_test.go +++ b/controllers/gce/controller/util_test.go @@ -100,11 +100,11 @@ func TestProbeGetter(t *testing.T) { } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } } @@ -122,11 +122,11 @@ func TestProbeGetterNamedPort(t *testing.T) { pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} } for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + 
if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } @@ -173,11 +173,11 @@ func TestProbeGetterCrossNamespace(t *testing.T) { addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.HealthCheck(p) - if err != nil { - t.Errorf("Failed to get health check for node port %v: %v", p, err) - } else if got.RequestPath != exp { - t.Errorf("Wrong health check for node port %v, got %v expected %v", p, got.RequestPath, exp) + got, err := lbc.tr.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) } } } @@ -257,3 +257,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { } lbc.CloudClusterManager.instancePool.Init(lbc.tr) } + +func getProbePath(p *api_v1.Probe) string { + return p.Handler.HTTPGet.Path +} diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 34b7eb10c..d41ed2ff9 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "encoding/json" "fmt" "sort" "strconv" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/loadbalancers" "k8s.io/ingress/controllers/gce/utils" ) @@ -63,6 +65,12 @@ const ( // to the target proxies of the Ingress. preSharedCertKey = "ingress.gcp.kubernetes.io/pre-shared-cert" + // serviceApplicationProtocolKey is a stringified JSON map of port names to + // protocol strings. 
Possible values are HTTP, HTTPS + // Example: + // '{"my-https-port":"HTTPS","my-http-port":"HTTP"}' + serviceApplicationProtocolKey = "service.alpha.kubernetes.io/app-protocols" + // ingressClassKey picks a specific "class" for the Ingress. The controller // only processes Ingresses with this annotation either unset, or set // to either gceIngessClass or the empty string. @@ -116,6 +124,30 @@ func (ing ingAnnotations) ingressClass() string { return val } +// svcAnnotations represents Service annotations. +type svcAnnotations map[string]string + +func (svc svcAnnotations) ApplicationProtocols() (map[string]utils.AppProtocol, error) { + val, ok := svc[serviceApplicationProtocolKey] + if !ok { + return map[string]utils.AppProtocol{}, nil + } + + var portToProtos map[string]utils.AppProtocol + err := json.Unmarshal([]byte(val), &portToProtos) + + // Verify protocol is an accepted value + for _, proto := range portToProtos { + switch proto { + case utils.HTTP, utils.HTTPS: + default: + return nil, fmt.Errorf("unexpected port application protocol: %v", proto) + } + } + + return portToProtos, err +} + // isGCEIngress returns true if the given Ingress either doesn't specify the // ingress.class annotation, or it's set to "gce". func isGCEIngress(ing *extensions.Ingress) bool { @@ -134,6 +166,15 @@ func (e errorNodePortNotFound) Error() string { e.backend, e.origErr) } +type errorSvcAppProtosParsing struct { + svc *api_v1.Service + origErr error +} + +func (e errorSvcAppProtosParsing) Error() string { + return fmt.Sprintf("could not parse %v annotation on Service %v/%v, err: %v", serviceApplicationProtocolKey, e.svc.Namespace, e.svc.Name, e.origErr) +} + // taskQueue manages a work queue through an independent worker that // invokes the given sync function for every work item inserted. 
type taskQueue struct { @@ -221,6 +262,7 @@ type StoreToPodLister struct { cache.Indexer } +// List returns a list of all pods based on selector func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, err error) { err = ListAll(s.Indexer, selector, func(m interface{}) { ret = append(ret, m.(*api_v1.Pod)) @@ -228,6 +270,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, er return ret, err } +// ListAll iterates a store and passes selected item to a func func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendFunc) error { for _, m := range store.List() { metadata, err := meta.Accessor(m) @@ -362,17 +405,17 @@ func (t *GCETranslator) toGCEBackend(be *extensions.IngressBackend, ns string) ( if err != nil { return nil, err } - backend, err := t.CloudClusterManager.backendPool.Get(int64(port)) + backend, err := t.CloudClusterManager.backendPool.Get(port.Port) if err != nil { - return nil, fmt.Errorf( - "no GCE backend exists for port %v, kube backend %+v", port, be) + return nil, fmt.Errorf("no GCE backend exists for port %v, kube backend %+v", port, be) } return backend, nil } // getServiceNodePort looks in the svc store for a matching service:port, // and returns the nodeport. 
-func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (int, error) { +func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespace string) (backends.ServicePort, error) { + invalidPort := backends.ServicePort{} obj, exists, err := t.svcLister.Indexer.Get( &api_v1.Service{ ObjectMeta: meta_v1.ObjectMeta{ @@ -381,37 +424,51 @@ func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespa }, }) if !exists { - return invalidPort, errorNodePortNotFound{be, fmt.Errorf( - "service %v/%v not found in store", namespace, be.ServiceName)} + return invalidPort, errorNodePortNotFound{be, fmt.Errorf("service %v/%v not found in store", namespace, be.ServiceName)} } if err != nil { return invalidPort, errorNodePortNotFound{be, err} } - var nodePort int - for _, p := range obj.(*api_v1.Service).Spec.Ports { + svc := obj.(*api_v1.Service) + appProtocols, err := svcAnnotations(svc.GetAnnotations()).ApplicationProtocols() + if err != nil { + return invalidPort, errorSvcAppProtosParsing{svc, err} + } + + var port *api_v1.ServicePort +PortLoop: + for _, p := range svc.Spec.Ports { + np := p switch be.ServicePort.Type { case intstr.Int: if p.Port == be.ServicePort.IntVal { - nodePort = int(p.NodePort) - break + port = &np + break PortLoop } default: if p.Name == be.ServicePort.StrVal { - nodePort = int(p.NodePort) - break + port = &np + break PortLoop } } } - if nodePort != invalidPort { - return nodePort, nil + + if port == nil { + return invalidPort, errorNodePortNotFound{be, fmt.Errorf("could not find matching nodeport from service")} } - return invalidPort, errorNodePortNotFound{be, fmt.Errorf( - "could not find matching nodeport from service")} + + proto := utils.HTTP + if protoStr, exists := appProtocols[port.Name]; exists { + proto = utils.AppProtocol(protoStr) + } + + p := backends.ServicePort{Port: int64(port.NodePort), Protocol: proto} + return p, nil } // toNodePorts converts a pathlist to a flat 
list of nodeports. -func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { - knownPorts := []int64{} +func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort { + var knownPorts []backends.ServicePort for _, ing := range ings.Items { defaultBackend := ing.Spec.Backend if defaultBackend != nil { @@ -419,7 +476,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { if err != nil { glog.Infof("%v", err) } else { - knownPorts = append(knownPorts, int64(port)) + knownPorts = append(knownPorts, port) } } for _, rule := range ing.Spec.Rules { @@ -433,7 +490,7 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 { glog.Infof("%v", err) continue } - knownPorts = append(knownPorts, int64(port)) + knownPorts = append(knownPorts, port) } } } @@ -529,80 +586,39 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr // isSimpleHTTPProbe returns true if the given Probe is: // - an HTTPGet probe, as opposed to a tcp or exec probe -// - has a scheme of HTTP, as opposed to HTTPS // - has no special host or headers fields func isSimpleHTTPProbe(probe *api_v1.Probe) bool { return (probe != nil && probe.Handler.HTTPGet != nil && probe.Handler.HTTPGet.Host == "" && - probe.Handler.HTTPGet.Scheme == api_v1.URISchemeHTTP && len(probe.Handler.HTTPGet.HTTPHeaders) == 0) + len(probe.Handler.HTTPGet.HTTPHeaders) == 0) } -// HealthCheck returns the http readiness probe for the endpoint backing the -// given nodePort. If no probe is found it returns a health check with "" as -// the request path, callers are responsible for swapping this out for the -// appropriate default. 
-func (t *GCETranslator) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { +// GetProbe returns a probe that's used for the given nodeport +func (t *GCETranslator) GetProbe(port int64) (*api_v1.Probe, error) { sl := t.svcLister.List() - var ingresses []extensions.Ingress - var healthCheck *compute.HttpHealthCheck - // Find the label and target port of the one service with the given nodePort - for _, as := range sl { - s := as.(*api_v1.Service) - for _, p := range s.Spec.Ports { + // Find the label and target port of the one service with the given nodePort + var service api_v1.Service + var svcPort api_v1.ServicePort + var found bool +OuterLoop: + for _, as := range sl { + service = *as.(*api_v1.Service) + for _, sp := range service.Spec.Ports { + svcPort = sp // only one Service can match this nodePort, try and look up // the readiness probe of the pods behind it - if int32(port) != p.NodePort { - continue + if int32(port) == sp.NodePort { + found = true + break OuterLoop } - rp, err := t.getHTTPProbe(*s, p.TargetPort) - if err != nil { - return nil, err - } - if rp == nil { - glog.Infof("No pod in service %v with node port %v has declared a matching readiness probe for health checks.", s.Name, port) - break - } - - healthPath := rp.Handler.HTTPGet.Path - // GCE requires a leading "/" for health check urls. 
- if string(healthPath[0]) != "/" { - healthPath = fmt.Sprintf("/%v", healthPath) - } - - host := rp.Handler.HTTPGet.Host - glog.Infof("Found custom health check for Service %v nodeport %v: %v%v", s.Name, port, host, healthPath) - // remember the ingresses that use this Service so we can send - // the right events - ingresses, err = t.ingLister.GetServiceIngress(s) - if err != nil { - glog.Warningf("Failed to list ingresses for service %v", s.Name) - } - - healthCheck = &compute.HttpHealthCheck{ - Port: port, - RequestPath: healthPath, - Host: host, - Description: "kubernetes L7 health check from readiness probe.", - // set a low health threshold and a high failure threshold. - // We're just trying to detect if the node networking is - // borked, service level outages will get detected sooner - // by kube-proxy. - CheckIntervalSec: int64(rp.PeriodSeconds + utils.DefaultHealthCheckInterval), - TimeoutSec: int64(rp.TimeoutSeconds), - HealthyThreshold: utils.DefaultHealthyThreshold, - UnhealthyThreshold: utils.DefaultUnhealthyThreshold, - // TODO: include headers after updating compute godep. - } - break } } - if healthCheck == nil { - healthCheck = utils.DefaultHealthCheckTemplate(port) + + if !found { + return nil, fmt.Errorf("unable to find nodeport %v in any service", port) } - for _, ing := range ingresses { - t.recorder.Eventf(&ing, api_v1.EventTypeNormal, "GCE", fmt.Sprintf("health check using %v:%v%v", healthCheck.Host, healthCheck.Port, healthCheck.RequestPath)) - } - return healthCheck, nil + + return t.getHTTPProbe(service, svcPort.TargetPort) } // PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker. diff --git a/controllers/gce/healthchecks/fakes.go b/controllers/gce/healthchecks/fakes.go index 07240ccef..013e3bf26 100644 --- a/controllers/gce/healthchecks/fakes.go +++ b/controllers/gce/healthchecks/fakes.go @@ -17,86 +17,98 @@ limitations under the License. 
package healthchecks import ( - "fmt" - compute "google.golang.org/api/compute/v1" - - "k8s.io/ingress/controllers/gce/utils" + "google.golang.org/api/googleapi" ) -// NewFakeHealthChecks returns a new FakeHealthChecks. -func NewFakeHealthChecks() *FakeHealthChecks { - return &FakeHealthChecks{hc: []*compute.HttpHealthCheck{}} +func fakeNotFoundErr() *googleapi.Error { + return &googleapi.Error{Code: 404} } -// FakeHealthCheckGetter implements the healthCheckGetter interface for tests. -type FakeHealthCheckGetter struct { - DefaultHealthCheck *compute.HttpHealthCheck -} - -// HealthCheck returns the health check for the given port. If a health check -// isn't stored under the DefaultHealthCheck member, it constructs one. -func (h *FakeHealthCheckGetter) HealthCheck(port int64) (*compute.HttpHealthCheck, error) { - if h.DefaultHealthCheck == nil { - return utils.DefaultHealthCheckTemplate(port), nil +// NewFakeHealthCheckProvider returns a new FakeHealthCheckProvider. +func NewFakeHealthCheckProvider() *FakeHealthCheckProvider { + return &FakeHealthCheckProvider{ + http: make(map[string]compute.HttpHealthCheck), + generic: make(map[string]compute.HealthCheck), } - return h.DefaultHealthCheck, nil } -// FakeHealthChecks fakes out health checks. -type FakeHealthChecks struct { - hc []*compute.HttpHealthCheck +// FakeHealthCheckProvider fakes out health checks. +type FakeHealthCheckProvider struct { + http map[string]compute.HttpHealthCheck + generic map[string]compute.HealthCheck } // CreateHttpHealthCheck fakes out http health check creation. -func (f *FakeHealthChecks) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - f.hc = append(f.hc, hc) +func (f *FakeHealthCheckProvider) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + v := *hc + v.SelfLink = "https://fake.google.com/compute/httpHealthChecks/" + hc.Name + f.http[hc.Name] = v return nil } // GetHttpHealthCheck fakes out getting a http health check from the cloud. 
-func (f *FakeHealthChecks) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { - for _, h := range f.hc { - if h.Name == name { - return h, nil - } +func (f *FakeHealthCheckProvider) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { + if hc, found := f.http[name]; found { + return &hc, nil } - return nil, fmt.Errorf("health check %v not found", name) + + return nil, fakeNotFoundErr() } // DeleteHttpHealthCheck fakes out deleting a http health check. -func (f *FakeHealthChecks) DeleteHttpHealthCheck(name string) error { - healthChecks := []*compute.HttpHealthCheck{} - exists := false - for _, h := range f.hc { - if h.Name == name { - exists = true - continue - } - healthChecks = append(healthChecks, h) +func (f *FakeHealthCheckProvider) DeleteHttpHealthCheck(name string) error { + if _, exists := f.http[name]; !exists { + return fakeNotFoundErr() } - if !exists { - return fmt.Errorf("failed to find health check %v", name) - } - f.hc = healthChecks + + delete(f.http, name) return nil } // UpdateHttpHealthCheck sends the given health check as an update. -func (f *FakeHealthChecks) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { - healthChecks := []*compute.HttpHealthCheck{} - found := false - for _, h := range f.hc { - if h.Name == hc.Name { - healthChecks = append(healthChecks, hc) - found = true - } else { - healthChecks = append(healthChecks, h) - } +func (f *FakeHealthCheckProvider) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + if _, exists := f.http[hc.Name]; !exists { + return fakeNotFoundErr() } - if !found { - return fmt.Errorf("cannot update a non-existent health check %v", hc.Name) - } - f.hc = healthChecks + + f.http[hc.Name] = *hc + return nil +} + +// CreateHealthCheck fakes out http health check creation. 
+func (f *FakeHealthCheckProvider) CreateHealthCheck(hc *compute.HealthCheck) error { + v := *hc + v.SelfLink = "https://fake.google.com/compute/healthChecks/" + hc.Name + f.generic[hc.Name] = v + return nil +} + +// GetHealthCheck fakes out getting a http health check from the cloud. +func (f *FakeHealthCheckProvider) GetHealthCheck(name string) (*compute.HealthCheck, error) { + if hc, found := f.generic[name]; found { + return &hc, nil + } + + return nil, fakeNotFoundErr() +} + +// DeleteHealthCheck fakes out deleting a http health check. +func (f *FakeHealthCheckProvider) DeleteHealthCheck(name string) error { + if _, exists := f.generic[name]; !exists { + return fakeNotFoundErr() + } + + delete(f.generic, name) + return nil +} + +// UpdateHealthCheck sends the given health check as an update. +func (f *FakeHealthCheckProvider) UpdateHealthCheck(hc *compute.HealthCheck) error { + if _, exists := f.generic[hc.Name]; !exists { + return fakeNotFoundErr() + } + + f.generic[hc.Name] = *hc return nil } diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index 6b37392ab..57c93b886 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -19,76 +19,194 @@ package healthchecks import ( "net/http" - "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + "github.com/golang/glog" + "k8s.io/ingress/controllers/gce/utils" ) +const ( + // DefaultHealthCheckInterval defines how frequently a probe runs + DefaultHealthCheckInterval = 60 + // DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy" + DefaultHealthyThreshold = 1 + // DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy" + DefaultUnhealthyThreshold = 10 + // DefaultTimeoutSeconds defines the timeout of each probe + DefaultTimeoutSeconds = 60 +) + // HealthChecks manages health checks. 
type HealthChecks struct { - cloud SingleHealthCheck + cloud HealthCheckProvider defaultPath string namer *utils.Namer - healthCheckGetter } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. -func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { - return &HealthChecks{cloud, defaultHealthCheckPath, namer, nil} +func NewHealthChecker(cloud HealthCheckProvider, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { + return &HealthChecks{cloud, defaultHealthCheckPath, namer} } -// Init initializes the health checker. -func (h *HealthChecks) Init(r healthCheckGetter) { - h.healthCheckGetter = r +// New returns a *HealthCheck with default settings and specified port/protocol +func (h *HealthChecks) New(port int64, protocol utils.AppProtocol) *HealthCheck { + hc := DefaultHealthCheck(port, protocol) + hc.Name = h.namer.BeName(port) + return hc } -// Add adds a healthcheck if one for the same port doesn't already exist. -func (h *HealthChecks) Add(port int64) error { - wantHC, err := h.healthCheckGetter.HealthCheck(port) +// Sync retrieves a health check based on port, checks type and settings and updates/creates if necessary. +// Sync is only called by the backends.Add func - it's not a pool like other resources. +func (h *HealthChecks) Sync(hc *HealthCheck) (string, error) { + // Verify default path + if hc.RequestPath == "" { + hc.RequestPath = h.defaultPath + } + + existingHC, err := h.Get(hc.Port) if err != nil { - return err - } - if wantHC.RequestPath == "" { - wantHC.RequestPath = h.defaultPath - } - name := h.namer.BeName(port) - wantHC.Name = name - hc, _ := h.Get(port) - if hc == nil { - // TODO: check if the readiness probe has changed and update the - // health check. 
- glog.Infof("Creating health check %v", name) - if err := h.cloud.CreateHttpHealthCheck(wantHC); err != nil { - return err + if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + return "", err } - } else if wantHC.RequestPath != hc.RequestPath { + + glog.Infof("Creating health check for port %v with protocol %v", hc.Port, hc.Type) + if err = h.cloud.CreateHealthCheck(hc.Out()); err != nil { + return "", err + } + + return h.getHealthCheckLink(hc.Port) + } + + if existingHC.Protocol() != hc.Protocol() { + glog.Infof("Updating health check %v because it has protocol %v but need %v", existingHC.Name, existingHC.Type, hc.Type) + err = h.cloud.UpdateHealthCheck(hc.Out()) + return existingHC.SelfLink, err + } + + if existingHC.RequestPath != hc.RequestPath { // TODO: reconcile health checks, and compare headers interval etc. // Currently Ingress doesn't expose all the health check params // natively, so some users prefer to hand modify the check. - glog.Infof("Unexpected request path on health check %v, has %v want %v, NOT reconciling", - name, hc.RequestPath, wantHC.RequestPath) + glog.Infof("Unexpected request path on health check %v, has %v want %v, NOT reconciling", hc.Name, existingHC.RequestPath, hc.RequestPath) } else { glog.Infof("Health check %v already exists and has the expected path %v", hc.Name, hc.RequestPath) } - return nil + + return existingHC.SelfLink, nil +} + +func (h *HealthChecks) getHealthCheckLink(port int64) (string, error) { + hc, err := h.Get(port) + if err != nil { + return "", err + } + return hc.SelfLink, nil } // Delete deletes the health check by port. func (h *HealthChecks) Delete(port int64) error { name := h.namer.BeName(port) glog.Infof("Deleting health check %v", name) - if err := h.cloud.DeleteHttpHealthCheck(h.namer.BeName(port)); err != nil { - if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { - return err - } - } - return nil + return h.cloud.DeleteHealthCheck(name) } -// Get returns the given health check. 
-func (h *HealthChecks) Get(port int64) (*compute.HttpHealthCheck, error) { - return h.cloud.GetHttpHealthCheck(h.namer.BeName(port)) +// Get returns the health check by port +func (h *HealthChecks) Get(port int64) (*HealthCheck, error) { + name := h.namer.BeName(port) + hc, err := h.cloud.GetHealthCheck(name) + return NewHealthCheck(hc), err +} + +// DeleteLegacy deletes legacy HTTP health checks +func (h *HealthChecks) DeleteLegacy(port int64) error { + name := h.namer.BeName(port) + glog.Infof("Deleting legacy HTTP health check %v", name) + return h.cloud.DeleteHttpHealthCheck(name) +} + +// DefaultHealthCheck simply returns the default health check. +func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck { + httpSettings := compute.HTTPHealthCheck{ + Port: port, + // Empty string is used as a signal to the caller to use the appropriate + // default. + RequestPath: "", + } + + hcSettings := compute.HealthCheck{ + // How often to health check. + CheckIntervalSec: DefaultHealthCheckInterval, + // How long to wait before claiming failure of a health check. + TimeoutSec: DefaultTimeoutSeconds, + // Number of healthchecks to pass for a vm to be deemed healthy. + HealthyThreshold: DefaultHealthyThreshold, + // Number of healthchecks to fail before the vm is deemed unhealthy. + UnhealthyThreshold: DefaultUnhealthyThreshold, + Description: "Default kubernetes L7 Loadbalancing health check.", + Type: string(protocol), + } + + return &HealthCheck{ + HTTPHealthCheck: httpSettings, + HealthCheck: hcSettings, + } +} + +// HealthCheck embeds two types - the generic healthcheck compute.HealthCheck +// and the HTTP settings compute.HTTPHealthCheck. By embedding both, consumers can modify +// all relevant settings (HTTP specific and HealthCheck generic) regardless of Type +// Consumers should call .Out() func to generate a compute.HealthCheck +// with the proper child struct (.HttpHealthCheck, .HttpsHealthCheck, etc). 
+type HealthCheck struct { + compute.HTTPHealthCheck + compute.HealthCheck +} + +// NewHealthCheck creates a HealthCheck which abstracts nested structs away +func NewHealthCheck(hc *compute.HealthCheck) *HealthCheck { + if hc == nil { + return nil + } + + v := &HealthCheck{HealthCheck: *hc} + switch utils.AppProtocol(hc.Type) { + case utils.HTTP: + v.HTTPHealthCheck = *hc.HttpHealthCheck + case utils.HTTPS: + // HTTPHealthCheck and HTTPSHealthChecks have identical fields + v.HTTPHealthCheck = compute.HTTPHealthCheck(*hc.HttpsHealthCheck) + } + + // Users should be modifying HTTP(S) specific settings on the embedded + // HTTPHealthCheck. Setting these to nil for preventing confusion. + v.HealthCheck.HttpHealthCheck = nil + v.HealthCheck.HttpsHealthCheck = nil + + return v +} + +// Protocol returns the type cased to AppProtocol +func (hc *HealthCheck) Protocol() utils.AppProtocol { + return utils.AppProtocol(hc.Type) +} + +// Out returns a valid compute.HealthCheck object +func (hc *HealthCheck) Out() *compute.HealthCheck { + // Zeroing out child settings as a precaution. GoogleAPI throws an error + // if the wrong child struct is set. + hc.HealthCheck.HttpsHealthCheck = nil + hc.HealthCheck.HttpHealthCheck = nil + + switch hc.Protocol() { + case utils.HTTP: + hc.HealthCheck.HttpHealthCheck = &hc.HTTPHealthCheck + case utils.HTTPS: + https := compute.HTTPSHealthCheck(hc.HTTPHealthCheck) + hc.HealthCheck.HttpsHealthCheck = &https + } + + return &hc.HealthCheck } diff --git a/controllers/gce/healthchecks/healthchecks_test.go b/controllers/gce/healthchecks/healthchecks_test.go index 9db1edd49..9d76d1bde 100644 --- a/controllers/gce/healthchecks/healthchecks_test.go +++ b/controllers/gce/healthchecks/healthchecks_test.go @@ -17,47 +17,170 @@ limitations under the License. 
package healthchecks import ( + "net/http" "testing" + compute "google.golang.org/api/compute/v1" + "k8s.io/ingress/controllers/gce/utils" ) -func TestFakeHealthCheckActions(t *testing.T) { - namer := &utils.Namer{} - healthChecks := NewHealthChecker(NewFakeHealthChecks(), "/", namer) - healthChecks.Init(&FakeHealthCheckGetter{DefaultHealthCheck: nil}) +func TestHealthCheckAdd(t *testing.T) { + namer := utils.NewNamer("ABC", "XYZ") + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) - err := healthChecks.Add(80) + hc := healthChecks.New(80, utils.HTTP) + _, err := healthChecks.Sync(hc) if err != nil { - t.Fatalf("unexpected error") + t.Fatalf("unexpected error: %v", err) } - - _, err1 := healthChecks.Get(8080) - if err1 == nil { - t.Errorf("expected error") - } - - hc, err2 := healthChecks.Get(80) - if err2 != nil { - t.Errorf("unexpected error") - } else { - if hc == nil { - t.Errorf("expected a *compute.HttpHealthCheck") - } - } - - err = healthChecks.Delete(8080) - if err == nil { - t.Errorf("expected error") - } - - err = healthChecks.Delete(80) + // Verify the health check exists + _, err = hcp.GetHealthCheck(namer.BeName(80)) if err != nil { - t.Errorf("unexpected error") + t.Fatalf("expected the health check to exist, err: %v", err) } - _, err3 := healthChecks.Get(80) - if err3 == nil { - t.Errorf("expected error") + hc = healthChecks.New(443, utils.HTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(namer.BeName(443)) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) } } + +func TestHealthCheckAddExisting(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // HTTP + // Manually insert a health check + httpHC := DefaultHealthCheck(3000, utils.HTTP) + httpHC.Name = namer.BeName(3000) + 
httpHC.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(httpHC.Out()) + + // Should not fail adding the same type of health check + hc := healthChecks.New(3000, utils.HTTP) + _, err := healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(httpHC.Name) + if err != nil { + t.Fatalf("expected the health check to continue existing, err: %v", err) + } + + // HTTPS + // Manually insert a health check + httpsHC := DefaultHealthCheck(4000, utils.HTTPS) + httpsHC.Name = namer.BeName(4000) + httpsHC.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(httpsHC.Out()) + + hc = healthChecks.New(4000, utils.HTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Verify the health check exists + _, err = hcp.GetHealthCheck(httpsHC.Name) + if err != nil { + t.Fatalf("expected the health check to continue existing, err: %v", err) + } +} + +func TestHealthCheckDelete(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // Create HTTP HC for 1234 + hc := DefaultHealthCheck(1234, utils.HTTP) + hc.Name = namer.BeName(1234) + hcp.CreateHealthCheck(hc.Out()) + + // Create HTTPS HC for 1234) + hc.Type = string(utils.HTTPS) + hcp.CreateHealthCheck(hc.Out()) + + // Delete only HTTP 1234 + err := healthChecks.Delete(1234) + if err != nil { + t.Errorf("unexpected error when deleting health check, err: %v", err) + } + + // Validate port is deleted + _, err = hcp.GetHealthCheck(hc.Name) + if !utils.IsHTTPErrorCode(err, http.StatusNotFound) { + t.Errorf("expected not-found error, actual: %v", err) + } + + // Delete only HTTP 1234 + err = healthChecks.Delete(1234) + if err == nil { + t.Errorf("expected not-found error when deleting health check, err: %v", err) + } +} + +func TestHealthCheckUpdate(t *testing.T) { + namer := &utils.Namer{} + hcp := 
NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + // HTTP + // Manually insert a health check + hc := DefaultHealthCheck(3000, utils.HTTP) + hc.Name = namer.BeName(3000) + hc.RequestPath = "/my-probes-health" + hcp.CreateHealthCheck(hc.Out()) + + // Verify the health check exists + _, err := healthChecks.Get(3000) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + // Change to HTTPS + hc.Type = string(utils.HTTPS) + _, err = healthChecks.Sync(hc) + if err != nil { + t.Fatalf("unexpected err while syncing healthcheck, err %v", err) + } + + // Verify the health check exists + _, err = healthChecks.Get(3000) + if err != nil { + t.Fatalf("expected the health check to exist, err: %v", err) + } + + // Verify the check is now HTTPS + if hc.Protocol() != utils.HTTPS { + t.Fatalf("expected check to be of type HTTPS") + } +} + +func TestHealthCheckDeleteLegacy(t *testing.T) { + namer := &utils.Namer{} + hcp := NewFakeHealthCheckProvider() + healthChecks := NewHealthChecker(hcp, "/", namer) + + err := hcp.CreateHttpHealthCheck(&compute.HttpHealthCheck{ + Name: namer.BeName(80), + }) + if err != nil { + t.Fatalf("expected health check to be created, err: %v", err) + } + + err = healthChecks.DeleteLegacy(80) + if err != nil { + t.Fatalf("expected health check to be deleted, err: %v", err) + } + +} diff --git a/controllers/gce/healthchecks/interfaces.go b/controllers/gce/healthchecks/interfaces.go index 59d259696..cdf8c635d 100644 --- a/controllers/gce/healthchecks/interfaces.go +++ b/controllers/gce/healthchecks/interfaces.go @@ -18,27 +18,28 @@ package healthchecks import ( compute "google.golang.org/api/compute/v1" + + "k8s.io/ingress/controllers/gce/utils" ) -// healthCheckGetter retrieves health checks. -type healthCheckGetter interface { - // HealthCheck returns the HTTP readiness check for a node port. 
- HealthCheck(nodePort int64) (*compute.HttpHealthCheck, error) -} - -// SingleHealthCheck is an interface to manage a single GCE health check. -type SingleHealthCheck interface { +// HealthCheckProvider is an interface to manage a single GCE health check. +type HealthCheckProvider interface { CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error DeleteHttpHealthCheck(name string) error GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) + + CreateHealthCheck(hc *compute.HealthCheck) error + UpdateHealthCheck(hc *compute.HealthCheck) error + DeleteHealthCheck(name string) error + GetHealthCheck(name string) (*compute.HealthCheck, error) } // HealthChecker is an interface to manage cloud HTTPHealthChecks. type HealthChecker interface { - Init(h healthCheckGetter) - - Add(port int64) error + New(port int64, protocol utils.AppProtocol) *HealthCheck + Sync(hc *HealthCheck) (string, error) Delete(port int64) error - Get(port int64) (*compute.HttpHealthCheck, error) + Get(port int64) (*HealthCheck, error) + DeleteLegacy(port int64) error } diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index dd5fd1574..9972ccc01 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -70,7 +70,7 @@ type L7s struct { // TODO: Remove this field and always ask the BackendPool using the NodePort. 
glbcDefaultBackend *compute.BackendService defaultBackendPool backends.BackendPool - defaultBackendNodePort int64 + defaultBackendNodePort backends.ServicePort namer *utils.Namer } @@ -84,7 +84,7 @@ type L7s struct { func NewLoadBalancerPool( cloud LoadBalancers, defaultBackendPool backends.BackendPool, - defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool { + defaultBackendNodePort backends.ServicePort, namer *utils.Namer) LoadBalancerPool { return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer} } @@ -172,7 +172,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil { return err } - defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort) + defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) if err != nil { return err } @@ -209,7 +209,7 @@ func (l *L7s) GC(names []string) error { // This needs to happen after we've deleted all url-maps that might be // using it. 
if len(names) == 0 { - if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort); err != nil { + if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort.Port); err != nil { return err } l.glbcDefaultBackend = nil diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go index a2f54d9a1..16cb5130a 100644 --- a/controllers/gce/loadbalancers/loadbalancers_test.go +++ b/controllers/gce/loadbalancers/loadbalancers_test.go @@ -30,17 +30,19 @@ import ( ) const ( - testDefaultBeNodePort = int64(3000) - defaultZone = "zone-a" + defaultZone = "zone-a" +) + +var ( + testDefaultBeNodePort = backends.ServicePort{Port: 3000, Protocol: utils.HTTP} ) func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool { fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - fakeHCs := healthchecks.NewFakeHealthChecks() + fakeHCP := healthchecks.NewFakeHealthCheckProvider() namer := &utils.Namer{} - healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) - healthChecker.Init(&healthchecks.FakeHealthCheckGetter{}) + healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer) nodePool := instances.NewNodePool(fakeIGs) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) backendPool := backends.NewBackendPool( diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 491098395..7d2806bf7 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/controller" "k8s.io/ingress/controllers/gce/loadbalancers" "k8s.io/ingress/controllers/gce/storage" @@ -226,11 +227,13 @@ func main() { glog.Fatalf("Default backend should take the form namespace/name: 
%v", *defaultSvc) } - defaultBackendNodePort, err := getNodePort(kubeClient, parts[0], parts[1]) + nodePort, err := getNodePort(kubeClient, parts[0], parts[1]) if err != nil { glog.Fatalf("Could not configure default backend %v: %v", *defaultSvc, err) } + // The default backend is known to be HTTP + defaultBackendNodePort := backends.ServicePort{Port: nodePort, Protocol: utils.HTTP} if *inCluster || *useRealCloud { // Create cluster manager diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index f59d7b259..e6fcc16e7 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -80,16 +80,14 @@ const ( // debug information in the Ingress annotations. K8sAnnotationPrefix = "ingress.kubernetes.io" - // DefaultHealthCheckInterval defines how frequently a probe runs - DefaultHealthCheckInterval = 60 - // DefaultHealthyThreshold defines the threshold of success probes that declare a backend "healthy" - DefaultHealthyThreshold = 1 - // DefaultUnhealthyThreshold defines the threshold of failure probes that declare a backend "unhealthy" - DefaultUnhealthyThreshold = 10 - // DefaultTimeoutSeconds defines the timeout of each probe - DefaultTimeoutSeconds = 60 + // HTTP protocol for a service + HTTP AppProtocol = "HTTP" + // HTTPS protocol for a service + HTTPS AppProtocol = "HTTPS" ) +type AppProtocol string + // Namer handles centralized naming for the cluster. type Namer struct { clusterName string @@ -333,22 +331,3 @@ func CompareLinks(l1, l2 string) bool { // FakeIngressRuleValueMap is a convenience type used by multiple submodules // that share the same testing methods. type FakeIngressRuleValueMap map[string]string - -// DefaultHealthCheckTemplate simply returns the default health check template. -func DefaultHealthCheckTemplate(port int64) *compute.HttpHealthCheck { - return &compute.HttpHealthCheck{ - Port: port, - // Empty string is used as a signal to the caller to use the appropriate - // default. 
- RequestPath: "", - Description: "Default kubernetes L7 Loadbalancing health check.", - // How often to health check. - CheckIntervalSec: DefaultHealthCheckInterval, - // How long to wait before claiming failure of a health check. - TimeoutSec: DefaultTimeoutSeconds, - // Number of healthchecks to pass for a vm to be deemed healthy. - HealthyThreshold: DefaultHealthyThreshold, - // Number of healthchecks to fail before the vm is deemed unhealthy. - UnhealthyThreshold: DefaultUnhealthyThreshold, - } -}