Adding logic to GCE ingress controller to handle multi cluster ingresses
This commit is contained in:
parent
7434c50cdb
commit
7d87f02b1f
6 changed files with 195 additions and 31 deletions
|
@ -217,7 +217,7 @@ func (b *Backends) Add(p ServicePort) error {
|
|||
be := &compute.BackendService{}
|
||||
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()
|
||||
|
||||
igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
|
||||
igs, namedPort, err := instances.CreateInstanceGroups(b.nodePool, b.namer, p.Port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/backends"
|
||||
|
@ -132,7 +133,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
|
|||
if err := c.backendPool.Sync(nodePorts); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.instancePool.Sync(nodeNames); err != nil {
|
||||
if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.l7Pool.Sync(lbs); err != nil {
|
||||
|
@ -161,6 +162,25 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
|
||||
var igs []*compute.InstanceGroup
|
||||
var err error
|
||||
for _, p := range servicePorts {
|
||||
igs, _, err = instances.CreateInstanceGroups(c.instancePool, c.ClusterNamer, p.Port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return igs, nil
|
||||
}
|
||||
|
||||
func (c *ClusterManager) SyncNodesInInstanceGroups(nodeNames []string) error {
|
||||
if err := c.instancePool.Sync(nodeNames); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GC garbage collects unused resources.
|
||||
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
|
||||
// this list are removed from the cloud.
|
||||
|
|
|
@ -149,7 +149,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC
|
|||
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
addIng := obj.(*extensions.Ingress)
|
||||
if !isGCEIngress(addIng) {
|
||||
if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) {
|
||||
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
|
||||
return
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC
|
|||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
delIng := obj.(*extensions.Ingress)
|
||||
if !isGCEIngress(delIng) {
|
||||
if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) {
|
||||
glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, ingressClassKey)
|
||||
return
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC
|
|||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
curIng := cur.(*extensions.Ingress)
|
||||
if !isGCEIngress(curIng) {
|
||||
if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) {
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(old, cur) {
|
||||
|
@ -313,6 +313,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
glog.V(3).Infof("Finished syncing %v", key)
|
||||
}()
|
||||
|
||||
if ingExists {
|
||||
ing := obj.(*extensions.Ingress)
|
||||
if isGCEMultiClusterIngress(ing) {
|
||||
return lbc.syncMultiClusterIngress(ing, nodeNames)
|
||||
}
|
||||
}
|
||||
|
||||
// Record any errors during sync and throw a single error at the end. This
|
||||
// allows us to free up associated cloud resources ASAP.
|
||||
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
|
||||
|
@ -349,6 +356,39 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
return syncError
|
||||
}
|
||||
|
||||
func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error {
|
||||
// For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups.
|
||||
|
||||
// Ensure that all the required instance groups exist with the required node ports.
|
||||
nodePorts := lbc.tr.ingressToNodePorts(ing)
|
||||
// Add the default backend node port.
|
||||
nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort)
|
||||
igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure that instance groups have the right nodes.
|
||||
// This is also done whenever a node is added or removed from the cluster.
|
||||
// We need it here as well since instance group is not created until first ingress is observed.
|
||||
if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add instance group names as annotation on the ingress.
|
||||
if ing.Annotations == nil {
|
||||
ing.Annotations = map[string]string{}
|
||||
}
|
||||
err = setInstanceGroupsAnnotation(ing.Annotations, igs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateIngressStatus updates the IP and annotations of a loadbalancer.
|
||||
// The annotations are parsed by kubectl describe.
|
||||
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
|
||||
|
@ -379,14 +419,23 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
|
|||
lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
|
||||
}
|
||||
}
|
||||
annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
|
||||
if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, annotations map[string]string) error {
|
||||
// Update annotations through /update endpoint
|
||||
currIng, err = ingClient.Get(ing.Name, metav1.GetOptions{})
|
||||
ingClient := lbc.client.Extensions().Ingresses(namespace)
|
||||
currIng, err := ingClient.Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
|
||||
if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) {
|
||||
glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name)
|
||||
if !reflect.DeepEqual(currIng.Annotations, annotations) {
|
||||
glog.V(3).Infof("Updating annotations of %v/%v", namespace, name)
|
||||
currIng.Annotations = annotations
|
||||
if _, err := ingClient.Update(currIng); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
|
||||
api_v1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
|
@ -77,10 +78,17 @@ const (
|
|||
// to either gceIngessClass or the empty string.
|
||||
ingressClassKey = "kubernetes.io/ingress.class"
|
||||
gceIngressClass = "gce"
|
||||
gceMultiIngressClass = "gce-multi-cluster"
|
||||
|
||||
// Label key to denote which GCE zone a Kubernetes node is in.
|
||||
zoneKey = "failure-domain.beta.kubernetes.io/zone"
|
||||
defaultZone = ""
|
||||
|
||||
// instanceGroupsAnnotationKey is the annotation key used by controller to
|
||||
// specify the name and zone of instance groups created for the ingress.
|
||||
// This is read only for users. Controller will overrite any user updates.
|
||||
// This is only set for ingresses with ingressClass = "gce-multi-cluster"
|
||||
instanceGroupsAnnotationKey = "ingress.gcp.kubernetes.io/instance-groups"
|
||||
)
|
||||
|
||||
// ingAnnotations represents Ingress annotations.
|
||||
|
@ -156,6 +164,13 @@ func isGCEIngress(ing *extensions.Ingress) bool {
|
|||
return class == "" || class == gceIngressClass
|
||||
}
|
||||
|
||||
// isGCEMultiClusterIngress returns true if the given Ingress has
|
||||
// ingress.class annotation set to "gce-multi-cluster".
|
||||
func isGCEMultiClusterIngress(ing *extensions.Ingress) bool {
|
||||
class := ingAnnotations(ing.ObjectMeta.Annotations).ingressClass()
|
||||
return class == gceMultiIngressClass
|
||||
}
|
||||
|
||||
// errorNodePortNotFound is an implementation of error.
|
||||
type errorNodePortNotFound struct {
|
||||
backend extensions.IngressBackend
|
||||
|
@ -471,10 +486,18 @@ PortLoop:
|
|||
return p, nil
|
||||
}
|
||||
|
||||
// toNodePorts converts a pathlist to a flat list of nodeports.
|
||||
// toNodePorts is a helper method over ingressToNodePorts to process a list of ingresses.
|
||||
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort {
|
||||
var knownPorts []backends.ServicePort
|
||||
for _, ing := range ings.Items {
|
||||
knownPorts = append(knownPorts, t.ingressToNodePorts(&ing)...)
|
||||
}
|
||||
return knownPorts
|
||||
}
|
||||
|
||||
// ingressToNodePorts converts a pathlist to a flat list of nodeports for the given ingress.
|
||||
func (t *GCETranslator) ingressToNodePorts(ing *extensions.Ingress) []backends.ServicePort {
|
||||
var knownPorts []backends.ServicePort
|
||||
defaultBackend := ing.Spec.Backend
|
||||
if defaultBackend != nil {
|
||||
port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace)
|
||||
|
@ -487,18 +510,17 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.Ser
|
|||
for _, rule := range ing.Spec.Rules {
|
||||
if rule.HTTP == nil {
|
||||
glog.Errorf("ignoring non http Ingress rule")
|
||||
continue
|
||||
return knownPorts
|
||||
}
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
port, err := t.getServiceNodePort(path.Backend, ing.Namespace)
|
||||
if err != nil {
|
||||
glog.Infof("%v", err)
|
||||
continue
|
||||
return knownPorts
|
||||
}
|
||||
knownPorts = append(knownPorts, port)
|
||||
}
|
||||
}
|
||||
}
|
||||
return knownPorts
|
||||
}
|
||||
|
||||
|
@ -640,3 +662,21 @@ func (o PodsByCreationTimestamp) Less(i, j int) bool {
|
|||
}
|
||||
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
|
||||
}
|
||||
|
||||
// setInstanceGroupsAnnotation sets the instance-groups annotation with names of the given instance groups.
|
||||
func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) error {
|
||||
type Value struct {
|
||||
Name string
|
||||
Zone string
|
||||
}
|
||||
instanceGroups := []Value{}
|
||||
for _, ig := range igs {
|
||||
instanceGroups = append(instanceGroups, Value{Name: ig.Name, Zone: ig.Zone})
|
||||
}
|
||||
jsonValue, err := json.Marshal(instanceGroups)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
existing[instanceGroupsAnnotationKey] = string(jsonValue)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
|
||||
api_v1 "k8s.io/api/core/v1"
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
|
@ -263,3 +265,43 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
|
|||
func getProbePath(p *api_v1.Probe) string {
|
||||
return p.Handler.HTTPGet.Path
|
||||
}
|
||||
|
||||
func TestAddInstanceGroupsAnnotation(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Igs []*compute.InstanceGroup
|
||||
ExpectedAnnotation string
|
||||
}{
|
||||
{
|
||||
// Single zone.
|
||||
[]*compute.InstanceGroup{&compute.InstanceGroup{
|
||||
Name: "ig-name",
|
||||
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
|
||||
}},
|
||||
`[{"Name":"ig-name","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"}]`,
|
||||
},
|
||||
{
|
||||
// Multiple zones.
|
||||
[]*compute.InstanceGroup{
|
||||
&compute.InstanceGroup{
|
||||
Name: "ig-name-1",
|
||||
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
|
||||
},
|
||||
&compute.InstanceGroup{
|
||||
Name: "ig-name-2",
|
||||
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a",
|
||||
},
|
||||
},
|
||||
`[{"Name":"ig-name-1","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"},{"Name":"ig-name-2","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a"}]`,
|
||||
},
|
||||
}
|
||||
for _, c := range testCases {
|
||||
annotations := map[string]string{}
|
||||
err := setInstanceGroupsAnnotation(annotations, c.Igs)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if annotations[instanceGroupsKey] != c.ExpectedAnnotation {
|
||||
t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], c.ExpectedAnnotation)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
13
controllers/gce/instances/utils.go
Normal file
13
controllers/gce/instances/utils.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package instances
|
||||
|
||||
import (
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
)
|
||||
|
||||
// Helper method to create instance groups.
|
||||
// This method exists to ensure that we are using the same logic at all places.
|
||||
func CreateInstanceGroups(nodePool NodePool, namer *utils.Namer, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) {
|
||||
return nodePool.AddInstanceGroup(namer.IGName(), port)
|
||||
}
|
Loading…
Reference in a new issue