List from cloud and resync InMemoryPool.

Prashanth Balasubramanian 2016-03-28 11:29:44 -07:00
parent ba26bcabf5
commit 8d59739bb1
9 changed files with 229 additions and 18 deletions

View file

@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"k8s.io/kubernetes/pkg/util/sets"
@@ -38,6 +39,10 @@ type Backends struct {
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
namer utils.Namer
// ignoredPorts is the set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
ignoredPorts sets.String
}
func portKey(port int64) string {
@@ -46,18 +51,46 @@ func portKey(port int64) string {
// NewBackendPool returns a new backend pool.
// - cloud: implements BackendServices and syncs backends with a cloud provider
// - healthChecker: is capable of producing health checks for backends.
// - nodePool: implements NodePool, used to create/delete new instance groups.
// - namer: produces names for backends.
// - ignorePorts: is a set of ports to avoid syncing/GCing.
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewBackendPool(
cloud BackendServices,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool, namer utils.Namer) *Backends {
return &Backends{
nodePool instances.NodePool, namer utils.Namer, ignorePorts []int64, resyncWithCloud bool) *Backends {
ignored := []string{}
for _, p := range ignorePorts {
ignored = append(ignored, portKey(p))
}
backendPool := &Backends{
cloud: cloud,
nodePool: nodePool,
snapshotter: storage.NewInMemoryPool(),
healthChecker: healthChecker,
namer: namer,
ignoredPorts: sets.NewString(ignored...),
}
if !resyncWithCloud {
backendPool.snapshotter = storage.NewInMemoryPool()
return backendPool
}
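// With cloud resync enabled, back the snapshotter with a CloudListingPool:
// the key function below maps a backend service back to its node port and
// skips any service whose name was not produced by this cluster's namer,
// so unrelated backends are never pulled into the pool.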
backendPool.snapshotter = storage.NewCloudListingPool(
func(i interface{}) (string, error) {
bs := i.(*compute.BackendService)
if !namer.NameBelongsToCluster(bs.Name) {
return "", fmt.Errorf("Unrecognized name %v", bs.Name)
}
port, err := namer.BePort(bs.Name)
if err != nil {
return "", err
}
return port, nil
},
backendPool,
30*time.Second,
)
return backendPool
}
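To make the new arguments concrete, a hedged sketch of how a caller might build both flavors of pool, using the same fakes as the tests in this commit; the examplePools name and the node port 3000 are purely illustrative:

package backends

import (
	"k8s.io/contrib/ingress/controllers/gce/healthchecks"
	"k8s.io/contrib/ingress/controllers/gce/instances"
	"k8s.io/contrib/ingress/controllers/gce/utils"
	"k8s.io/kubernetes/pkg/util/sets"
)

func examplePools() {
	f := NewFakeBackendServices()
	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
	namer := utils.Namer{}
	checker := healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer)
	nodePool := instances.NewNodePool(fakeIGs, "default-zone")

	// Resync with the cloud periodically and never GC the backend for port
	// 3000, even after the Ingress that referenced it is gone.
	pool := NewBackendPool(f, checker, nodePool, namer, []int64{3000}, true)

	// A plain InMemoryPool-backed pool with no ignored ports.
	defaultPool := NewBackendPool(f, checker, nodePool, namer, []int64{}, false)
	_, _ = pool, defaultPool
}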
// Get returns a single backend.
@@ -150,10 +183,18 @@ func (b *Backends) Delete(port int64) (err error) {
}
// List lists all backends.
func (b *Backends) List() (*compute.BackendServiceList, error) {
func (b *Backends) List() ([]interface{}, error) {
// TODO: for consistency with the rest of this sub-package this method
// should return a list of backend ports.
return b.cloud.ListBackendServices()
interList := []interface{}{}
be, err := b.cloud.ListBackendServices()
if err != nil {
return interList, err
}
for i := range be.Items {
interList = append(interList, be.Items[i])
}
return interList, nil
}
// edgeHop checks the links of the given backend by executing an edge hop.
@@ -200,7 +241,7 @@ func (b *Backends) GC(svcNodePorts []int64) error {
return err
}
nodePort := int64(p)
if knownPorts.Has(portKey(nodePort)) {
if knownPorts.Has(portKey(nodePort)) || b.ignoredPorts.Has(portKey(nodePort)) {
continue
}
glog.V(3).Infof("GCing backend for port %v", p)

View file

@@ -19,24 +19,26 @@ package backends
import (
"testing"
compute "google.golang.org/api/compute/v1"
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
"k8s.io/contrib/ingress/controllers/gce/instances"
"k8s.io/contrib/ingress/controllers/gce/storage"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/util/sets"
)
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups) BackendPool {
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
namer := utils.Namer{}
return NewBackendPool(
f,
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
instances.NewNodePool(fakeIGs, "default-zone"), namer)
instances.NewNodePool(fakeIGs, "default-zone"), namer, []int64{}, syncWithCloud)
}
func TestBackendPoolAdd(t *testing.T) {
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, false)
namer := utils.Namer{}
// Add a backend for a port, then re-add the same port and
@@ -89,13 +91,12 @@ func TestBackendPoolAdd(t *testing.T) {
}
func TestBackendPoolSync(t *testing.T) {
// Call sync on a backend pool with a list of ports, make sure the pool
// creates/deletes required ports.
svcNodePorts := []int64{81, 82, 83}
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, true)
pool.Add(81)
pool.Add(90)
pool.Sync(svcNodePorts)
@@ -109,12 +110,57 @@ func TestBackendPoolSync(t *testing.T) {
}
}
svcNodePorts = []int64{81}
deletedPorts := []int64{82, 83}
pool.GC(svcNodePorts)
for _, port := range deletedPorts {
if _, err := pool.Get(port); err == nil {
t.Fatalf("Pool contains %v after deletion", port)
}
}
// All these backends should be ignored because they don't belong to the cluster.
// foo - non k8s managed backend
// k8s-be-foo - foo is not a nodeport
// k8s--bar--foo - too many cluster delimiters
// k8s-be-30001--uid - another cluster tagged with uid
unrelatedBackends := sets.NewString([]string{"foo", "k8s-be-foo", "k8s--bar--foo", "k8s-be-30001--uid"}...)
for _, name := range unrelatedBackends.List() {
f.CreateBackendService(&compute.BackendService{Name: name})
}
namer := &utils.Namer{}
// This backend should get deleted again since it is managed by this cluster.
f.CreateBackendService(&compute.BackendService{Name: namer.BeName(deletedPorts[0])})
// TODO: Avoid casting.
// Repopulate the pool with a cloud list, which now includes the 82 port
// backend. This would happen if, say, an ingress backend is removed
// while the controller is restarting.
pool.(*Backends).snapshotter.(*storage.CloudListingPool).ReplinishPool()
pool.GC(svcNodePorts)
currBackends, _ := f.ListBackendServices()
currSet := sets.NewString()
for _, b := range currBackends.Items {
currSet.Insert(b.Name)
}
// Port 81 still exists because it's an in-use service NodePort.
knownBe := namer.BeName(81)
if !currSet.Has(knownBe) {
t.Fatalf("Expected %v to exist in backend pool", knownBe)
}
currSet.Delete(knownBe)
if !currSet.Equal(unrelatedBackends) {
t.Fatalf("Some unrelated backends were deleted. Expected %+v, got %+v", unrelatedBackends, currSet)
}
}
func TestBackendPoolShutdown(t *testing.T) {
f := NewFakeBackendServices()
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool := newBackendPool(f, fakeIGs)
pool := newBackendPool(f, fakeIGs, false)
namer := utils.Namer{}
pool.Add(80)

View file

@@ -30,7 +30,7 @@ type BackendPool interface {
GC(ports []int64) error
Shutdown() error
Status(name string) string
List() (*compute.BackendServiceList, error)
List() ([]interface{}, error)
}
// BackendServices is an interface for managing gce backend services.

View file

@@ -161,13 +161,16 @@ func NewClusterManager(
}
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
// TODO: This needs to change to a consolidated management of the default backend.
cluster.backendPool = backends.NewBackendPool(
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer)
cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort}, true)
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
defaultBackendPool := backends.NewBackendPool(
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer)
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
return &cluster, nil
}

View file

@@ -52,7 +52,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
fakeBackends,
healthChecker, nodePool, namer)
healthChecker, nodePool, namer, []int64{}, false)
l7Pool := loadbalancers.NewLoadBalancerPool(
fakeLbs,
// TODO: change this

View file

@@ -174,7 +174,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error {
// The default backend is completely managed by the l7 pool.
// This includes recreating it if it's deleted, or fixing broken links.
if err := l.defaultBackendPool.Sync([]int64{l.defaultBackendNodePort}); err != nil {
if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil {
return err
}
// create new loadbalancers, perform an edge hop for existing

View file

@@ -39,7 +39,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
namer := utils.Namer{}
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
backendPool := backends.NewBackendPool(
fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer)
fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
return NewLoadBalancerPool(f, backendPool, testDefaultBeNodePort, namer)
}

View file

@@ -17,7 +17,12 @@ limitations under the License.
package storage
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/util/wait"
)
// Snapshotter is an interface capable of providing a consistent snapshot of
@@ -51,3 +56,78 @@ func NewInMemoryPool() *InMemoryPool {
return &InMemoryPool{
cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{})}
}
type keyFunc func(interface{}) (string, error)
type cloudLister interface {
List() ([]interface{}, error)
}
// CloudListingPool wraps InMemoryPool but relists from the cloud periodically.
type CloudListingPool struct {
// A lock to protect against concurrent mutation of the pool
lock sync.Mutex
// The pool that is re-populated via re-list from cloud, and written to
// from controller
*InMemoryPool
// An interface that lists objects from the cloud.
lister cloudLister
// A function capable of producing a key for a given object.
// This key must match the key under which the user of this cache stores
// the same object.
keyGetter keyFunc
}
// ReplinishPool lists through the cloudLister and inserts into the pool.
func (c *CloudListingPool) ReplinishPool() {
c.lock.Lock()
defer c.lock.Unlock()
glog.V(4).Infof("Replinishing pool")
items, err := c.lister.List()
if err != nil {
glog.Warningf("Failed to list: %v", err)
return
}
for i := range items {
key, err := c.keyGetter(items[i])
if err != nil {
glog.V(4).Infof("CloudListingPool: %v", err)
continue
}
c.InMemoryPool.Add(key, items[i])
}
}
// Snapshot just snapshots the underlying pool.
func (c *CloudListingPool) Snapshot() map[string]interface{} {
c.lock.Lock()
defer c.lock.Unlock()
return c.InMemoryPool.Snapshot()
}
// Add simply adds to the underlying pool.
func (c *CloudListingPool) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.InMemoryPool.Add(key, obj)
}
// Delete just deletes from the underlying pool.
func (c *CloudListingPool) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
c.InMemoryPool.Delete(key)
}
// NewCloudListingPool replinishes the InMemoryPool through a background
// goroutine that lists from the given cloudLister.
func NewCloudListingPool(k keyFunc, lister cloudLister, relistPeriod time.Duration) *CloudListingPool {
cl := &CloudListingPool{
InMemoryPool: NewInMemoryPool(),
lister: lister,
keyGetter: k,
}
glog.V(4).Infof("Starting pool replinish goroutine")
go wait.Until(cl.ReplinishPool, relistPeriod, make(chan struct{}))
return cl
}
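As an illustration of the new type, a minimal, self-contained sketch (not part of this commit) of a CloudListingPool fed by a hard-coded lister; the staticLister type, the function name, and the string keys are assumptions made only for the example:

package storage

import (
	"fmt"
	"time"

	"github.com/golang/glog"
)

// staticLister is a stand-in for a real cloud client; it returns a fixed
// slice of items on every List call.
type staticLister struct {
	items []interface{}
}

func (s *staticLister) List() ([]interface{}, error) {
	return s.items, nil
}

func exampleCloudListingPool() {
	lister := &staticLister{items: []interface{}{"k8s-be-30001", "k8s-be-30002"}}
	pool := NewCloudListingPool(
		// Key each item by its own string value; anything that isn't a
		// string is skipped, just as unrecognized backends are skipped above.
		func(i interface{}) (string, error) {
			s, ok := i.(string)
			if !ok {
				return "", fmt.Errorf("unexpected item %+v", i)
			}
			return s, nil
		},
		lister,
		30*time.Second,
	)
	// A background goroutine relists on the given period; tests can call
	// ReplinishPool directly to force an immediate relist, as
	// TestBackendPoolSync does.
	pool.ReplinishPool()
	for key := range pool.Snapshot() {
		glog.Infof("pool contains %v", key)
	}
}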

View file

@@ -18,10 +18,13 @@ package utils
import (
"fmt"
"strconv"
"strings"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"regexp"
)
const (
@@ -46,6 +49,7 @@ const (
// This allows sharing of backends across loadbalancers.
backendPrefix = "k8s-be"
backendRegex = "k8s-be-([0-9]+).*"
// Prefix used for instance groups involved in L7 balancing.
igPrefix = "k8s-ig"
@@ -93,11 +97,48 @@ func (n *Namer) decorateName(name string) string {
return n.Truncate(fmt.Sprintf("%v%v%v", name, clusterNameDelimiter, n.ClusterName))
}
// NameBelongsToCluster checks if a given name is tagged with this cluster's UID.
func (n *Namer) NameBelongsToCluster(name string) bool {
if !strings.HasPrefix(name, "k8s-") {
glog.V(4).Infof("%v not part of cluster", name)
return false
}
parts := strings.Split(name, clusterNameDelimiter)
if len(parts) == 1 {
if n.ClusterName == "" {
return true
}
return false
}
if len(parts) > 2 {
glog.Warningf("Too many parts to name %v, ignoring", name)
return false
}
return parts[1] == n.ClusterName
}
// BeName constructs the name for a backend.
func (n *Namer) BeName(port int64) string {
return n.decorateName(fmt.Sprintf("%v-%d", backendPrefix, port))
}
// BePort retrieves the port from the given backend name.
func (n *Namer) BePort(beName string) (string, error) {
r, err := regexp.Compile(backendRegex)
if err != nil {
return "", err
}
match := r.FindStringSubmatch(beName)
if len(match) < 2 {
return "", fmt.Errorf("Unable to lookup port for %v", beName)
}
_, err = strconv.Atoi(match[1])
if err != nil {
return "", fmt.Errorf("Unexpected regex match: %v", beName)
}
return match[1], nil
}
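A hedged illustration of how these two helpers classify names, assuming Namer exposes a ClusterName field as the checks above read it; the exampleNamer function and the "uid" cluster name are illustrative, and the backend names mirror the ones used in TestBackendPoolSync:

package utils

func exampleNamer() {
	namer := &Namer{ClusterName: "uid"}
	// Names carrying this cluster's UID after the cluster name delimiter belong to it.
	belongs := namer.NameBelongsToCluster("k8s-be-30001--uid") // true
	// A different UID, or a name without the "k8s-" prefix, does not.
	other := namer.NameBelongsToCluster("k8s-be-30001--other") // false
	foreign := namer.NameBelongsToCluster("foo") // false
	// BePort recovers the node port encoded in a backend name, as a string.
	port, _ := namer.BePort("k8s-be-30001--uid") // "30001"
	_, _, _, _ = belongs, other, foreign, port
}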
// IGName constructs the name for an Instance Group.
func (n *Namer) IGName() string {
// Currently all ports are added to a single instance group.