Refactor code to merge multi cluster ingress sync with single cluster ingress sync
parent 7d87f02b1f
commit 0f4f5c97d4

4 changed files with 94 additions and 71 deletions
@@ -109,30 +109,34 @@ func (c *ClusterManager) shutdown() error {
 }
 
 // Checkpoint performs a checkpoint with the cloud.
-// - lbNames are the names of L7 loadbalancers we wish to exist. If they already
+// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already
 // exist, they should not have any broken links between say, a UrlMap and
 // TargetHttpProxy.
 // - nodeNames are the names of nodes we wish to add to all loadbalancer
 // instance groups.
-// - nodePorts are the ports for which we require BackendServices. Each of
-// these ports must also be opened on the corresponding Instance Group.
+// - backendServicePorts are the ports for which we require BackendServices.
+// - namedPorts are the ports which must be opened on instance groups.
 // If in performing the checkpoint the cluster manager runs out of quota, a
 // googleapi 403 is returned.
-func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error {
+func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) error {
+    if len(namedPorts) != 0 {
+        // Add the default backend node port to the list of named ports for instance groups.
+        namedPorts = append(namedPorts, c.defaultBackendNodePort)
+    }
     // Multiple ingress paths can point to the same service (and hence nodePort)
     // but each nodePort can only have one set of cloud resources behind it. So
     // don't waste time double validating GCE BackendServices.
-    portMap := map[int64]backends.ServicePort{}
-    for _, p := range nodePorts {
-        portMap[p.Port] = p
-    }
-    nodePorts = []backends.ServicePort{}
-    for _, sp := range portMap {
-        nodePorts = append(nodePorts, sp)
-    }
-    if err := c.backendPool.Sync(nodePorts); err != nil {
+    namedPorts = uniq(namedPorts)
+    backendServicePorts = uniq(backendServicePorts)
+    // Create Instance Groups.
+    _, err := c.CreateInstanceGroups(namedPorts)
+    if err != nil {
+        return err
+    }
+    if err := c.backendPool.Sync(backendServicePorts); err != nil {
        return err
    }
+
    if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil {
        return err
    }
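For orientation, here is a minimal runnable sketch of the ordering the reworked Checkpoint now follows: append the default backend port to the named ports, create instance groups from the named ports, sync backend services only for the backend service ports, then sync node membership. The ServicePort type and the print stubs are stand-ins for the real backends/instances types and cloud calls, and the example values are illustrative; deduplication is left to a uniq helper like the one added in utils.go further down.

```go
package main

import "fmt"

// ServicePort stands in for backends.ServicePort; only the port number matters here.
type ServicePort struct{ Port int64 }

// checkpoint mirrors the order of operations in the new ClusterManager.Checkpoint:
// named ports drive instance group creation, backend service ports drive the
// backend pool sync, and node membership is synced last. Cloud calls are stubbed.
func checkpoint(nodeNames []string, backendServicePorts, namedPorts []ServicePort, defaultBackendPort ServicePort) error {
	if len(namedPorts) != 0 {
		// The default backend port only needs to be a named port on the instance groups.
		namedPorts = append(namedPorts, defaultBackendPort)
	}
	fmt.Println("create instance groups with named ports:", namedPorts)
	fmt.Println("sync backend services for ports:", backendServicePorts)
	fmt.Println("sync nodes into instance groups:", nodeNames)
	return nil
}

func main() {
	gcePorts := []ServicePort{{Port: 30001}}                // from single cluster GCE ingresses
	allPorts := []ServicePort{{Port: 30001}, {Port: 30002}} // from GCE and multi cluster ingresses
	_ = checkpoint([]string{"node-a"}, gcePorts, allPorts, ServicePort{Port: 32000})
}
```

In the controller change below, gceNodePorts (from single cluster GCE ingresses) is passed as backendServicePorts and allNodePorts (GCE plus multi cluster ingresses) as namedPorts.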
@@ -143,7 +147,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
     // TODO: Manage default backend and its firewall rule in a centralized way.
     // DefaultBackend is managed in l7 pool, which doesn't understand instances,
     // which the firewall rule requires.
-    fwNodePorts := nodePorts
+    fwNodePorts := backendServicePorts
     if len(lbs) != 0 {
         // If there are no Ingresses, we shouldn't be allowing traffic to the
         // default backend. Equally importantly if the cluster gets torn down
@@ -166,6 +170,11 @@ func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePor
     var igs []*compute.InstanceGroup
     var err error
     for _, p := range servicePorts {
+        // CreateInstanceGroups always returns all the instance groups, so we can return
+        // the output of any call, no need to append the return from all calls.
+        // TODO: Ideally, we want to call CreateInstaceGroups only the first time and
+        // then call AddNamedPort multiple times. Need to update the interface to
+        // achieve this.
         igs, _, err = instances.CreateInstanceGroups(c.instancePool, c.ClusterNamer, p.Port)
         if err != nil {
             return nil, err
@@ -36,6 +36,7 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
 
+    "k8s.io/ingress/controllers/gce/backends"
     "k8s.io/ingress/controllers/gce/loadbalancers"
 )
 
@@ -275,13 +276,19 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
     }
     glog.V(3).Infof("Syncing %v", key)
 
-    ingresses, err := lbc.ingLister.List()
+    allIngresses, err := lbc.ingLister.ListAll()
     if err != nil {
         return err
     }
-    nodePorts := lbc.tr.toNodePorts(&ingresses)
+    gceIngresses, err := lbc.ingLister.ListGCEIngresses()
+    if err != nil {
+        return err
+    }
+
+    allNodePorts := lbc.tr.toNodePorts(&allIngresses)
+    gceNodePorts := lbc.tr.toNodePorts(&gceIngresses)
     lbNames := lbc.ingLister.Store.ListKeys()
-    lbs, err := lbc.ListRuntimeInfo()
+    lbs, err := lbc.ListGCERuntimeInfo()
     if err != nil {
         return err
     }
@@ -307,22 +314,15 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
 
     var syncError error
     defer func() {
-        if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil {
+        if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil {
             err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr)
         }
         glog.V(3).Infof("Finished syncing %v", key)
     }()
 
-    if ingExists {
-        ing := obj.(*extensions.Ingress)
-        if isGCEMultiClusterIngress(ing) {
-            return lbc.syncMultiClusterIngress(ing, nodeNames)
-        }
-    }
-
     // Record any errors during sync and throw a single error at the end. This
     // allows us to free up associated cloud resources ASAP.
-    if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
+    if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts); err != nil {
         // TODO: Implement proper backoff for the queue.
         eventMsg := "GCE"
         if ingExists {
@@ -336,6 +336,29 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
     if !ingExists {
         return syncError
     }
+    ing := obj.(*extensions.Ingress)
+    if isGCEMultiClusterIngress(ing) {
+        // Add instance group names as annotation on the ingress.
+        if ing.Annotations == nil {
+            ing.Annotations = map[string]string{}
+        }
+        // Since we just created instance groups in Checkpoint, calling create
+        // instance groups again should just return names of the existing
+        // instance groups. It does not matter which nodePort we pass as argument.
+        igs, err := lbc.CloudClusterManager.CreateInstanceGroups([]backends.ServicePort{allNodePorts[0]})
+        if err != nil {
+            return fmt.Errorf("error in creating instance groups: %v", err)
+        }
+        err = setInstanceGroupsAnnotation(ing.Annotations, igs)
+        if err != nil {
+            return err
+        }
+        if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil {
+            return err
+        }
+        return nil
+    }
+
     // Update the UrlMap of the single loadbalancer that came through the watch.
     l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
     if err != nil {
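setInstanceGroupsAnnotation itself only appears as context in the utils.go hunk below (it marshals the instance groups to JSON under instanceGroupsAnnotationKey). As a rough, self-contained sketch of that pattern, here is what publishing instance groups as a JSON-valued ingress annotation can look like; the annotation key string and the Name/Zone fields are assumptions for illustration, not taken from this commit.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// instanceGroup is a stand-in for the fields a multi cluster controller would need;
// the real code marshals data from *compute.InstanceGroup.
type instanceGroup struct {
	Name string
	Zone string
}

// setInstanceGroupsAnnotation sketches the pattern in the diff: marshal the instance
// groups to JSON and store the result under a well-known annotation key. The key
// below is illustrative only; the real one is defined in the controller's utils.go.
func setInstanceGroupsAnnotation(existing map[string]string, igs []instanceGroup) error {
	jsonValue, err := json.Marshal(igs)
	if err != nil {
		return err
	}
	existing["ingress.example.com/instance-groups"] = string(jsonValue)
	return nil
}

func main() {
	annotations := map[string]string{}
	igs := []instanceGroup{{Name: "k8s-ig--cluster1", Zone: "us-central1-a"}}
	if err := setInstanceGroupsAnnotation(annotations, igs); err != nil {
		panic(err)
	}
	fmt.Println(annotations)
}
```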
@@ -343,52 +366,18 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
         return syncError
     }
 
-    ing := *obj.(*extensions.Ingress)
-    if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
+    if urlMap, err := lbc.tr.toURLMap(ing); err != nil {
         syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
     } else if err := l7.UpdateUrlMap(urlMap); err != nil {
-        lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
+        lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
         syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
-    } else if err := lbc.updateIngressStatus(l7, ing); err != nil {
-        lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
+    } else if err := lbc.updateIngressStatus(l7, *ing); err != nil {
+        lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
         syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
     }
     return syncError
 }
 
-func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error {
-    // For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups.
-
-    // Ensure that all the required instance groups exist with the required node ports.
-    nodePorts := lbc.tr.ingressToNodePorts(ing)
-    // Add the default backend node port.
-    nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort)
-    igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts)
-    if err != nil {
-        return err
-    }
-
-    // Ensure that instance groups have the right nodes.
-    // This is also done whenever a node is added or removed from the cluster.
-    // We need it here as well since instance group is not created until first ingress is observed.
-    if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil {
-        return err
-    }
-
-    // Add instance group names as annotation on the ingress.
-    if ing.Annotations == nil {
-        ing.Annotations = map[string]string{}
-    }
-    err = setInstanceGroupsAnnotation(ing.Annotations, igs)
-    if err != nil {
-        return err
-    }
-    if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil {
-        return err
-    }
-    return nil
-}
-
 // updateIngressStatus updates the IP and annotations of a loadbalancer.
 // The annotations are parsed by kubectl describe.
 func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
@@ -443,9 +432,10 @@ func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, ann
     return nil
 }
 
-// ListRuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module.
-func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) {
-    ingList, err := lbc.ingLister.List()
+// ListGCERuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module.
+// It returns runtime info only for gce ingresses and not for multi cluster ingresses.
+func (lbc *LoadBalancerController) ListGCERuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) {
+    ingList, err := lbc.ingLister.ListGCEIngresses()
     if err != nil {
         return lbs, err
     }
@@ -300,8 +300,19 @@ func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendF
     return nil
 }
 
-// List lists all Ingress' in the store.
-func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) {
+// List lists all Ingress' in the store (both single and multi cluster ingresses).
+func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) {
+    for _, m := range s.Store.List() {
+        newIng := m.(*extensions.Ingress)
+        if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) {
+            ing.Items = append(ing.Items, *newIng)
+        }
+    }
+    return ing, nil
+}
+
+// ListGCEIngresses lists all GCE Ingress' in the store.
+func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) {
     for _, m := range s.Store.List() {
         newIng := m.(*extensions.Ingress)
         if isGCEIngress(newIng) {
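The isGCEIngress and isGCEMultiClusterIngress predicates used by these listers are not part of this diff. Below is a minimal sketch of the kind of ingress-class check they presumably perform; the annotation key and class values are assumptions based on the conventional kubernetes.io/ingress.class annotation, not taken from this commit.

```go
package main

import "fmt"

// The key and class values below are assumptions for illustration; the real
// predicates operate on *extensions.Ingress elsewhere in the controller.
const ingressClassKey = "kubernetes.io/ingress.class"

func isGCEIngress(annotations map[string]string) bool {
	class := annotations[ingressClassKey]
	return class == "" || class == "gce"
}

func isGCEMultiClusterIngress(annotations map[string]string) bool {
	return annotations[ingressClassKey] == "gce-multi-cluster"
}

func main() {
	single := map[string]string{ingressClassKey: "gce"}
	multi := map[string]string{ingressClassKey: "gce-multi-cluster"}
	// ListAll would keep both kinds; ListGCEIngresses keeps only the single cluster one.
	fmt.Println(isGCEIngress(single) || isGCEMultiClusterIngress(single)) // true
	fmt.Println(isGCEIngress(multi))                                      // false
}
```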
@@ -680,3 +691,16 @@ func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.Inst
     existing[instanceGroupsAnnotationKey] = string(jsonValue)
     return nil
 }
+
+// uniq returns an array of unique service ports from the given array.
+func uniq(nodePorts []backends.ServicePort) []backends.ServicePort {
+    portMap := map[int64]backends.ServicePort{}
+    for _, p := range nodePorts {
+        portMap[p.Port] = p
+    }
+    nodePorts = []backends.ServicePort{}
+    for _, sp := range portMap {
+        nodePorts = append(nodePorts, sp)
+    }
+    return nodePorts
+}
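A short, self-contained usage sketch of the dedup behaviour the new uniq helper provides; ServicePort is reduced to its port number here for brevity.

```go
package main

import "fmt"

// ServicePort stands in for backends.ServicePort in this sketch.
type ServicePort struct{ Port int64 }

// uniq keeps one ServicePort per port number, mirroring the helper added above.
func uniq(nodePorts []ServicePort) []ServicePort {
	portMap := map[int64]ServicePort{}
	for _, p := range nodePorts {
		portMap[p.Port] = p
	}
	nodePorts = []ServicePort{}
	for _, sp := range portMap {
		nodePorts = append(nodePorts, sp)
	}
	return nodePorts
}

func main() {
	// Two ingress paths that point at the same service share a nodePort; uniq
	// collapses them so each port is only validated once.
	ports := []ServicePort{{Port: 30001}, {Port: 30002}, {Port: 30001}}
	fmt.Println(len(uniq(ports))) // 2
}
```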
@@ -300,8 +300,8 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
         if err != nil {
             t.Fatalf("Unexpected error: %v", err)
         }
-        if annotations[instanceGroupsKey] != c.ExpectedAnnotation {
-            t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], c.ExpectedAnnotation)
+        if annotations[instanceGroupsAnnotationKey] != c.ExpectedAnnotation {
+            t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsAnnotationKey], c.ExpectedAnnotation)
         }
     }
 }