From 8d59739bb167544a444e3147b00982b3948bca6c Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Mon, 28 Mar 2016 11:29:44 -0700 Subject: [PATCH] List from cloud and resync InMemoryPool. --- controllers/gce/backends/backends.go | 53 ++++++++++-- controllers/gce/backends/backends_test.go | 58 ++++++++++++-- controllers/gce/backends/interfaces.go | 2 +- controllers/gce/controller/cluster_manager.go | 7 +- controllers/gce/controller/fakes.go | 2 +- .../gce/loadbalancers/loadbalancers.go | 2 +- .../gce/loadbalancers/loadbalancers_test.go | 2 +- controllers/gce/storage/pools.go | 80 +++++++++++++++++++ controllers/gce/utils/utils.go | 41 ++++++++++ 9 files changed, 229 insertions(+), 18 deletions(-) diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 7c5076a16..711cd40ba 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "strconv" + "time" "k8s.io/kubernetes/pkg/util/sets" @@ -38,6 +39,10 @@ type Backends struct { healthChecker healthchecks.HealthChecker snapshotter storage.Snapshotter namer utils.Namer + // ignoredPorts are a set of ports excluded from GC, even + // after the Ingress has been deleted. Note that invoking + // a Delete() on these ports will still delete the backend. + ignoredPorts sets.String } func portKey(port int64) string { @@ -46,18 +51,46 @@ func portKey(port int64) string { // NewBackendPool returns a new backend pool. // - cloud: implements BackendServices and syncs backends with a cloud provider +// - healthChecker: is capable of producing health checks for backends. // - nodePool: implements NodePool, used to create/delete new instance groups. +// - namer: procudes names for backends. +// - ignorePorts: is a set of ports to avoid syncing/GCing. +// - resyncWithCloud: if true, periodically syncs with cloud resources. func NewBackendPool( cloud BackendServices, healthChecker healthchecks.HealthChecker, - nodePool instances.NodePool, namer utils.Namer) *Backends { - return &Backends{ + nodePool instances.NodePool, namer utils.Namer, ignorePorts []int64, resyncWithCloud bool) *Backends { + ignored := []string{} + for _, p := range ignorePorts { + ignored = append(ignored, portKey(p)) + } + backendPool := &Backends{ cloud: cloud, nodePool: nodePool, - snapshotter: storage.NewInMemoryPool(), healthChecker: healthChecker, namer: namer, + ignoredPorts: sets.NewString(ignored...), } + if !resyncWithCloud { + backendPool.snapshotter = storage.NewInMemoryPool() + return backendPool + } + backendPool.snapshotter = storage.NewCloudListingPool( + func(i interface{}) (string, error) { + bs := i.(*compute.BackendService) + if !namer.NameBelongsToCluster(bs.Name) { + return "", fmt.Errorf("Unrecognized name %v", bs.Name) + } + port, err := namer.BePort(bs.Name) + if err != nil { + return "", err + } + return port, nil + }, + backendPool, + 30*time.Second, + ) + return backendPool } // Get returns a single backend. @@ -150,10 +183,18 @@ func (b *Backends) Delete(port int64) (err error) { } // List lists all backends. -func (b *Backends) List() (*compute.BackendServiceList, error) { +func (b *Backends) List() ([]interface{}, error) { // TODO: for consistency with the rest of this sub-package this method // should return a list of backend ports. - return b.cloud.ListBackendServices() + interList := []interface{}{} + be, err := b.cloud.ListBackendServices() + if err != nil { + return interList, err + } + for i := range be.Items { + interList = append(interList, be.Items[i]) + } + return interList, nil } // edgeHop checks the links of the given backend by executing an edge hop. @@ -200,7 +241,7 @@ func (b *Backends) GC(svcNodePorts []int64) error { return err } nodePort := int64(p) - if knownPorts.Has(portKey(nodePort)) { + if knownPorts.Has(portKey(nodePort)) || b.ignoredPorts.Has(portKey(nodePort)) { continue } glog.V(3).Infof("GCing backend for port %v", p) diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 6461ec68f..93d761902 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -19,24 +19,26 @@ package backends import ( "testing" + compute "google.golang.org/api/compute/v1" "k8s.io/contrib/ingress/controllers/gce/healthchecks" "k8s.io/contrib/ingress/controllers/gce/instances" + "k8s.io/contrib/ingress/controllers/gce/storage" "k8s.io/contrib/ingress/controllers/gce/utils" "k8s.io/kubernetes/pkg/util/sets" ) -func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups) BackendPool { +func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool { namer := utils.Namer{} return NewBackendPool( f, healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer), - instances.NewNodePool(fakeIGs, "default-zone"), namer) + instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud) } func TestBackendPoolAdd(t *testing.T) { f := NewFakeBackendServices() fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - pool := newBackendPool(f, fakeIGs) + pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} // Add a backend for a port, then re-add the same port and @@ -89,13 +91,12 @@ func TestBackendPoolAdd(t *testing.T) { } func TestBackendPoolSync(t *testing.T) { - // Call sync on a backend pool with a list of ports, make sure the pool // creates/deletes required ports. svcNodePorts := []int64{81, 82, 83} f := NewFakeBackendServices() fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - pool := newBackendPool(f, fakeIGs) + pool := newBackendPool(f, fakeIGs, true) pool.Add(81) pool.Add(90) pool.Sync(svcNodePorts) @@ -109,12 +110,57 @@ func TestBackendPoolSync(t *testing.T) { } } + svcNodePorts = []int64{81} + deletedPorts := []int64{82, 83} + pool.GC(svcNodePorts) + for _, port := range deletedPorts { + if _, err := pool.Get(port); err == nil { + t.Fatalf("Pool contains %v after deletion", port) + } + } + + // All these backends should be ignored because they don't belong to the cluster. + // foo - non k8s managed backend + // k8s-be-foo - foo is not a nodeport + // k8s--bar--foo - too many cluster delimiters + // k8s-be-3001--uid - another cluster tagged with uid + unrelatedBackends := sets.NewString([]string{"foo", "k8s-be-foo", "k8s--bar--foo", "k8s-be-30001--uid"}...) + for _, name := range unrelatedBackends.List() { + f.CreateBackendService(&compute.BackendService{Name: name}) + } + + namer := &utils.Namer{} + // This backend should get deleted again since it is managed by this cluster. + f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])}) + + // TODO: Avoid casting. + // Repopulate the pool with a cloud list, which now includes the 82 port + // backend. This would happen if, say, an ingress backend is removed + // while the controller is restarting. + pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplinishPool() + + pool.GC(svcNodePorts) + + currBackends, _ := f.ListBackendServices() + currSet := sets.NewString() + for _, b := range currBackends.Items { + currSet.Insert(b.Name) + } + // Port 81 still exists because it's an in-use service NodePort. + knownBe := namer.BeName(81) + if !currSet.Has(knownBe) { + t.Fatalf("Expected %v to exist in backend pool", knownBe) + } + currSet.Delete(knownBe) + if !currSet.Equal(unrelatedBackends) { + t.Fatalf("Some unrelated backends were deleted. Expected %+v, got %+v", unrelatedBackends, currSet) + } } func TestBackendPoolShutdown(t *testing.T) { f := NewFakeBackendServices() fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) - pool := newBackendPool(f, fakeIGs) + pool := newBackendPool(f, fakeIGs, false) namer := utils.Namer{} pool.Add(80) diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 0030092ee..3e199f9e5 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -30,7 +30,7 @@ type BackendPool interface { GC(ports []int64) error Shutdown() error Status(name string) string - List() (*compute.BackendServiceList, error) + List() ([]interface{}, error) } // BackendServices is an interface for managing gce backend services. diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 076f74cff..4128c60d7 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -161,13 +161,16 @@ func NewClusterManager( } cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain) healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer) + + // TODO: This needs to change to a consolidated management of the default backend. cluster.backendPool = backends.NewBackendPool( - cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer) + cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true) defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer) defaultBackendPool := backends.NewBackendPool( - cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer) + cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort cluster.l7Pool = loadbalancers.NewLoadBalancerPool( cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) + return &cluster, nil } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index ec3adee49..9b036164b 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -52,7 +52,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager { healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) backendPool := backends.NewBackendPool( fakeBackends, - healthChecker, nodePool, namer) + healthChecker, nodePool, namer, []int64{}, false) l7Pool := loadbalancers.NewLoadBalancerPool( fakeLbs, // TODO: change this diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index cf5585c22..c3f3bd947 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -174,7 +174,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { // The default backend is completely managed by the l7 pool. // This includes recreating it if it's deleted, or fixing broken links. - if err := l.defaultBackendPool.Sync([]int64{l.defaultBackendNodePort}); err != nil { + if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil { return err } // create new loadbalancers, perform an edge hop for existing diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go index b5622fadf..aa07b68b1 100644 --- a/controllers/gce/loadbalancers/loadbalancers_test.go +++ b/controllers/gce/loadbalancers/loadbalancers_test.go @@ -39,7 +39,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool { namer := utils.Namer{} healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) backendPool := backends.NewBackendPool( - fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer) + fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false) return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer) } diff --git a/controllers/gce/storage/pools.go b/controllers/gce/storage/pools.go index af8e6e63e..7cd26da94 100644 --- a/controllers/gce/storage/pools.go +++ b/controllers/gce/storage/pools.go @@ -17,7 +17,12 @@ limitations under the License. package storage import ( + "sync" + "time" + + "github.com/golang/glog" "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/util/wait" ) // Snapshotter is an interface capable of providing a consistent snapshot of @@ -51,3 +56,78 @@ func NewInMemoryPool() *InMemoryPool { return &InMemoryPool{ cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{})} } + +type keyFunc func(interface{}) (string, error) + +type cloudLister interface { + List() ([]interface{}, error) +} + +// CloudListingPool wraps InMemoryPool but relists from the cloud periodically. +type CloudListingPool struct { + // A lock to protect against concurrent mutation of the pool + lock sync.Mutex + // The pool that is re-populated via re-list from cloud, and written to + // from controller + *InMemoryPool + // An interface that lists objects from the cloud. + lister cloudLister + // A function capable of producing a key for a given object. + // This key must match the key used to store the same object in the user of + // this cache. + keyGetter keyFunc +} + +// ReplinishPool lists through the cloudLister and inserts into the pool. +func (c *CloudListingPool) ReplinishPool() { + c.lock.Lock() + defer c.lock.Unlock() + glog.V(4).Infof("Replinishing pool") + items, err := c.lister.List() + if err != nil { + glog.Warningf("Failed to list: %v", err) + return + } + for i := range items { + key, err := c.keyGetter(items[i]) + if err != nil { + glog.V(4).Infof("CloudListingPool: %v", err) + continue + } + c.InMemoryPool.Add(key, items[i]) + } +} + +// Snapshot just snapshots the underlying pool. +func (c *CloudListingPool) Snapshot() map[string]interface{} { + c.lock.Lock() + defer c.lock.Unlock() + return c.InMemoryPool.Snapshot() +} + +// Add simply adds to the underlying pool. +func (c *CloudListingPool) Add(key string, obj interface{}) { + c.lock.Lock() + defer c.lock.Unlock() + c.InMemoryPool.Add(key, obj) +} + +// Delete just deletes from underlying pool. +func (c *CloudListingPool) Delete(key string) { + c.lock.Lock() + defer c.lock.Unlock() + c.InMemoryPool.Delete(key) +} + +// NewCloudListingPool replinishes the InMemoryPool through a background +// goroutine that lists from the given cloudLister. +func NewCloudListingPool(k keyFunc, lister cloudLister, relistPeriod time.Duration) *CloudListingPool { + cl := &CloudListingPool{ + InMemoryPool: NewInMemoryPool(), + lister: lister, + keyGetter: k, + } + glog.V(4).Infof("Starting pool replinish goroutine") + go wait.Until(cl.ReplinishPool, relistPeriod, make(chan struct{})) + return cl +} diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index fd93694fc..a7dd117c7 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -18,10 +18,13 @@ package utils import ( "fmt" + "strconv" "strings" + "github.com/golang/glog" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" + "regexp" ) const ( @@ -46,6 +49,7 @@ const ( // This allows sharing of backends across loadbalancers. backendPrefix = "k8s-be" + backendRegex = "k8s-be-([0-9]+).*" // Prefix used for instance groups involved in L7 balancing. igPrefix = "k8s-ig" @@ -93,11 +97,48 @@ func (n *Namer) decorateName(name string) string { return n.Truncate(fmt.Sprintf("%v%v%v", name, clusterNameDelimiter, n.ClusterName)) } +// NameBelongsToCluster checks if a given name is tagged with this cluster's UID. +func (n *Namer) NameBelongsToCluster(name string) bool { + if !strings.HasPrefix(name, "k8s-") { + glog.V(4).Infof("%v not part of cluster", name) + return false + } + parts := strings.Split(name, clusterNameDelimiter) + if len(parts) == 1 { + if n.ClusterName == "" { + return true + } + return false + } + if len(parts) > 2 { + glog.Warningf("Too many parts to name %v, ignoring", name) + return false + } + return parts[1] == n.ClusterName +} + // BeName constructs the name for a backend. func (n *Namer) BeName(port int64) string { return n.decorateName(fmt.Sprintf("%v-%d", backendPrefix, port)) } +// BePort retrieves the port from the given backend name. +func (n *Namer) BePort(beName string) (string, error) { + r, err := regexp.Compile(backendRegex) + if err != nil { + return "", err + } + match := r.FindStringSubmatch(beName) + if len(match) < 2 { + return "", fmt.Errorf("Unable to lookup port for %v", beName) + } + _, err = strconv.Atoi(match[1]) + if err != nil { + return "", fmt.Errorf("Unexpected regex match: %v", beName) + } + return match[1], nil +} + // IGName constructs the name for an Instance Group. func (n *Namer) IGName() string { // Currently all ports are added to a single instance group.