Teach L7 controller about zones

Prashanth Balasubramanian 2016-05-21 17:46:09 -07:00
parent 74b66beda9
commit 32ac61e7e3
7 changed files with 202 additions and 64 deletions


@@ -107,7 +107,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
return be, nil
}
func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
func (b *Backends) create(igs []*compute.InstanceGroup, namedPort *compute.NamedPort, name string) (*compute.BackendService, error) {
// Create a new health check
if err := b.healthChecker.Add(namedPort.Port, ""); err != nil {
return nil, err
@@ -120,11 +120,7 @@ func (b *Backends) create(ig *compute.InstanceGroup, namedPort *compute.NamedPor
backend := &compute.BackendService{
Name: name,
Protocol: "HTTP",
Backends: []*compute.Backend{
{
Group: ig.SelfLink,
},
},
Backends: getBackendsForIGs(igs),
// The API expects one health check; it means little to Kubernetes.
HealthChecks: []string{hc.SelfLink},
Port: namedPort.Port,
@@ -143,20 +139,20 @@ func (b *Backends) Add(port int64) error {
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(port), be) }()
ig, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), port)
if err != nil {
return err
}
be, _ = b.Get(port)
if be == nil {
glog.Infof("Creating backend for instance group %v port %v named port %v",
ig.Name, port, namedPort)
be, err = b.create(ig, namedPort, b.namer.BeName(port))
glog.Infof("Creating backend for %d instance groups, port %v named port %v",
len(igs), port, namedPort)
be, err = b.create(igs, namedPort, b.namer.BeName(port))
if err != nil {
return err
}
}
if err := b.edgeHop(be, ig); err != nil {
if err := b.edgeHop(be, igs); err != nil {
return err
}
return err
@@ -201,18 +197,31 @@ func (b *Backends) List() ([]interface{}, error) {
return interList, nil
}
func getBackendsForIGs(igs []*compute.InstanceGroup) []*compute.Backend {
backends := []*compute.Backend{}
for _, ig := range igs {
backends = append(backends, &compute.Backend{Group: ig.SelfLink})
}
return backends
}
// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links.
func (b *Backends) edgeHop(be *compute.BackendService, ig *compute.InstanceGroup) error {
if len(be.Backends) == 1 &&
utils.CompareLinks(be.Backends[0].Group, ig.SelfLink) {
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
beIGs := sets.String{}
for _, beToIG := range be.Backends {
beIGs.Insert(beToIG.Group)
}
igLinks := sets.String{}
for _, igToBE := range igs {
igLinks.Insert(igToBE.SelfLink)
}
if igLinks.Equal(beIGs) {
return nil
}
glog.Infof("Backend %v has a broken edge, adding link to %v",
be.Name, ig.Name)
be.Backends = []*compute.Backend{
{Group: ig.SelfLink},
}
glog.Infof("Backend %v has a broken edge, expected igs %+v, current igs %+v",
be.Name, igLinks.List(), beIGs.List())
be.Backends = getBackendsForIGs(igs)
if err := b.cloud.UpdateBackendService(be); err != nil {
return err
}
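
The rewritten edgeHop above treats the backend's group links as a set and compares it against one instance-group link per zone, rather than assuming a single group. A minimal standalone sketch of that comparison, using plain maps and made-up link strings in place of the sets.String the diff relies on:

package main

import "fmt"

// toSet collects links into a set, standing in for the sets.String
// the diff builds from be.Backends and from igs.
func toSet(links []string) map[string]struct{} {
	s := make(map[string]struct{}, len(links))
	for _, l := range links {
		s[l] = struct{}{}
	}
	return s
}

// setsEqual reports whether two link sets match; edgeHop returns early
// and skips the backend update when they do.
func setsEqual(a, b map[string]struct{}) bool {
	if len(a) != len(b) {
		return false
	}
	for l := range a {
		if _, ok := b[l]; !ok {
			return false
		}
	}
	return true
}

func main() {
	current := []string{"zones/us-central1-a/instanceGroups/ig"}
	expected := []string{
		"zones/us-central1-a/instanceGroups/ig",
		"zones/us-central1-b/instanceGroups/ig",
	}
	if !setsEqual(toSet(current), toSet(expected)) {
		fmt.Println("broken edge: relink backend to one group per zone")
	}
}

When the sets diverge, the diff rebuilds be.Backends via getBackendsForIGs and calls UpdateBackendService; when they match, the edge hop is a no-op.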


@@ -76,6 +76,11 @@ type ClusterManager struct {
firewallPool firewalls.SingleFirewallPool
}
func (c *ClusterManager) Init(tr *GCETranslator) {
c.instancePool.Init(tr)
// TODO: Initialize other members as needed.
}
// IsHealthy returns an error if the cluster manager is unhealthy.
func (c *ClusterManager) IsHealthy() (err error) {
// TODO: Expand on this, for now we just want to detect when the GCE client
@@ -148,7 +153,6 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil {
return err
}
return nil
}
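
The new Init hook exists because the instance pool needs a zone lister that is only available after construction: the GCETranslator wraps the controller, which is built after the ClusterManager. A rough sketch of this two-phase wiring, with hypothetical types standing in for the real ones:

package main

import "fmt"

// zoneLister is the zone dependency injected after construction.
type zoneLister interface {
	ListZones() ([]string, error)
}

type nodePool struct{ zl zoneLister }

func (p *nodePool) Init(zl zoneLister) { p.zl = zl }

type clusterManager struct{ pool *nodePool }

// Init forwards the lister to each member that needs zone awareness.
func (c *clusterManager) Init(zl zoneLister) { c.pool.Init(zl) }

// staticZones stands in for the GCETranslator in this sketch.
type staticZones []string

func (s staticZones) ListZones() ([]string, error) { return s, nil }

func main() {
	cm := &clusterManager{pool: &nodePool{}}
	// Phase two: wire in the lister once it exists.
	cm.Init(staticZones{"us-central1-a", "us-central1-b"})
	zones, _ := cm.pool.zl.ListZones()
	fmt.Println(zones) // [us-central1-a us-central1-b]
}

The main.go change at the end of this commit performs the second phase with clusterManager.Init(&controller.GCETranslator{lbc}).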


@@ -30,7 +30,6 @@ import (
const (
testDefaultBeNodePort = int64(3000)
defaultZone = "default-zone"
)
var testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}


@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
@@ -37,6 +38,10 @@ import (
const (
allowHTTPKey = "kubernetes.io/ingress.allow-http"
staticIPNameKey = "kubernetes.io/ingress.global-static-ip-name"
// Label key to denote which GCE zone a Kubernetes node is in.
zoneKey = "failure-domain.beta.kubernetes.io/zone"
defaultZone = ""
)
// ingAnnotations represents Ingress annotations.
@@ -315,3 +320,40 @@ func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
}
return knownPorts
}
func getZone(n api.Node) string {
zone, ok := n.Labels[zoneKey]
if !ok {
return defaultZone
}
return zone
}
// GetZoneForNode returns the zone for a given node by looking up its zone label.
func (t *GCETranslator) GetZoneForNode(name string) (string, error) {
nodes, err := t.nodeLister.NodeCondition(nodeReady).List()
if err != nil {
return "", err
}
for _, n := range nodes.Items {
if n.Name == name {
// TODO: Make this more resilient to label changes by listing
// cloud nodes and figuring out zone.
return getZone(n), nil
}
}
return "", fmt.Errorf("Node not found %v", name)
}
// ListZones returns a list of zones this Kubernetes cluster spans.
func (t *GCETranslator) ListZones() ([]string, error) {
zones := sets.String{}
readyNodes, err := t.nodeLister.NodeCondition(nodeReady).List()
if err != nil {
return zones.List(), err
}
for _, n := range readyNodes.Items {
zones.Insert(getZone(n))
}
return zones.List(), nil
}
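
Both helpers resolve zones purely from the failure-domain.beta.kubernetes.io/zone node label, falling back to the empty defaultZone when the label is absent. A self-contained sketch of the same lookup over plain structs (the diff lists ready nodes through nodeLister instead):

package main

import (
	"fmt"
	"sort"
)

const zoneKey = "failure-domain.beta.kubernetes.io/zone"

// node is a stand-in for api.Node in this sketch.
type node struct {
	Name   string
	Labels map[string]string
}

// getZone mirrors the diff: a missing label yields the map's zero
// value "", which doubles as defaultZone.
func getZone(n node) string {
	return n.Labels[zoneKey]
}

// listZones returns the sorted, de-duplicated zones the nodes span.
func listZones(nodes []node) []string {
	seen := map[string]bool{}
	zones := []string{}
	for _, n := range nodes {
		if z := getZone(n); !seen[z] {
			seen[z] = true
			zones = append(zones, z)
		}
	}
	sort.Strings(zones)
	return zones
}

func main() {
	nodes := []node{
		{Name: "n1", Labels: map[string]string{zoneKey: "us-central1-a"}},
		{Name: "n2", Labels: map[string]string{zoneKey: "us-central1-b"}},
		{Name: "n3", Labels: map[string]string{zoneKey: "us-central1-a"}},
	}
	fmt.Println(listZones(nodes)) // [us-central1-a us-central1-b]
}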


@@ -17,6 +17,7 @@ limitations under the License.
package instances
import (
"fmt"
"net/http"
"strings"
@@ -35,89 +36,162 @@ const (
// Instances implements NodePool.
type Instances struct {
cloud InstanceGroups
zone string
cloud InstanceGroups
// zones is a list of zones seeded by Kubernetes node zones.
// TODO: we can figure this out.
snapshotter storage.Snapshotter
zoneLister
}
// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, zone string) NodePool {
glog.V(3).Infof("NodePool is only aware of instances in zone %v", zone)
return &Instances{cloud, zone, storage.NewInMemoryPool()}
func NewNodePool(cloud InstanceGroups, defaultZone string) NodePool {
return &Instances{cloud, storage.NewInMemoryPool(), nil}
}
func (i *Instances) Init(zl zoneLister) {
i.zoneLister = zl
}
// AddInstanceGroup creates or gets an instance group if it doesn't exist
// and adds the given port to it.
func (i *Instances) AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error) {
ig, _ := i.Get(name)
if ig == nil {
glog.Infof("Creating instance group %v", name)
// and adds the given port to it. Returns a list of one instance group per zone,
// all of which have the exact same named port.
func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) {
igs := []*compute.InstanceGroup{}
namedPort := &compute.NamedPort{}
zones, err := i.ListZones()
if err != nil {
return igs, namedPort, err
}
for _, zone := range zones {
ig, _ := i.Get(name, zone)
var err error
ig, err = i.cloud.CreateInstanceGroup(name, i.zone)
if ig == nil {
glog.Infof("Creating instance group %v in zone %v", name, zone)
ig, err = i.cloud.CreateInstanceGroup(name, zone)
if err != nil {
return nil, nil, err
}
} else {
glog.V(3).Infof("Instance group %v already exists in zone %v, adding port %d to it", name, zone, port)
}
defer i.snapshotter.Add(name, struct{}{})
namedPort, err = i.cloud.AddPortToInstanceGroup(ig, port)
if err != nil {
return nil, nil, err
}
} else {
glog.V(3).Infof("Instance group already exists %v", name)
igs = append(igs, ig)
}
defer i.snapshotter.Add(name, ig)
namedPort, err := i.cloud.AddPortToInstanceGroup(ig, port)
if err != nil {
return nil, nil, err
}
return ig, namedPort, nil
return igs, namedPort, nil
}
// DeleteInstanceGroup deletes the given IG by name.
// DeleteInstanceGroup deletes the given IG by name, from all zones.
func (i *Instances) DeleteInstanceGroup(name string) error {
defer i.snapshotter.Delete(name)
return i.cloud.DeleteInstanceGroup(name, i.zone)
errs := []error{}
zones, err := i.ListZones()
if err != nil {
return err
}
for _, zone := range zones {
glog.Infof("deleting instance group %v in zone %v", name, zone)
if err := i.cloud.DeleteInstanceGroup(name, zone); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
// list lists all instances in all zones.
func (i *Instances) list(name string) (sets.String, error) {
nodeNames := sets.NewString()
instances, err := i.cloud.ListInstancesInInstanceGroup(
name, i.zone, allInstances)
zones, err := i.ListZones()
if err != nil {
return nodeNames, err
}
for _, ins := range instances.Items {
// TODO: If round trips weren't so slow one would be inclined
// to GetInstance using this url and get the name.
parts := strings.Split(ins.Instance, "/")
nodeNames.Insert(parts[len(parts)-1])
for _, zone := range zones {
instances, err := i.cloud.ListInstancesInInstanceGroup(
name, zone, allInstances)
if err != nil {
return nodeNames, err
}
for _, ins := range instances.Items {
// TODO: If round trips weren't so slow one would be inclined
// to GetInstance using this url and get the name.
parts := strings.Split(ins.Instance, "/")
nodeNames.Insert(parts[len(parts)-1])
}
}
return nodeNames, nil
}
// Get returns the Instance Group by name.
func (i *Instances) Get(name string) (*compute.InstanceGroup, error) {
ig, err := i.cloud.GetInstanceGroup(name, i.zone)
func (i *Instances) Get(name, zone string) (*compute.InstanceGroup, error) {
ig, err := i.cloud.GetInstanceGroup(name, zone)
if err != nil {
return nil, err
}
i.snapshotter.Add(name, ig)
i.snapshotter.Add(name, struct{}{})
return ig, nil
}
// Add adds the given instances to the Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
glog.V(3).Infof("Adding nodes %v to %v", names, groupName)
return i.cloud.AddInstancesToInstanceGroup(groupName, i.zone, names)
func (i *Instances) splitNodesByZone(names []string) map[string][]string {
nodesByZone := map[string][]string{}
for _, name := range names {
zone, err := i.GetZoneForNode(name)
if err != nil {
glog.Errorf("Failed to get zones for %v: %v, skipping", name, err)
continue
}
if _, ok := nodesByZone[zone]; !ok {
nodesByZone[zone] = []string{}
}
nodesByZone[zone] = append(nodesByZone[zone], name)
}
return nodesByZone
}
// Remove removes the given instances from the Instance Group.
// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
errs := []error{}
for zone, nodeNames := range i.splitNodesByZone(names) {
glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, nodeNames); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
glog.V(3).Infof("Removing nodes %v from %v", names, groupName)
return i.cloud.RemoveInstancesFromInstanceGroup(groupName, i.zone, names)
errs := []error{}
for zone, nodeNames := range i.splitNodesByZone(names) {
glog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, nodeNames); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
// Sync syncs kubernetes instances with the instances in the instance group.
func (i *Instances) Sync(nodes []string) (err error) {
glog.V(3).Infof("Syncing nodes %v", nodes)
glog.V(1).Infof("Syncing nodes %v", nodes)
defer func() {
// The node pool is only responsible for syncing nodes to instance
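
Add, Remove, and DeleteInstanceGroup all follow the same fan-out shape: resolve a zone per node (or list all zones), issue one cloud call per zone, and collect failures instead of aborting on the first one. A standalone sketch of that shape, with a hypothetical zoneOf lookup and a stubbed cloud call:

package main

import "fmt"

// zoneOf is a hypothetical stand-in for GetZoneForNode.
func zoneOf(name string) (string, error) {
	zones := map[string]string{
		"node-a": "us-central1-a",
		"node-b": "us-central1-b",
		"node-c": "us-central1-a",
	}
	z, ok := zones[name]
	if !ok {
		return "", fmt.Errorf("node not found %v", name)
	}
	return z, nil
}

// splitNodesByZone groups node names by zone; unresolvable nodes are
// skipped (the diff logs the error and continues).
func splitNodesByZone(names []string) map[string][]string {
	byZone := map[string][]string{}
	for _, name := range names {
		zone, err := zoneOf(name)
		if err != nil {
			continue
		}
		byZone[zone] = append(byZone[zone], name)
	}
	return byZone
}

// addNodes stubs the per-zone cloud call.
func addNodes(zone string, nodes []string) error {
	fmt.Printf("add %v to group in %v\n", nodes, zone)
	return nil
}

func main() {
	errs := []error{}
	for zone, nodes := range splitNodesByZone([]string{"node-a", "node-b", "node-c", "node-x"}) {
		if err := addNodes(zone, nodes); err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) != 0 {
		// Same flattening the diff uses to report partial failures.
		fmt.Println(fmt.Errorf("%v", errs))
	}
}

Flattening the collected errors with fmt.Errorf("%v", errs) loses the individual error types; that is the trade-off the diff accepts in order to report every zone's failure rather than just the first.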


@@ -20,17 +20,26 @@ import (
compute "google.golang.org/api/compute/v1"
)
// zoneLister manages lookups for GCE instance groups/instances to zones.
type zoneLister interface {
ListZones() ([]string, error)
GetZoneForNode(name string) (string, error)
}
// NodePool is an interface to manage a pool of kubernetes nodes synced with vm instances in the cloud
// through the InstanceGroups interface.
// through the InstanceGroups interface. It handles zones opaquely using the zoneLister.
type NodePool interface {
AddInstanceGroup(name string, port int64) (*compute.InstanceGroup, *compute.NamedPort, error)
Init(zl zoneLister)
// The following 2 methods operate on instance groups.
AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error)
DeleteInstanceGroup(name string) error
// TODO: Refactor for modularity
Add(groupName string, nodeNames []string) error
Remove(groupName string, nodeNames []string) error
Sync(nodeNames []string) error
Get(name string) (*compute.InstanceGroup, error)
Get(name, zone string) (*compute.InstanceGroup, error)
}
// InstanceGroups is an interface for managing GCE instance groups, and the instances therein.
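
Since zones now enter NodePool only through the small zoneLister interface, a test can hand Init a canned implementation. A hypothetical fake, with the interface redeclared so the sketch stands alone:

package main

import "fmt"

// zoneLister is redeclared here to keep the sketch self-contained;
// it mirrors the interface added in the diff.
type zoneLister interface {
	ListZones() ([]string, error)
	GetZoneForNode(name string) (string, error)
}

// fakeZoneLister is a hypothetical test double mapping node names to zones.
type fakeZoneLister struct{ nodeZones map[string]string }

func (f *fakeZoneLister) ListZones() ([]string, error) {
	seen := map[string]bool{}
	zones := []string{}
	for _, z := range f.nodeZones {
		if !seen[z] {
			seen[z] = true
			zones = append(zones, z)
		}
	}
	return zones, nil
}

func (f *fakeZoneLister) GetZoneForNode(name string) (string, error) {
	z, ok := f.nodeZones[name]
	if !ok {
		return "", fmt.Errorf("node not found %v", name)
	}
	return z, nil
}

func main() {
	var zl zoneLister = &fakeZoneLister{nodeZones: map[string]string{"n1": "us-central1-a"}}
	fmt.Println(zl.GetZoneForNode("n1")) // us-central1-a <nil>
}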


@@ -220,6 +220,7 @@ func main() {
if clusterManager.ClusterNamer.ClusterName != "" {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.ClusterName)
}
clusterManager.Init(&controller.GCETranslator{lbc})
go registerHandlers(lbc)
go handleSigterm(lbc, *deleteAllOnQuit)