diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index d46d68d10..49b632efe 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -211,15 +211,22 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic } // Add will get or create a Backend for the given port. -func (b *Backends) Add(p ServicePort) error { +// Uses the given instance groups if non-nil, else creates instance groups. +func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error { // We must track the port even if creating the backend failed, because // we might've created a health-check for it. be := &compute.BackendService{} defer func() { b.snapshotter.Add(portKey(p.Port), be) }() - igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port) - if err != nil { - return err + var err error + // Ideally callers should pass the instance groups to prevent recomputing them here. + // Igs can be nil in scenarios where we do not have instance groups such as + // while syncing default backend service. + if igs == nil { + igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, p.Port) + if err != nil { + return err + } } // Ensure health check for backend service exists @@ -232,6 +239,7 @@ func (b *Backends) Add(p ServicePort) error { pName := b.namer.BeName(p.Port) be, _ = b.Get(p.Port) if be == nil { + namedPort := utils.GetNamedPort(p.Port) glog.V(2).Infof("Creating backend service for port %v named port %v", p.Port, namedPort) be, err = b.create(namedPort, hcLink, p, pName) if err != nil { @@ -381,12 +389,12 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr } // Sync syncs backend services corresponding to ports in the given list. -func (b *Backends) Sync(svcNodePorts []ServicePort) error { +func (b *Backends) Sync(svcNodePorts []ServicePort, igs []*compute.InstanceGroup) error { glog.V(3).Infof("Sync: backends %v", svcNodePorts) // create backends for new ports, perform an edge hop for existing ports for _, port := range svcNodePorts { - if err := b.Add(port); err != nil { + if err := b.Add(port, igs); err != nil { return err } } diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index ba1331558..8d116ac6e 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -80,7 +80,7 @@ func TestBackendPoolAdd(t *testing.T) { // Add a backend for a port, then re-add the same port and // make sure it corrects a broken link from the backend to // the instance group. - err := pool.Add(nodePort) + err := pool.Add(nodePort, nil) if err != nil { t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err) } @@ -143,7 +143,7 @@ func TestHealthCheckMigration(t *testing.T) { hcp.CreateHttpHealthCheck(legacyHC) // Add the service port to the backend pool - pool.Add(p) + pool.Add(p, nil) // Assert the proper health check was created hc, _ := pool.healthChecker.Get(p.Port) @@ -168,7 +168,7 @@ func TestBackendPoolUpdate(t *testing.T) { namer := utils.Namer{} p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} - pool.Add(p) + pool.Add(p, nil) beName := namer.BeName(p.Port) be, err := f.GetGlobalBackendService(beName) @@ -188,7 +188,7 @@ func TestBackendPoolUpdate(t *testing.T) { // Update service port to encrypted p.Protocol = utils.ProtocolHTTPS - pool.Sync([]ServicePort{p}) + pool.Sync([]ServicePort{p}, nil) be, err = f.GetGlobalBackendService(beName) if err != nil { @@ -214,7 +214,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { namer := utils.Namer{} nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP} - pool.Add(nodePort) + pool.Add(nodePort, nil) beName := namer.BeName(nodePort.Port) be, _ := f.GetGlobalBackendService(beName) @@ -227,7 +227,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { f.calls = []int{} f.UpdateGlobalBackendService(be) - pool.Add(nodePort) + pool.Add(nodePort, nil) for _, call := range f.calls { if call == utils.Create { t.Fatalf("Unexpected create for existing backend service") @@ -260,9 +260,9 @@ func TestBackendPoolSync(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool, _ := newTestJig(f, fakeIGs, true) - pool.Add(ServicePort{Port: 81}) - pool.Add(ServicePort{Port: 90}) - if err := pool.Sync(svcNodePorts); err != nil { + pool.Add(ServicePort{Port: 81}, nil) + pool.Add(ServicePort{Port: 90}, nil) + if err := pool.Sync(svcNodePorts, nil); err != nil { t.Errorf("Expected backend pool to sync, err: %v", err) } if err := pool.GC(svcNodePorts); err != nil { @@ -361,7 +361,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { }) // Have pool sync the above backend service - bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}) + bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}, nil) // Verify the legacy health check has been deleted _, err = hcp.GetHttpHealthCheck(beName) @@ -388,7 +388,7 @@ func TestBackendPoolShutdown(t *testing.T) { namer := utils.Namer{} // Add a backend-service and verify that it doesn't exist after Shutdown() - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) pool.Shutdown() if _, err := f.GetGlobalBackendService(namer.BeName(80)); err == nil { t.Fatalf("%v", err) @@ -402,7 +402,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { namer := utils.Namer{} // This will add the instance group k8s-ig to the instance pool - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) be, err := f.GetGlobalBackendService(namer.BeName(80)) if err != nil { @@ -420,7 +420,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { } // Make sure repeated adds don't clobber the inserted instance group - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) be, err = f.GetGlobalBackendService(namer.BeName(80)) if err != nil { t.Fatalf("%v", err) @@ -462,7 +462,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { return nil } - pool.Add(nodePort) + pool.Add(nodePort, nil) be, err := f.GetGlobalBackendService(namer.BeName(nodePort.Port)) if err != nil { t.Fatalf("%v", err) diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 2153f3505..586ceb17c 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -30,10 +30,10 @@ type probeProvider interface { // as gce backendServices, and sync them through the BackendServices interface. type BackendPool interface { Init(p probeProvider) - Add(port ServicePort) error + Add(port ServicePort, igs []*compute.InstanceGroup) error Get(port int64) (*compute.BackendService, error) Delete(port int64) error - Sync(ports []ServicePort) error + Sync(ports []ServicePort, igs []*compute.InstanceGroup) error GC(ports []ServicePort) error Shutdown() error Status(name string) string diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 1f9156a8c..e987711b9 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -21,6 +21,7 @@ import ( "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/ingress/controllers/gce/backends" @@ -108,41 +109,45 @@ func (c *ClusterManager) shutdown() error { } // Checkpoint performs a checkpoint with the cloud. -// - lbNames are the names of L7 loadbalancers we wish to exist. If they already +// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already // exist, they should not have any broken links between say, a UrlMap and // TargetHttpProxy. // - nodeNames are the names of nodes we wish to add to all loadbalancer // instance groups. -// - nodePorts are the ports for which we require BackendServices. Each of -// these ports must also be opened on the corresponding Instance Group. +// - backendServicePorts are the ports for which we require BackendServices. +// - namedPorts are the ports which must be opened on instance groups. +// Returns the list of all instance groups corresponding to the given loadbalancers. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { + if len(namedPorts) != 0 { + // Add the default backend node port to the list of named ports for instance groups. + namedPorts = append(namedPorts, c.defaultBackendNodePort) + } // Multiple ingress paths can point to the same service (and hence nodePort) // but each nodePort can only have one set of cloud resources behind it. So // don't waste time double validating GCE BackendServices. - portMap := map[int64]backends.ServicePort{} - for _, p := range nodePorts { - portMap[p.Port] = p + namedPorts = uniq(namedPorts) + backendServicePorts = uniq(backendServicePorts) + // Create Instance Groups. + igs, err := c.EnsureInstanceGroupsAndPorts(namedPorts) + if err != nil { + return igs, err } - nodePorts = []backends.ServicePort{} - for _, sp := range portMap { - nodePorts = append(nodePorts, sp) - } - if err := c.backendPool.Sync(nodePorts); err != nil { - return err + if err := c.backendPool.Sync(backendServicePorts, igs); err != nil { + return igs, err } if err := c.instancePool.Sync(nodeNames); err != nil { - return err + return igs, err } if err := c.l7Pool.Sync(lbs); err != nil { - return err + return igs, err } // TODO: Manage default backend and its firewall rule in a centralized way. // DefaultBackend is managed in l7 pool, which doesn't understand instances, // which the firewall rule requires. - fwNodePorts := nodePorts + fwNodePorts := backendServicePorts if len(lbs) != 0 { // If there are no Ingresses, we shouldn't be allowing traffic to the // default backend. Equally importantly if the cluster gets torn down @@ -155,10 +160,27 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName np = append(np, p.Port) } if err := c.firewallPool.Sync(np, nodeNames); err != nil { - return err + return igs, err } - return nil + return igs, nil +} + +func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { + var igs []*compute.InstanceGroup + var err error + for _, p := range servicePorts { + // EnsureInstanceGroupsAndPorts always returns all the instance groups, so we can return + // the output of any call, no need to append the return from all calls. + // TODO: Ideally, we want to call CreateInstaceGroups only the first time and + // then call AddNamedPort multiple times. Need to update the interface to + // achieve this. + igs, _, err = instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, p.Port) + if err != nil { + return nil, err + } + } + return igs, nil } // GC garbage collects unused resources. diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index c3236b16c..e0a8de1fa 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -149,7 +149,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) - if !isGCEIngress(addIng) { + if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) { glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey) return } @@ -158,7 +158,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC }, DeleteFunc: func(obj interface{}) { delIng := obj.(*extensions.Ingress) - if !isGCEIngress(delIng) { + if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) { glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, ingressClassKey) return } @@ -167,7 +167,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC }, UpdateFunc: func(old, cur interface{}) { curIng := cur.(*extensions.Ingress) - if !isGCEIngress(curIng) { + if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) { return } if !reflect.DeepEqual(old, cur) { @@ -275,13 +275,19 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { } glog.V(3).Infof("Syncing %v", key) - ingresses, err := lbc.ingLister.List() + allIngresses, err := lbc.ingLister.ListAll() if err != nil { return err } - nodePorts := lbc.tr.toNodePorts(&ingresses) + gceIngresses, err := lbc.ingLister.ListGCEIngresses() + if err != nil { + return err + } + + allNodePorts := lbc.tr.toNodePorts(&allIngresses) + gceNodePorts := lbc.tr.toNodePorts(&gceIngresses) lbNames := lbc.ingLister.Store.ListKeys() - lbs, err := lbc.ListRuntimeInfo() + lbs, err := lbc.toRuntimeInfo(gceIngresses) if err != nil { return err } @@ -307,15 +313,15 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { var syncError error defer func() { - if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil { + if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil { err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr) } glog.V(3).Infof("Finished syncing %v", key) }() - // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. - if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil { + igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts) + if err != nil { // TODO: Implement proper backoff for the queue. eventMsg := "GCE" if ingExists { @@ -329,6 +335,22 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if !ingExists { return syncError } + ing := *obj.(*extensions.Ingress) + if isGCEMultiClusterIngress(&ing) { + // Add instance group names as annotation on the ingress. + if ing.Annotations == nil { + ing.Annotations = map[string]string{} + } + err = setInstanceGroupsAnnotation(ing.Annotations, igs) + if err != nil { + return err + } + if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil { + return err + } + return nil + } + // Update the UrlMap of the single loadbalancer that came through the watch. l7, err := lbc.CloudClusterManager.l7Pool.Get(key) if err != nil { @@ -336,7 +358,6 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return syncError } - ing := *obj.(*extensions.Ingress) if urlMap, err := lbc.tr.toURLMap(&ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { @@ -379,14 +400,23 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip) } } + annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) + if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil { + return err + } + return nil +} + +func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, annotations map[string]string) error { // Update annotations through /update endpoint - currIng, err = ingClient.Get(ing.Name, metav1.GetOptions{}) + ingClient := lbc.client.Extensions().Ingresses(namespace) + currIng, err := ingClient.Get(name, metav1.GetOptions{}) if err != nil { return err } - currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) - if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) { - glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name) + if !reflect.DeepEqual(currIng.Annotations, annotations) { + glog.V(3).Infof("Updating annotations of %v/%v", namespace, name) + currIng.Annotations = annotations if _, err := ingClient.Update(currIng); err != nil { return err } @@ -394,12 +424,8 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing return nil } -// ListRuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module. -func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) { - ingList, err := lbc.ingLister.List() - if err != nil { - return lbs, err - } +// toRuntimeInfo returns L7RuntimeInfo for the given ingresses. +func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) (lbs []*loadbalancers.L7RuntimeInfo, err error) { for _, ing := range ingList.Items { k, err := keyFunc(&ing) if err != nil { diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 72ba593a8..d313097f5 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/meta" @@ -75,12 +76,19 @@ const ( // ingressClassKey picks a specific "class" for the Ingress. The controller // only processes Ingresses with this annotation either unset, or set // to either gceIngessClass or the empty string. - ingressClassKey = "kubernetes.io/ingress.class" - gceIngressClass = "gce" + ingressClassKey = "kubernetes.io/ingress.class" + gceIngressClass = "gce" + gceMultiIngressClass = "gce-multi-cluster" // Label key to denote which GCE zone a Kubernetes node is in. zoneKey = "failure-domain.beta.kubernetes.io/zone" defaultZone = "" + + // instanceGroupsAnnotationKey is the annotation key used by controller to + // specify the name and zone of instance groups created for the ingress. + // This is read only for users. Controller will overrite any user updates. + // This is only set for ingresses with ingressClass = "gce-multi-cluster" + instanceGroupsAnnotationKey = "ingress.gcp.kubernetes.io/instance-groups" ) // ingAnnotations represents Ingress annotations. @@ -156,6 +164,13 @@ func isGCEIngress(ing *extensions.Ingress) bool { return class == "" || class == gceIngressClass } +// isGCEMultiClusterIngress returns true if the given Ingress has +// ingress.class annotation set to "gce-multi-cluster". +func isGCEMultiClusterIngress(ing *extensions.Ingress) bool { + class := ingAnnotations(ing.ObjectMeta.Annotations).ingressClass() + return class == gceMultiIngressClass +} + // errorNodePortNotFound is an implementation of error. type errorNodePortNotFound struct { backend extensions.IngressBackend @@ -285,8 +300,19 @@ func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendF return nil } -// List lists all Ingress' in the store. -func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) { +// List lists all Ingress' in the store (both single and multi cluster ingresses). +func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// ListGCEIngresses lists all GCE Ingress' in the store. +func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { for _, m := range s.Store.List() { newIng := m.(*extensions.Ingress) if isGCEIngress(newIng) { @@ -471,32 +497,39 @@ PortLoop: return p, nil } -// toNodePorts converts a pathlist to a flat list of nodeports. +// toNodePorts is a helper method over ingressToNodePorts to process a list of ingresses. func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort { var knownPorts []backends.ServicePort for _, ing := range ings.Items { - defaultBackend := ing.Spec.Backend - if defaultBackend != nil { - port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace) + knownPorts = append(knownPorts, t.ingressToNodePorts(&ing)...) + } + return knownPorts +} + +// ingressToNodePorts converts a pathlist to a flat list of nodeports for the given ingress. +func (t *GCETranslator) ingressToNodePorts(ing *extensions.Ingress) []backends.ServicePort { + var knownPorts []backends.ServicePort + defaultBackend := ing.Spec.Backend + if defaultBackend != nil { + port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace) + if err != nil { + glog.Infof("%v", err) + } else { + knownPorts = append(knownPorts, port) + } + } + for _, rule := range ing.Spec.Rules { + if rule.HTTP == nil { + glog.Errorf("ignoring non http Ingress rule") + continue + } + for _, path := range rule.HTTP.Paths { + port, err := t.getServiceNodePort(path.Backend, ing.Namespace) if err != nil { glog.Infof("%v", err) - } else { - knownPorts = append(knownPorts, port) - } - } - for _, rule := range ing.Spec.Rules { - if rule.HTTP == nil { - glog.Errorf("ignoring non http Ingress rule") continue } - for _, path := range rule.HTTP.Paths { - port, err := t.getServiceNodePort(path.Backend, ing.Namespace) - if err != nil { - glog.Infof("%v", err) - continue - } - knownPorts = append(knownPorts, port) - } + knownPorts = append(knownPorts, port) } } return knownPorts @@ -640,3 +673,34 @@ func (o PodsByCreationTimestamp) Less(i, j int) bool { } return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) } + +// setInstanceGroupsAnnotation sets the instance-groups annotation with names of the given instance groups. +func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) error { + type Value struct { + Name string + Zone string + } + var instanceGroups []Value + for _, ig := range igs { + instanceGroups = append(instanceGroups, Value{Name: ig.Name, Zone: ig.Zone}) + } + jsonValue, err := json.Marshal(instanceGroups) + if err != nil { + return err + } + existing[instanceGroupsAnnotationKey] = string(jsonValue) + return nil +} + +// uniq returns an array of unique service ports from the given array. +func uniq(nodePorts []backends.ServicePort) []backends.ServicePort { + portMap := map[int64]backends.ServicePort{} + for _, p := range nodePorts { + portMap[p.Port] = p + } + nodePorts = make([]backends.ServicePort, 0, len(portMap)) + for _, sp := range portMap { + nodePorts = append(nodePorts, sp) + } + return nodePorts +} diff --git a/controllers/gce/controller/utils_test.go b/controllers/gce/controller/utils_test.go index 61438f607..196f3c937 100644 --- a/controllers/gce/controller/utils_test.go +++ b/controllers/gce/controller/utils_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -263,3 +265,43 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { func getProbePath(p *api_v1.Probe) string { return p.Handler.HTTPGet.Path } + +func TestAddInstanceGroupsAnnotation(t *testing.T) { + testCases := []struct { + Igs []*compute.InstanceGroup + ExpectedAnnotation string + }{ + { + // Single zone. + []*compute.InstanceGroup{&compute.InstanceGroup{ + Name: "ig-name", + Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b", + }}, + `[{"Name":"ig-name","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"}]`, + }, + { + // Multiple zones. + []*compute.InstanceGroup{ + &compute.InstanceGroup{ + Name: "ig-name-1", + Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b", + }, + &compute.InstanceGroup{ + Name: "ig-name-2", + Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a", + }, + }, + `[{"Name":"ig-name-1","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"},{"Name":"ig-name-2","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a"}]`, + }, + } + for _, c := range testCases { + annotations := map[string]string{} + err := setInstanceGroupsAnnotation(annotations, c.Igs) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if annotations[instanceGroupsAnnotationKey] != c.ExpectedAnnotation { + t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsAnnotationKey], c.ExpectedAnnotation) + } + } +} diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go index 94664b065..fd54de034 100644 --- a/controllers/gce/instances/instances.go +++ b/controllers/gce/instances/instances.go @@ -63,8 +63,7 @@ func (i *Instances) Init(zl zoneLister) { // all of which have the exact same named port. func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { igs := []*compute.InstanceGroup{} - // TODO: move port naming to namer - namedPort := &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} + namedPort := utils.GetNamedPort(port) zones, err := i.ListZones() if err != nil { diff --git a/controllers/gce/instances/utils.go b/controllers/gce/instances/utils.go new file mode 100644 index 000000000..934311c66 --- /dev/null +++ b/controllers/gce/instances/utils.go @@ -0,0 +1,13 @@ +package instances + +import ( + compute "google.golang.org/api/compute/v1" + + "k8s.io/ingress/controllers/gce/utils" +) + +// Helper method to create instance groups. +// This method exists to ensure that we are using the same logic at all places. +func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { + return nodePool.AddInstanceGroup(namer.IGName(), port) +} diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index e8ce54228..15ded2562 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -169,7 +169,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { // Lazily create a default backend so we don't tax users who don't care // about Ingress by consuming 1 of their 3 GCE BackendServices. This // BackendService is GC'd when there are no more Ingresses. - if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil { + if err := l.defaultBackendPool.Add(l.defaultBackendNodePort, nil); err != nil { return err } defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index 237943a3a..c0486f557 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -353,3 +353,9 @@ func CompareLinks(l1, l2 string) bool { // FakeIngressRuleValueMap is a convenience type used by multiple submodules // that share the same testing methods. type FakeIngressRuleValueMap map[string]string + +// GetNamedPort creates the NamedPort API object for the given port. +func GetNamedPort(port int64) *compute.NamedPort { + // TODO: move port naming to namer + return &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} +}