diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go
index b307f8e33..a697748bd 100644
--- a/controllers/gce/backends/backends.go
+++ b/controllers/gce/backends/backends.go
@@ -107,7 +107,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
 	return be, nil
 }
 
-func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
+func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
 	// Create a new health check
 	if err := b.healthChecker.Add(namedPort.Port, ""); err != nil {
 		return nil, err
@@ -120,11 +120,7 @@ func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPor
 	backend := &compute.BackendService{
 		Name:     name,
 		Protocol: "HTTP",
-		Backends: []*compute.Backend{
-			{
-				Group: ig.SelfLink,
-			},
-		},
+		Backends: getBackendsForIGs(igs),
 		// Api expects one, means little to kubernetes.
 		HealthChecks: []string{hc.SelfLink},
 		Port:         namedPort.Port,
@@ -143,20 +139,20 @@ func (b *Backends) Add(port int64) error {
 	be := &compute.BackendService{}
 	defer func() { b.snapshotter.Add(portKey(port), be) }()
 
-	ig, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
+	igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
 	if err != nil {
 		return err
 	}
 	be, _ = b.Get(port)
 	if be == nil {
-		glog.Infof("Creating backend for instance group %v port %v named port %v",
-			ig.Name, port, namedPort)
-		be, err = b.create(ig, namedPort, b.namer.BeName(port))
+		glog.Infof("Creating backend for %d instance groups, port %v named port %v",
+			len(igs), port, namedPort)
+		be, err = b.create(igs, namedPort, b.namer.BeName(port))
 		if err != nil {
 			return err
 		}
 	}
-	if err := b.edgeHop(be, ig); err != nil {
+	if err := b.edgeHop(be, igs); err != nil {
 		return err
 	}
 	return err
@@ -201,18 +197,31 @@ func (b *Backends) List() ([]interface{}, error) {
 	return interList, nil
 }
 
+func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
+	backends := []*compute.Backend{}
+	for _, ig := range igs {
+		backends = append(backends, &compute.Backend{Group: ig.SelfLink})
+	}
+	return backends
+}
+
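For illustration, a minimal standalone sketch (not part of the diff) of what the new getBackendsForIGs helper yields for a hypothetical two-zone cluster: the backend service ends up with one backend per zonal instance group. The project and zone names in the SelfLinks are invented.

    package main

    import (
        "fmt"

        compute "google.golang.org/api/compute/v1"
    )

    // getBackendsForIGs mirrors the helper added above: one compute.Backend
    // per zonal instance group.
    func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
        backends := []*compute.Backend{}
        for _, ig := range igs {
            backends = append(backends, &compute.Backend{Group: ig.SelfLink})
        }
        return backends
    }

    func main() {
        // Hypothetical SelfLinks; real ones come from the GCE API.
        igs := []*compute.InstanceGroup{
            {SelfLink: "https://www.googleapis.com/compute/v1/projects/p/zones/us-central1-a/instanceGroups/k8s-ig"},
            {SelfLink: "https://www.googleapis.com/compute/v1/projects/p/zones/us-central1-b/instanceGroups/k8s-ig"},
        }
        for _, b := range getBackendsForIGs(igs) {
            fmt.Println(b.Group)
        }
    }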
 // edgeHop checks the links of the given backend by executing an edge hop.
 // It fixes broken links.
-func (b *Backends) edgeHop(be *compute.BackendService, ig *compute.InstanceGroup) error {
-	if len(be.Backends) == 1 &&
-		utils.CompareLinks(be.Backends[0].Group, ig.SelfLink) {
+func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
+	beIGs := sets.String{}
+	for _, backend := range be.Backends {
+		beIGs.Insert(backend.Group)
+	}
+	igLinks := sets.String{}
+	for _, ig := range igs {
+		igLinks.Insert(ig.SelfLink)
+	}
+	if igLinks.Equal(beIGs) {
 		return nil
 	}
-	glog.Infof("Backend %v has a broken edge, adding link to %v",
-		be.Name, ig.Name)
-	be.Backends = []*compute.Backend{
-		{Group: ig.SelfLink},
-	}
+	glog.Infof("Backend %v has a broken edge, expected igs %+v, current igs %+v",
+		be.Name, igLinks.List(), beIGs.List())
+	be.Backends = getBackendsForIGs(igs)
 	if err := b.cloud.UpdateBackendService(be); err != nil {
 		return err
 	}
diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go
index 860c8e20a..65f1b6a92 100644
--- a/controllers/gce/controller/cluster_manager.go
+++ b/controllers/gce/controller/cluster_manager.go
@@ -76,6 +76,13 @@ type ClusterManager struct {
 	firewallPool firewalls.SingleFirewallPool
 }
 
+// Init initializes the cluster manager with a GCETranslator, which the
+// instance pool uses to look up node zones.
+func (c *ClusterManager) Init(tr *GCETranslator) {
+	c.instancePool.Init(tr)
+	// TODO: Initialize other members as needed.
+}
+
 // IsHealthy returns an error if the cluster manager is unhealthy.
 func (c *ClusterManager) IsHealthy() (err error) {
 	// TODO: Expand on this, for now we just want to detect when the GCE client
@@ -148,7 +155,4 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
-	if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil {
-		return err
-	}
-	return nil
+	return c.firewallPool.Sync(fwNodePorts, nodeNames)
 }
diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go
index 911c71dda..fd649bc27 100644
--- a/controllers/gce/controller/fakes.go
+++ b/controllers/gce/controller/fakes.go
@@ -30,7 +30,6 @@ import (
 
 const (
 	testDefaultBeNodePort = int64(3000)
-	defaultZone           = "default-zone"
 )
 
 var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go
index 50a3bb846..8c2de8aa5 100644
--- a/controllers/gce/controller/utils.go
+++ b/controllers/gce/controller/utils.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/util/intstr"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 
@@ -37,6 +38,13 @@ const (
 	allowHTTPKey    = "kubernetes.io/ingress.allow-http"
 	staticIPNameKey = "kubernetes.io/ingress.global-static-ip-name"
+
+	// zoneKey is the label key denoting which GCE zone a Kubernetes node
+	// is in.
+	zoneKey = "failure-domain.beta.kubernetes.io/zone"
+
+	// defaultZone is returned for nodes that carry no zone label.
+	defaultZone = ""
 )
 
 // ingAnnotations represents Ingress annotations.
@@ -315,3 +323,42 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
 	}
 	return knownPorts
 }
+
+// getZone returns the zone recorded in the node's zone label, or defaultZone
+// if the label is missing.
+func getZone(n api.Node) string {
+	zone, ok := n.Labels[zoneKey]
+	if !ok {
+		return defaultZone
+	}
+	return zone
+}
+
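The zone plumbing above hinges on the label lookup in getZone. A standalone sketch of that lookup, with api.Node reduced to a bare label map for brevity; the label key is the one added to utils.go, while the node zones are hypothetical:

    package main

    import "fmt"

    const (
        zoneKey     = "failure-domain.beta.kubernetes.io/zone"
        defaultZone = ""
    )

    // getZone reads the well-known zone label, falling back to defaultZone
    // when the node is unlabeled.
    func getZone(labels map[string]string) string {
        zone, ok := labels[zoneKey]
        if !ok {
            return defaultZone
        }
        return zone
    }

    func main() {
        labeled := map[string]string{zoneKey: "us-central1-b"}
        unlabeled := map[string]string{}
        fmt.Println(getZone(labeled))                  // us-central1-b
        fmt.Println(getZone(unlabeled) == defaultZone) // true: zone unknown
    }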
+// GetZoneForNode returns the zone for a given node by looking up its zone label.
+func (t *GCETranslator) GetZoneForNode(name string) (string, error) {
+	nodes, err := t.nodeLister.NodeCondition(nodeReady).List()
+	if err != nil {
+		return "", err
+	}
+	for _, n := range nodes.Items {
+		if n.Name == name {
+			// TODO: Make this more resilient to label changes by listing
+			// cloud nodes and figuring out zone.
+			return getZone(n), nil
+		}
+	}
+	return "", fmt.Errorf("node not found %v", name)
+}
+
+// ListZones returns a list of zones this Kubernetes cluster spans.
+func (t *GCETranslator) ListZones() ([]string, error) {
+	zones := sets.String{}
+	readyNodes, err := t.nodeLister.NodeCondition(nodeReady).List()
+	if err != nil {
+		return zones.List(), err
+	}
+	for _, n := range readyNodes.Items {
+		zones.Insert(getZone(n))
+	}
+	return zones.List(), nil
+}
diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go
index 49e4d222d..54f17c33f 100644
--- a/controllers/gce/instances/instances.go
+++ b/controllers/gce/instances/instances.go
@@ -17,6 +17,7 @@ limitations under the License.
 package instances
 
 import (
+	"fmt"
 	"net/http"
 	"strings"
 
@@ -35,89 +36,165 @@ const (
 
 // Instances implements NodePool.
 type Instances struct {
-	cloud InstanceGroups
-	zone  string
+	cloud       InstanceGroups
 	snapshotter storage.Snapshotter
+	// zoneLister lists the zones the cluster's nodes span and resolves the
+	// zone of a single node. It must be seeded via Init before use.
+	zoneLister
 }
 
 // NewNodePool creates a new node pool.
 // - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
 //   members of the cloud InstanceGroup.
-func NewNodePool(cloud InstanceGroups, zone string) NodePool {
-	glog.V(3).Infof("NodePool is only aware of instances in zone %v", zone)
-	return &Instances{cloud, zone, storage.NewInMemoryPool()}
+// The defaultZone argument is currently unused; zones are discovered through
+// the zoneLister wired in by Init.
+func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool {
+	return &Instances{cloud, storage.NewInMemoryPool(), nil}
+}
+
+// Init seeds the node pool with a zoneLister so it can manage instance
+// groups across all zones the cluster spans.
+func (i *Instances) Init(zl zoneLister) {
+	i.zoneLister = zl
 }
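Because Init takes the small zoneLister interface rather than a concrete translator, the pool can be exercised without a live cluster. A hypothetical in-package fake, invented here for illustration (the type, field, and fakeCloud names are not part of the diff):

    package instances

    import "fmt"

    // fakeZoneLister maps node names to zones and satisfies zoneLister.
    type fakeZoneLister struct {
        nodeZones map[string]string // node name -> zone
    }

    // ListZones returns the deduplicated set of zones seen across nodes.
    func (f *fakeZoneLister) ListZones() ([]string, error) {
        seen := map[string]bool{}
        zones := []string{}
        for _, z := range f.nodeZones {
            if !seen[z] {
                seen[z] = true
                zones = append(zones, z)
            }
        }
        return zones, nil
    }

    // GetZoneForNode returns the recorded zone for a node, or an error.
    func (f *fakeZoneLister) GetZoneForNode(name string) (string, error) {
        zone, ok := f.nodeZones[name]
        if !ok {
            return "", fmt.Errorf("no zone recorded for node %q", name)
        }
        return zone, nil
    }

    // Usage in a test might look like:
    //   pool := NewNodePool(fakeCloud, "")
    //   pool.Init(&fakeZoneLister{nodeZones: map[string]string{"node-1": "us-central1-a"}})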
 
 // AddInstanceGroup creates or gets an instance group if it doesn't exist
-// and adds the given port to it.
-func (i *Instances) AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error) {
-	ig, _ := i.Get(name)
-	if ig == nil {
-		glog.Infof("Creating instance group %v", name)
-		var err error
-		ig, err = i.cloud.CreateInstanceGroup(name, i.zone)
-		if err != nil {
-			return nil, nil, err
-		}
-	} else {
-		glog.V(3).Infof("Instance group already exists %v", name)
-	}
-	defer i.snapshotter.Add(name, ig)
-	namedPort, err := i.cloud.AddPortToInstanceGroup(ig, port)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	return ig, namedPort, nil
+// and adds the given port to it. Returns a list of one instance group per zone,
+// all of which have the exact same named port.
+func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) {
+	igs := []*compute.InstanceGroup{}
+	namedPort := &compute.NamedPort{}
+
+	zones, err := i.ListZones()
+	if err != nil {
+		return igs, namedPort, err
+	}
+
+	for _, zone := range zones {
+		ig, _ := i.Get(name, zone)
+		var err error
+		if ig == nil {
+			glog.Infof("Creating instance group %v in zone %v", name, zone)
+			ig, err = i.cloud.CreateInstanceGroup(name, zone)
+			if err != nil {
+				return nil, nil, err
+			}
+		} else {
+			glog.V(3).Infof("Instance group %v already exists in zone %v, adding port %d to it", name, zone, port)
+		}
+		// Record the group in the snapshotter; the stored value is unused.
+		i.snapshotter.Add(name, struct{}{})
+		namedPort, err = i.cloud.AddPortToInstanceGroup(ig, port)
+		if err != nil {
+			return nil, nil, err
+		}
+		igs = append(igs, ig)
+	}
+	return igs, namedPort, nil
 }
 
-// DeleteInstanceGroup deletes the given IG by name.
+// DeleteInstanceGroup deletes the given IG by name, from all zones.
 func (i *Instances) DeleteInstanceGroup(name string) error {
 	defer i.snapshotter.Delete(name)
-	return i.cloud.DeleteInstanceGroup(name, i.zone)
+	errs := []error{}
+
+	zones, err := i.ListZones()
+	if err != nil {
+		return err
+	}
+	for _, zone := range zones {
+		glog.Infof("Deleting instance group %v in zone %v", name, zone)
+		if err := i.cloud.DeleteInstanceGroup(name, zone); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if len(errs) == 0 {
+		return nil
+	}
+	return fmt.Errorf("%v", errs)
 }
 
+// list lists all instances in all zones.
 func (i *Instances) list(name string) (sets.String, error) {
 	nodeNames := sets.NewString()
-	instances, err := i.cloud.ListInstancesInInstanceGroup(
-		name, i.zone, allInstances)
+	zones, err := i.ListZones()
 	if err != nil {
 		return nodeNames, err
 	}
-	for _, ins := range instances.Items {
-		// TODO: If round trips weren't so slow one would be inclided
-		// to GetInstance using this url and get the name.
-		parts := strings.Split(ins.Instance, "/")
-		nodeNames.Insert(parts[len(parts)-1])
+
+	for _, zone := range zones {
+		instances, err := i.cloud.ListInstancesInInstanceGroup(
+			name, zone, allInstances)
+		if err != nil {
+			return nodeNames, err
+		}
+		for _, ins := range instances.Items {
+			// TODO: If round trips weren't so slow one would be inclined
+			// to GetInstance using this url and get the name.
+			parts := strings.Split(ins.Instance, "/")
+			nodeNames.Insert(parts[len(parts)-1])
+		}
 	}
 	return nodeNames, nil
 }
 
-// Get returns the Instance Group by name.
-func (i *Instances) Get(name string) (*compute.InstanceGroup, error) {
-	ig, err := i.cloud.GetInstanceGroup(name, i.zone)
+// Get returns the Instance Group by name, in the given zone.
+func (i *Instances) Get(name, zone string) (*compute.InstanceGroup, error) {
+	ig, err := i.cloud.GetInstanceGroup(name, zone)
 	if err != nil {
 		return nil, err
 	}
-	i.snapshotter.Add(name, ig)
+	i.snapshotter.Add(name, struct{}{})
 	return ig, nil
 }
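As the TODO in list() notes, node names are recovered from instance URLs by taking the last path segment rather than paying for a GetInstance round trip per instance. A standalone sketch with a hypothetical URL:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Hypothetical instance URL of the form returned in ins.Instance.
        ins := "https://www.googleapis.com/compute/v1/projects/p/zones/us-central1-a/instances/node-abc123"
        parts := strings.Split(ins, "/")
        fmt.Println(parts[len(parts)-1]) // node-abc123
    }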
 
-// Add adds the given instances to the Instance Group.
-func (i *Instances) Add(groupName string, names []string) error {
-	glog.V(3).Infof("Adding nodes %v to %v", names, groupName)
-	return i.cloud.AddInstancesToInstanceGroup(groupName, i.zone, names)
+// splitNodesByZone groups the given node names by the zone each node is in.
+func (i *Instances) splitNodesByZone(names []string) map[string][]string {
+	nodesByZone := map[string][]string{}
+	for _, name := range names {
+		zone, err := i.GetZoneForNode(name)
+		if err != nil {
+			glog.Errorf("Failed to get zone for %v: %v, skipping", name, err)
+			continue
+		}
+		nodesByZone[zone] = append(nodesByZone[zone], name)
+	}
+	return nodesByZone
 }
 
-// Remove removes the given instances from the Instance Group.
+// Add adds the given instances to the appropriately zoned Instance Group.
+func (i *Instances) Add(groupName string, names []string) error {
+	errs := []error{}
+	for zone, nodeNames := range i.splitNodesByZone(names) {
+		glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
+		if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, nodeNames); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if len(errs) == 0 {
+		return nil
+	}
+	return fmt.Errorf("%v", errs)
+}
+
+// Remove removes the given instances from the appropriately zoned Instance Group.
 func (i *Instances) Remove(groupName string, names []string) error {
-	glog.V(3).Infof("Removing nodes %v from %v", names, groupName)
-	return i.cloud.RemoveInstancesFromInstanceGroup(groupName, i.zone, names)
+	errs := []error{}
+	for zone, nodeNames := range i.splitNodesByZone(names) {
+		glog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
+		if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, nodeNames); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if len(errs) == 0 {
+		return nil
+	}
+	return fmt.Errorf("%v", errs)
+}
 
 // Sync syncs kubernetes instances with the instances in the instance group.
 func (i *Instances) Sync(nodes []string) (err error) {
-	glog.V(3).Infof("Syncing nodes %v", nodes)
+	glog.V(1).Infof("Syncing nodes %v", nodes)
 
 	defer func() {
 		// The node pool is only responsible for syncing nodes to instance
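Add and Remove both lean on splitNodesByZone so that each cloud call stays zone-local. A standalone sketch of the grouping, with GetZoneForNode replaced by a plain map lookup; the node and zone names are hypothetical:

    package main

    import "fmt"

    func main() {
        zoneOf := map[string]string{
            "node-a": "us-central1-a",
            "node-b": "us-central1-b",
            "node-c": "us-central1-a",
        }
        nodesByZone := map[string][]string{}
        for _, name := range []string{"node-a", "node-b", "node-c"} {
            zone := zoneOf[name]
            // append on a nil slice allocates, so no explicit init is needed.
            nodesByZone[zone] = append(nodesByZone[zone], name)
        }
        fmt.Println(nodesByZone) // map[us-central1-a:[node-a node-c] us-central1-b:[node-b]]
    }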
diff --git a/controllers/gce/instances/interfaces.go b/controllers/gce/instances/interfaces.go
index 2bb2bf3b5..f7d03ed57 100644
--- a/controllers/gce/instances/interfaces.go
+++ b/controllers/gce/instances/interfaces.go
@@ -20,17 +20,26 @@ import (
 	compute "google.golang.org/api/compute/v1"
 )
 
+// zoneLister manages lookups for GCE instance groups/instances to zones.
+type zoneLister interface {
+	ListZones() ([]string, error)
+	GetZoneForNode(name string) (string, error)
+}
+
 // NodePool is an interface to manage a pool of kubernetes nodes synced with vm instances in the cloud
-// through the InstanceGroups interface.
+// through the InstanceGroups interface. It handles zones opaquely using the zoneLister.
 type NodePool interface {
-	AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error)
+	Init(zl zoneLister)
+
+	// The following 2 methods operate on instance groups.
+	AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error)
 	DeleteInstanceGroup(name string) error
 
 	// TODO: Refactor for modularity
 	Add(groupName string, nodeNames []string) error
 	Remove(groupName string, nodeNames []string) error
 	Sync(nodeNames []string) error
-	Get(name string) (*compute.InstanceGroup, error)
+	Get(name, zone string) (*compute.InstanceGroup, error)
 }
 
 // InstanceGroups is an interface for managing gce instances groups, and the instances therein.
diff --git a/controllers/gce/main.go b/controllers/gce/main.go
index f4199147f..cbdb8b837 100644
--- a/controllers/gce/main.go
+++ b/controllers/gce/main.go
@@ -220,6 +220,7 @@ func main() {
 	if clusterManager.ClusterNamer.ClusterName != "" {
 		glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.ClusterName)
 	}
+	clusterManager.Init(&controller.GCETranslator{lbc})
 	go registerHandlers(lbc)
 	go handleSigterm(lbc, *deleteAllOnQuit)
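The main.go change is the second half of a two-phase construction: the cluster manager is built first, and Init injects the GCETranslator once the load balancer controller (and with it the node lister) exists. A minimal sketch of the pattern, with hypothetical stand-in types for the real pool and translator:

    package main

    import "fmt"

    type zoneLister interface {
        ListZones() ([]string, error)
    }

    // pool is constructed before any zoneLister exists, mirroring NewNodePool.
    type pool struct {
        zl zoneLister
    }

    // Init wires in the lister after construction, mirroring NodePool.Init.
    func (p *pool) Init(zl zoneLister) { p.zl = zl }

    // translator stands in for GCETranslator.
    type translator struct{}

    func (t *translator) ListZones() ([]string, error) {
        return []string{"us-central1-a", "us-central1-b"}, nil
    }

    func main() {
        p := &pool{}          // phase 1: construct without the lister
        p.Init(&translator{}) // phase 2: inject once it exists
        zones, _ := p.zl.ListZones()
        fmt.Println(zones)
    }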