diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 7d3116b76..902c32354 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -109,30 +109,34 @@ func (c *ClusterManager) shutdown() error { } // Checkpoint performs a checkpoint with the cloud. -// - lbNames are the names of L7 loadbalancers we wish to exist. If they already +// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already // exist, they should not have any broken links between say, a UrlMap and // TargetHttpProxy. // - nodeNames are the names of nodes we wish to add to all loadbalancer // instance groups. -// - nodePorts are the ports for which we require BackendServices. Each of -// these ports must also be opened on the corresponding Instance Group. +// - backendServicePorts are the ports for which we require BackendServices. +// - namedPorts are the ports which must be opened on instance groups. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) error { + if len(namedPorts) != 0 { + // Add the default backend node port to the list of named ports for instance groups. + namedPorts = append(namedPorts, c.defaultBackendNodePort) + } // Multiple ingress paths can point to the same service (and hence nodePort) // but each nodePort can only have one set of cloud resources behind it. So // don't waste time double validating GCE BackendServices. - portMap := map[int64]backends.ServicePort{} - for _, p := range nodePorts { - portMap[p.Port] = p - } - nodePorts = []backends.ServicePort{} - for _, sp := range portMap { - nodePorts = append(nodePorts, sp) - } - if err := c.backendPool.Sync(nodePorts); err != nil { + namedPorts = uniq(namedPorts) + backendServicePorts = uniq(backendServicePorts) + // Create Instance Groups. + _, err := c.CreateInstanceGroups(namedPorts) + if err != nil { return err } + if err := c.backendPool.Sync(backendServicePorts); err != nil { + return err + } + if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil { return err } @@ -143,7 +147,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName // TODO: Manage default backend and its firewall rule in a centralized way. // DefaultBackend is managed in l7 pool, which doesn't understand instances, // which the firewall rule requires. - fwNodePorts := nodePorts + fwNodePorts := backendServicePorts if len(lbs) != 0 { // If there are no Ingresses, we shouldn't be allowing traffic to the // default backend. Equally importantly if the cluster gets torn down @@ -166,6 +170,11 @@ func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePor var igs []*compute.InstanceGroup var err error for _, p := range servicePorts { + // CreateInstanceGroups always returns all the instance groups, so we can return + // the output of any call, no need to append the return from all calls. + // TODO: Ideally, we want to call CreateInstaceGroups only the first time and + // then call AddNamedPort multiple times. Need to update the interface to + // achieve this. igs, _, err = instances.CreateInstanceGroups(c.instancePool, c.ClusterNamer, p.Port) if err != nil { return nil, err diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 663e42961..0bf1a69b5 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/loadbalancers" ) @@ -275,13 +276,19 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { } glog.V(3).Infof("Syncing %v", key) - ingresses, err := lbc.ingLister.List() + allIngresses, err := lbc.ingLister.ListAll() if err != nil { return err } - nodePorts := lbc.tr.toNodePorts(&ingresses) + gceIngresses, err := lbc.ingLister.ListGCEIngresses() + if err != nil { + return err + } + + allNodePorts := lbc.tr.toNodePorts(&allIngresses) + gceNodePorts := lbc.tr.toNodePorts(&gceIngresses) lbNames := lbc.ingLister.Store.ListKeys() - lbs, err := lbc.ListRuntimeInfo() + lbs, err := lbc.ListGCERuntimeInfo() if err != nil { return err } @@ -307,22 +314,15 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { var syncError error defer func() { - if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil { + if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil { err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr) } glog.V(3).Infof("Finished syncing %v", key) }() - if ingExists { - ing := obj.(*extensions.Ingress) - if isGCEMultiClusterIngress(ing) { - return lbc.syncMultiClusterIngress(ing, nodeNames) - } - } - // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. - if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil { + if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts); err != nil { // TODO: Implement proper backoff for the queue. eventMsg := "GCE" if ingExists { @@ -336,6 +336,29 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if !ingExists { return syncError } + ing := obj.(*extensions.Ingress) + if isGCEMultiClusterIngress(ing) { + // Add instance group names as annotation on the ingress. + if ing.Annotations == nil { + ing.Annotations = map[string]string{} + } + // Since we just created instance groups in Checkpoint, calling create + // instance groups again should just return names of the existing + // instance groups. It does not matter which nodePort we pass as argument. + igs, err := lbc.CloudClusterManager.CreateInstanceGroups([]backends.ServicePort{allNodePorts[0]}) + if err != nil { + return fmt.Errorf("error in creating instance groups: %v", err) + } + err = setInstanceGroupsAnnotation(ing.Annotations, igs) + if err != nil { + return err + } + if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil { + return err + } + return nil + } + // Update the UrlMap of the single loadbalancer that came through the watch. l7, err := lbc.CloudClusterManager.l7Pool.Get(key) if err != nil { @@ -343,52 +366,18 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return syncError } - ing := *obj.(*extensions.Ingress) - if urlMap, err := lbc.tr.toURLMap(&ing); err != nil { + if urlMap, err := lbc.tr.toURLMap(ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) + lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) syncError = fmt.Errorf("%v, update url map error: %v", syncError, err) - } else if err := lbc.updateIngressStatus(l7, ing); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error()) + } else if err := lbc.updateIngressStatus(l7, *ing); err != nil { + lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } return syncError } -func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error { - // For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups. - - // Ensure that all the required instance groups exist with the required node ports. - nodePorts := lbc.tr.ingressToNodePorts(ing) - // Add the default backend node port. - nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort) - igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts) - if err != nil { - return err - } - - // Ensure that instance groups have the right nodes. - // This is also done whenever a node is added or removed from the cluster. - // We need it here as well since instance group is not created until first ingress is observed. - if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil { - return err - } - - // Add instance group names as annotation on the ingress. - if ing.Annotations == nil { - ing.Annotations = map[string]string{} - } - err = setInstanceGroupsAnnotation(ing.Annotations, igs) - if err != nil { - return err - } - if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil { - return err - } - return nil -} - // updateIngressStatus updates the IP and annotations of a loadbalancer. // The annotations are parsed by kubectl describe. func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error { @@ -443,9 +432,10 @@ func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, ann return nil } -// ListRuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module. -func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) { - ingList, err := lbc.ingLister.List() +// ListGCERuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module. +// It returns runtime info only for gce ingresses and not for multi cluster ingresses. +func (lbc *LoadBalancerController) ListGCERuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) { + ingList, err := lbc.ingLister.ListGCEIngresses() if err != nil { return lbs, err } diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index c025ae3d8..075865776 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -300,8 +300,19 @@ func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendF return nil } -// List lists all Ingress' in the store. -func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) { +// List lists all Ingress' in the store (both single and multi cluster ingresses). +func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// ListGCEIngresses lists all GCE Ingress' in the store. +func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { for _, m := range s.Store.List() { newIng := m.(*extensions.Ingress) if isGCEIngress(newIng) { @@ -680,3 +691,16 @@ func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.Inst existing[instanceGroupsAnnotationKey] = string(jsonValue) return nil } + +// uniq returns an array of unique service ports from the given array. +func uniq(nodePorts []backends.ServicePort) []backends.ServicePort { + portMap := map[int64]backends.ServicePort{} + for _, p := range nodePorts { + portMap[p.Port] = p + } + nodePorts = []backends.ServicePort{} + for _, sp := range portMap { + nodePorts = append(nodePorts, sp) + } + return nodePorts +} diff --git a/controllers/gce/controller/utils_test.go b/controllers/gce/controller/utils_test.go index 5a3523a62..196f3c937 100644 --- a/controllers/gce/controller/utils_test.go +++ b/controllers/gce/controller/utils_test.go @@ -300,8 +300,8 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if annotations[instanceGroupsKey] != c.ExpectedAnnotation { - t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], c.ExpectedAnnotation) + if annotations[instanceGroupsAnnotationKey] != c.ExpectedAnnotation { + t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsAnnotationKey], c.ExpectedAnnotation) } } }