From 24fb4b70aa2cf08ed25632e1022770ccf9aea37c Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Mon, 14 Mar 2016 17:33:12 -0700 Subject: [PATCH] Cluster UID store/retrieval --- controllers/gce/backends/backends.go | 4 +- controllers/gce/backends/backends_test.go | 2 +- controllers/gce/controller/cluster_manager.go | 11 +- controllers/gce/controller/controller.go | 2 +- controllers/gce/controller/fakes.go | 7 +- controllers/gce/firewalls/firewalls.go | 4 +- controllers/gce/healthchecks/healthchecks.go | 4 +- .../gce/loadbalancers/loadbalancers.go | 6 +- .../gce/loadbalancers/loadbalancers_test.go | 2 +- controllers/gce/main.go | 10 +- controllers/gce/storage/configmaps.go | 177 ++++++++++++++++++ controllers/gce/storage/configmaps_test.go | 54 ++++++ 12 files changed, 263 insertions(+), 20 deletions(-) create mode 100644 controllers/gce/storage/configmaps.go create mode 100644 controllers/gce/storage/configmaps_test.go diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index a47190cc5..b307f8e33 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -38,11 +38,11 @@ type Backends struct { nodePool instances.NodePool healthChecker healthchecks.HealthChecker snapshotter storage.Snapshotter - namer utils.Namer // ignoredPorts are a set of ports excluded from GC, even // after the Ingress has been deleted. Note that invoking // a Delete() on these ports will still delete the backend. ignoredPorts sets.String + namer *utils.Namer } func portKey(port int64) string { @@ -60,7 +60,7 @@ func NewBackendPool( cloud BackendServices, healthChecker healthchecks.HealthChecker, nodePool instances.NodePool, - namer utils.Namer, + namer *utils.Namer, ignorePorts []int64, resyncWithCloud bool) *Backends { diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index f8f74e2d2..6c7c07c5b 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -28,7 +28,7 @@ import ( ) func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool { - namer := utils.Namer{} + namer := &utils.Namer{} return NewBackendPool( f, healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer), diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 8f31c9590..be3e899f5 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -66,7 +66,7 @@ const ( // ClusterManager manages cluster resource pools. type ClusterManager struct { - ClusterNamer utils.Namer + ClusterNamer *utils.Namer defaultBackendNodePort int64 instancePool instances.NodePool backendPool backends.BackendPool @@ -227,12 +227,17 @@ func NewClusterManager( // and continue. cloud := getGCEClient() - cluster := ClusterManager{ClusterNamer: utils.Namer{name}} + // Names are fundamental to the cluster, the uid allocator makes sure names don't collide. + cluster := ClusterManager{ClusterNamer: &utils.Namer{name}} zone, err := cloud.GetZone() if err != nil { return nil, err } + + // NodePool stores GCE vms that are in this Kubernetes cluster. cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain) + + // BackendPool creates GCE BackendServices and associated health checks. healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer) // TODO: This needs to change to a consolidated management of the default backend. @@ -242,6 +247,8 @@ func NewClusterManager( defaultBackendPool := backends.NewBackendPool( cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false) cluster.defaultBackendNodePort = defaultBackendNodePort + + // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. cluster.l7Pool = loadbalancers.NewLoadBalancerPool( cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer) diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 9fadf9a33..1fb5d926a 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -301,7 +301,7 @@ func (lbc *LoadBalancerController) sync(key string) { } else if err := l7.UpdateUrlMap(urlMap); err != nil { lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error()) syncError = fmt.Errorf("%v, update url map error: %v", syncError, err) - } else if lbc.updateIngressStatus(l7, ing); err != nil { + } else if err := lbc.updateIngressStatus(l7, ing); err != nil { lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index cc5341714..911c71dda 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -17,14 +17,15 @@ limitations under the License. package controller import ( + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/contrib/ingress/controllers/gce/backends" "k8s.io/contrib/ingress/controllers/gce/firewalls" "k8s.io/contrib/ingress/controllers/gce/healthchecks" "k8s.io/contrib/ingress/controllers/gce/instances" "k8s.io/contrib/ingress/controllers/gce/loadbalancers" "k8s.io/contrib/ingress/controllers/gce/utils" - "k8s.io/kubernetes/pkg/util/intstr" - "k8s.io/kubernetes/pkg/util/sets" ) const ( @@ -48,7 +49,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager { fakeBackends := backends.NewFakeBackendServices() fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) fakeHCs := healthchecks.NewFakeHealthChecks() - namer := utils.Namer{clusterName} + namer := &utils.Namer{clusterName} nodePool := instances.NewNodePool(fakeIGs, defaultZone) healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) backendPool := backends.NewBackendPool( diff --git a/controllers/gce/firewalls/firewalls.go b/controllers/gce/firewalls/firewalls.go index b111aed1e..9fed37118 100644 --- a/controllers/gce/firewalls/firewalls.go +++ b/controllers/gce/firewalls/firewalls.go @@ -30,14 +30,14 @@ const l7SrcRange = "130.211.0.0/22" // FirewallRules manages firewall rules. type FirewallRules struct { cloud Firewall - namer utils.Namer + namer *utils.Namer srcRange netset.IPNet } // NewFirewallPool creates a new firewall rule manager. // cloud: the cloud object implementing Firewall. // namer: cluster namer. -func NewFirewallPool(cloud Firewall, namer utils.Namer) SingleFirewallPool { +func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool { srcNetSet, err := netset.ParseIPNets(l7SrcRange) if err != nil { glog.Fatalf("Could not parse L7 src range %v for firewall rule: %v", l7SrcRange, err) diff --git a/controllers/gce/healthchecks/healthchecks.go b/controllers/gce/healthchecks/healthchecks.go index 856847aab..849899850 100644 --- a/controllers/gce/healthchecks/healthchecks.go +++ b/controllers/gce/healthchecks/healthchecks.go @@ -28,13 +28,13 @@ import ( type HealthChecks struct { cloud SingleHealthCheck defaultPath string - namer utils.Namer + namer *utils.Namer } // NewHealthChecker creates a new health checker. // cloud: the cloud object implementing SingleHealthCheck. // defaultHealthCheckPath: is the HTTP path to use for health checks. -func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer utils.Namer) HealthChecker { +func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker { return &HealthChecks{cloud, defaultHealthCheckPath, namer} } diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index 69321e780..45d2a76cc 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -67,7 +67,7 @@ type L7s struct { glbcDefaultBackend *compute.BackendService defaultBackendPool backends.BackendPool defaultBackendNodePort int64 - namer utils.Namer + namer *utils.Namer } // NewLoadBalancerPool returns a new loadbalancer pool. @@ -80,7 +80,7 @@ type L7s struct { func NewLoadBalancerPool( cloud LoadBalancers, defaultBackendPool backends.BackendPool, - defaultBackendNodePort int64, namer utils.Namer) LoadBalancerPool { + defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool { return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer} } @@ -284,7 +284,7 @@ type L7 struct { // TODO: Expose this to users. glbcDefaultBackend *compute.BackendService // namer is used to compute names of the various sub-components of an L7. - namer utils.Namer + namer *utils.Namer } func (l *L7) checkUrlMap(backend *compute.BackendService) (err error) { diff --git a/controllers/gce/loadbalancers/loadbalancers_test.go b/controllers/gce/loadbalancers/loadbalancers_test.go index aa07b68b1..795cf2c89 100644 --- a/controllers/gce/loadbalancers/loadbalancers_test.go +++ b/controllers/gce/loadbalancers/loadbalancers_test.go @@ -36,7 +36,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool { fakeBackends := backends.NewFakeBackendServices() fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) fakeHCs := healthchecks.NewFakeHealthChecks() - namer := utils.Namer{} + namer := &utils.Namer{} healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer) backendPool := backends.NewBackendPool( fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false) diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 48c51cc97..dcd9413d0 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -28,6 +28,7 @@ import ( flag "github.com/spf13/pflag" "k8s.io/contrib/ingress/controllers/gce/controller" + "k8s.io/contrib/ingress/controllers/gce/storage" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" kubectl_util "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -56,7 +57,10 @@ const ( alphaNumericChar = "0" // Current docker image version. Only used in debug logging. - imageVersion = "glbc:0.6.2" + imageVersion = "glbc:0.6.3" + + // Key used to persist UIDs to configmaps. + uidConfigMapName = "ingress-uid" ) var ( @@ -156,7 +160,7 @@ func main() { go_flag.Lookup("logtostderr").Value.Set("true") go_flag.Set("v", "4") } - glog.Infof("Starting GLBC image: %v", imageVersion) + glog.Infof("Starting GLBC image: %v, cluster name %v", imageVersion, *clusterName) if *defaultSvc == "" { glog.Fatalf("Please specify --default-backend") } @@ -188,7 +192,7 @@ func main() { if *inCluster || *useRealCloud { // Create cluster manager clusterManager, err = controller.NewClusterManager( - *clusterName, defaultBackendNodePort, *healthCheckPath) + *clusterName, defaultBackendNodePort, *healthCheckPath, storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)) if err != nil { glog.Fatalf("%v", err) } diff --git a/controllers/gce/storage/configmaps.go b/controllers/gce/storage/configmaps.go new file mode 100644 index 000000000..deba3b737 --- /dev/null +++ b/controllers/gce/storage/configmaps.go @@ -0,0 +1,177 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "fmt" + "strings" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" +) + +// UIDVault stores UIDs. +type UIDVault interface { + Get() (string, bool, error) + Put(string) error + Delete() error +} + +// uidDataKey is the key used in config maps to store the UID. +const uidDataKey = "uid" + +// ConfigMapVault stores cluster UIDs in config maps. +// It's a layer on top of ConfigMapStore that just implements the utils.uidVault +// interface. +type ConfigMapVault struct { + ConfigMapStore cache.Store + namespace string + name string +} + +// Get retrieves the cluster UID from the cluster config map. +// If this method returns an error, it's guaranteed to be apiserver flake. +// If the error is a not found error it sets the boolean to false and +// returns and error of nil instead. +func (c *ConfigMapVault) Get() (string, bool, error) { + key := fmt.Sprintf("%v/%v", c.namespace, c.name) + item, found, err := c.ConfigMapStore.GetByKey(key) + if err != nil || !found { + return "", found, err + } + cfg := item.(*api.ConfigMap) + if k, ok := cfg.Data[uidDataKey]; ok { + return k, false, nil + } + return "", found, fmt.Errorf("Found config map %v but it doesn't contain uid key: %+v", key, cfg.Data) +} + +// Put stores the given UID in the cluster config map. +func (c *ConfigMapVault) Put(uid string) error { + apiObj := &api.ConfigMap{ + ObjectMeta: api.ObjectMeta{ + Name: c.name, + Namespace: c.namespace, + }, + Data: map[string]string{uidDataKey: uid}, + } + cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name) + + item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey) + if err == nil && exists { + data := item.(*api.ConfigMap).Data + if k, ok := data[uidDataKey]; ok && k == uid { + return nil + } else if ok { + glog.Infof("Configmap %v has key %v but wrong value %v, updating", cfgMapKey, k, uid) + } + + if err := c.ConfigMapStore.Update(apiObj); err != nil { + return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err) + } + } else if err := c.ConfigMapStore.Add(apiObj); err != nil { + return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err) + } + glog.Infof("Successfully stored uid %v in config map %v", uid, cfgMapKey) + return nil +} + +// Delete deletes the cluster UID storing config map. +func (c *ConfigMapVault) Delete() error { + cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name) + item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey) + if err == nil { + return c.ConfigMapStore.Delete(item) + } + glog.Warningf("Couldn't find item %v in vault, unable to delete", cfgMapKey) + return nil +} + +// NewConfigMapVault creates a config map client. +// This client is essentially meant to abstract out the details of +// configmaps and the API, and just store/retrieve a single value, the cluster uid. +func NewConfigMapVault(c *client.Client, uidNs, uidConfigMapName string) *ConfigMapVault { + return &ConfigMapVault{NewConfigMapStore(c), uidNs, uidConfigMapName} +} + +// FakeConfigMapStore is an implementation of the ConfigMapStore that doesn't +// persist configmaps. Only used in testing. +func NewFakeConfigMapVault(ns, name string) *ConfigMapVault { + return &ConfigMapVault{cache.NewStore(cache.MetaNamespaceKeyFunc), ns, name} +} + +// ConfigMapStore wraps the store interface. Implementations usually persist +// contents of the store transparently. +type ConfigMapStore interface { + cache.Store +} + +// ApiServerConfigMapStore only services Add and GetByKey from apiserver. +// TODO: Implement all the other store methods and make this a write +// through cache. +type ApiServerConfigMapStore struct { + ConfigMapStore + client *client.Client +} + +// Add adds the given config map to the apiserver's store. +func (a *ApiServerConfigMapStore) Add(obj interface{}) error { + cfg := obj.(*api.ConfigMap) + _, err := a.client.ConfigMaps(cfg.Namespace).Create(cfg) + return err +} + +// Update updates the existing config map object. +func (a *ApiServerConfigMapStore) Update(obj interface{}) error { + cfg := obj.(*api.ConfigMap) + _, err := a.client.ConfigMaps(cfg.Namespace).Update(cfg) + return err +} + +// Delete deletes the existing config map object. +func (a *ApiServerConfigMapStore) Delete(obj interface{}) error { + cfg := obj.(*api.ConfigMap) + return a.client.ConfigMaps(cfg.Namespace).Delete(cfg.Name) +} + +// GetByKey returns the config map for a given key. +// The key must take the form namespace/name. +func (a *ApiServerConfigMapStore) GetByKey(key string) (item interface{}, exists bool, err error) { + nsName := strings.Split(key, "/") + if len(nsName) != 2 { + return nil, false, fmt.Errorf("Failed to get key %v, unexpecte format, expecting ns/name", key) + } + ns, name := nsName[0], nsName[1] + cfg, err := a.client.ConfigMaps(ns).Get(name) + if err != nil { + // Translate not found errors to found=false, err=nil + if errors.IsNotFound(err) { + return nil, false, nil + } + return nil, false, err + } + return cfg, true, nil +} + +// NewConfigMapStore returns a config map store capable of persisting updates +// to apiserver. +func NewConfigMapStore(c *client.Client) ConfigMapStore { + return &ApiServerConfigMapStore{ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), client: c} +} diff --git a/controllers/gce/storage/configmaps_test.go b/controllers/gce/storage/configmaps_test.go new file mode 100644 index 000000000..50710d6ca --- /dev/null +++ b/controllers/gce/storage/configmaps_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" +) + +func TestConfigMapUID(t *testing.T) { + vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid") + uid := "" + k, exists, err := vault.Get() + if exists { + t.Errorf("Got a key from an empyt vault") + } + vault.Put(uid) + k, exists, err = vault.Get() + if !exists || err != nil { + t.Errorf("Failed to retrieve value from vault") + } + if k != "" { + t.Errorf("Failed to store empty string as a key in the vault") + } + vault.Put("newuid") + k, exists, err = vault.Get() + if !exists || err != nil { + t.Errorf("Failed to retrieve value from vault") + } + if k != "newuid" { + t.Errorf("Failed to modify uid") + } + if err := vault.Delete(); err != nil { + t.Errorf("Failed to delete uid %v", err) + } + if uid, exists, _ := vault.Get(); exists { + t.Errorf("Found uid %v, expected none", uid) + } +}