Cluster UID store/retrieval
This commit is contained in:
parent
e93d8d8152
commit
24fb4b70aa
12 changed files with 263 additions and 20 deletions
|
@ -38,11 +38,11 @@ type Backends struct {
|
|||
nodePool instances.NodePool
|
||||
healthChecker healthchecks.HealthChecker
|
||||
snapshotter storage.Snapshotter
|
||||
namer utils.Namer
|
||||
// ignoredPorts are a set of ports excluded from GC, even
|
||||
// after the Ingress has been deleted. Note that invoking
|
||||
// a Delete() on these ports will still delete the backend.
|
||||
ignoredPorts sets.String
|
||||
namer *utils.Namer
|
||||
}
|
||||
|
||||
func portKey(port int64) string {
|
||||
|
@ -60,7 +60,7 @@ func NewBackendPool(
|
|||
cloud BackendServices,
|
||||
healthChecker healthchecks.HealthChecker,
|
||||
nodePool instances.NodePool,
|
||||
namer utils.Namer,
|
||||
namer *utils.Namer,
|
||||
ignorePorts []int64,
|
||||
resyncWithCloud bool) *Backends {
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ import (
|
|||
)
|
||||
|
||||
func newBackendPool(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) BackendPool {
|
||||
namer := utils.Namer{}
|
||||
namer := &utils.Namer{}
|
||||
return NewBackendPool(
|
||||
f,
|
||||
healthchecks.NewHealthChecker(healthchecks.NewFakeHealthChecks(), "/", namer),
|
||||
|
|
|
@ -66,7 +66,7 @@ const (
|
|||
|
||||
// ClusterManager manages cluster resource pools.
|
||||
type ClusterManager struct {
|
||||
ClusterNamer utils.Namer
|
||||
ClusterNamer *utils.Namer
|
||||
defaultBackendNodePort int64
|
||||
instancePool instances.NodePool
|
||||
backendPool backends.BackendPool
|
||||
|
@ -227,12 +227,17 @@ func NewClusterManager(
|
|||
// and continue.
|
||||
cloud := getGCEClient()
|
||||
|
||||
cluster := ClusterManager{ClusterNamer: utils.Namer{name}}
|
||||
// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
|
||||
cluster := ClusterManager{ClusterNamer: &utils.Namer{name}}
|
||||
zone, err := cloud.GetZone()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// NodePool stores GCE vms that are in this Kubernetes cluster.
|
||||
cluster.instancePool = instances.NewNodePool(cloud, zone.FailureDomain)
|
||||
|
||||
// BackendPool creates GCE BackendServices and associated health checks.
|
||||
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
|
||||
|
||||
// TODO: This needs to change to a consolidated management of the default backend.
|
||||
|
@ -242,6 +247,8 @@ func NewClusterManager(
|
|||
defaultBackendPool := backends.NewBackendPool(
|
||||
cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
|
||||
cluster.defaultBackendNodePort = defaultBackendNodePort
|
||||
|
||||
// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
|
||||
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(
|
||||
cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
|
||||
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
|
||||
|
|
|
@ -301,7 +301,7 @@ func (lbc *LoadBalancerController) sync(key string) {
|
|||
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
|
||||
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
|
||||
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
|
||||
} else if lbc.updateIngressStatus(l7, ing); err != nil {
|
||||
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
|
||||
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
|
||||
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
|
||||
}
|
||||
|
|
|
@ -17,14 +17,15 @@ limitations under the License.
|
|||
package controller
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
"k8s.io/contrib/ingress/controllers/gce/backends"
|
||||
"k8s.io/contrib/ingress/controllers/gce/firewalls"
|
||||
"k8s.io/contrib/ingress/controllers/gce/healthchecks"
|
||||
"k8s.io/contrib/ingress/controllers/gce/instances"
|
||||
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
|
||||
"k8s.io/contrib/ingress/controllers/gce/utils"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -48,7 +49,7 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager {
|
|||
fakeBackends := backends.NewFakeBackendServices()
|
||||
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
||||
fakeHCs := healthchecks.NewFakeHealthChecks()
|
||||
namer := utils.Namer{clusterName}
|
||||
namer := &utils.Namer{clusterName}
|
||||
nodePool := instances.NewNodePool(fakeIGs, defaultZone)
|
||||
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
|
||||
backendPool := backends.NewBackendPool(
|
||||
|
|
|
@ -30,14 +30,14 @@ const l7SrcRange = "130.211.0.0/22"
|
|||
// FirewallRules manages firewall rules.
|
||||
type FirewallRules struct {
|
||||
cloud Firewall
|
||||
namer utils.Namer
|
||||
namer *utils.Namer
|
||||
srcRange netset.IPNet
|
||||
}
|
||||
|
||||
// NewFirewallPool creates a new firewall rule manager.
|
||||
// cloud: the cloud object implementing Firewall.
|
||||
// namer: cluster namer.
|
||||
func NewFirewallPool(cloud Firewall, namer utils.Namer) SingleFirewallPool {
|
||||
func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool {
|
||||
srcNetSet, err := netset.ParseIPNets(l7SrcRange)
|
||||
if err != nil {
|
||||
glog.Fatalf("Could not parse L7 src range %v for firewall rule: %v", l7SrcRange, err)
|
||||
|
|
|
@ -28,13 +28,13 @@ import (
|
|||
type HealthChecks struct {
|
||||
cloud SingleHealthCheck
|
||||
defaultPath string
|
||||
namer utils.Namer
|
||||
namer *utils.Namer
|
||||
}
|
||||
|
||||
// NewHealthChecker creates a new health checker.
|
||||
// cloud: the cloud object implementing SingleHealthCheck.
|
||||
// defaultHealthCheckPath: is the HTTP path to use for health checks.
|
||||
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer utils.Namer) HealthChecker {
|
||||
func NewHealthChecker(cloud SingleHealthCheck, defaultHealthCheckPath string, namer *utils.Namer) HealthChecker {
|
||||
return &HealthChecks{cloud, defaultHealthCheckPath, namer}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ type L7s struct {
|
|||
glbcDefaultBackend *compute.BackendService
|
||||
defaultBackendPool backends.BackendPool
|
||||
defaultBackendNodePort int64
|
||||
namer utils.Namer
|
||||
namer *utils.Namer
|
||||
}
|
||||
|
||||
// NewLoadBalancerPool returns a new loadbalancer pool.
|
||||
|
@ -80,7 +80,7 @@ type L7s struct {
|
|||
func NewLoadBalancerPool(
|
||||
cloud LoadBalancers,
|
||||
defaultBackendPool backends.BackendPool,
|
||||
defaultBackendNodePort int64, namer utils.Namer) LoadBalancerPool {
|
||||
defaultBackendNodePort int64, namer *utils.Namer) LoadBalancerPool {
|
||||
return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer}
|
||||
}
|
||||
|
||||
|
@ -284,7 +284,7 @@ type L7 struct {
|
|||
// TODO: Expose this to users.
|
||||
glbcDefaultBackend *compute.BackendService
|
||||
// namer is used to compute names of the various sub-components of an L7.
|
||||
namer utils.Namer
|
||||
namer *utils.Namer
|
||||
}
|
||||
|
||||
func (l *L7) checkUrlMap(backend *compute.BackendService) (err error) {
|
||||
|
|
|
@ -36,7 +36,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T) LoadBalancerPool {
|
|||
fakeBackends := backends.NewFakeBackendServices()
|
||||
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
||||
fakeHCs := healthchecks.NewFakeHealthChecks()
|
||||
namer := utils.Namer{}
|
||||
namer := &utils.Namer{}
|
||||
healthChecker := healthchecks.NewHealthChecker(fakeHCs, "/", namer)
|
||||
backendPool := backends.NewBackendPool(
|
||||
fakeBackends, healthChecker, instances.NewNodePool(fakeIGs, defaultZone), namer, []int64{}, false)
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
flag "github.com/spf13/pflag"
|
||||
"k8s.io/contrib/ingress/controllers/gce/controller"
|
||||
"k8s.io/contrib/ingress/controllers/gce/storage"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kubectl_util "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
|
@ -56,7 +57,10 @@ const (
|
|||
alphaNumericChar = "0"
|
||||
|
||||
// Current docker image version. Only used in debug logging.
|
||||
imageVersion = "glbc:0.6.2"
|
||||
imageVersion = "glbc:0.6.3"
|
||||
|
||||
// Key used to persist UIDs to configmaps.
|
||||
uidConfigMapName = "ingress-uid"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -156,7 +160,7 @@ func main() {
|
|||
go_flag.Lookup("logtostderr").Value.Set("true")
|
||||
go_flag.Set("v", "4")
|
||||
}
|
||||
glog.Infof("Starting GLBC image: %v", imageVersion)
|
||||
glog.Infof("Starting GLBC image: %v, cluster name %v", imageVersion, *clusterName)
|
||||
if *defaultSvc == "" {
|
||||
glog.Fatalf("Please specify --default-backend")
|
||||
}
|
||||
|
@ -188,7 +192,7 @@ func main() {
|
|||
if *inCluster || *useRealCloud {
|
||||
// Create cluster manager
|
||||
clusterManager, err = controller.NewClusterManager(
|
||||
*clusterName, defaultBackendNodePort, *healthCheckPath)
|
||||
*clusterName, defaultBackendNodePort, *healthCheckPath, storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName))
|
||||
if err != nil {
|
||||
glog.Fatalf("%v", err)
|
||||
}
|
||||
|
|
177
controllers/gce/storage/configmaps.go
Normal file
177
controllers/gce/storage/configmaps.go
Normal file
|
@ -0,0 +1,177 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
)
|
||||
|
||||
// UIDVault stores UIDs.
|
||||
type UIDVault interface {
|
||||
Get() (string, bool, error)
|
||||
Put(string) error
|
||||
Delete() error
|
||||
}
|
||||
|
||||
// uidDataKey is the key used in config maps to store the UID.
|
||||
const uidDataKey = "uid"
|
||||
|
||||
// ConfigMapVault stores cluster UIDs in config maps.
|
||||
// It's a layer on top of ConfigMapStore that just implements the utils.uidVault
|
||||
// interface.
|
||||
type ConfigMapVault struct {
|
||||
ConfigMapStore cache.Store
|
||||
namespace string
|
||||
name string
|
||||
}
|
||||
|
||||
// Get retrieves the cluster UID from the cluster config map.
|
||||
// If this method returns an error, it's guaranteed to be apiserver flake.
|
||||
// If the error is a not found error it sets the boolean to false and
|
||||
// returns and error of nil instead.
|
||||
func (c *ConfigMapVault) Get() (string, bool, error) {
|
||||
key := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||
item, found, err := c.ConfigMapStore.GetByKey(key)
|
||||
if err != nil || !found {
|
||||
return "", found, err
|
||||
}
|
||||
cfg := item.(*api.ConfigMap)
|
||||
if k, ok := cfg.Data[uidDataKey]; ok {
|
||||
return k, false, nil
|
||||
}
|
||||
return "", found, fmt.Errorf("Found config map %v but it doesn't contain uid key: %+v", key, cfg.Data)
|
||||
}
|
||||
|
||||
// Put stores the given UID in the cluster config map.
|
||||
func (c *ConfigMapVault) Put(uid string) error {
|
||||
apiObj := &api.ConfigMap{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: c.name,
|
||||
Namespace: c.namespace,
|
||||
},
|
||||
Data: map[string]string{uidDataKey: uid},
|
||||
}
|
||||
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||
|
||||
item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
||||
if err == nil && exists {
|
||||
data := item.(*api.ConfigMap).Data
|
||||
if k, ok := data[uidDataKey]; ok && k == uid {
|
||||
return nil
|
||||
} else if ok {
|
||||
glog.Infof("Configmap %v has key %v but wrong value %v, updating", cfgMapKey, k, uid)
|
||||
}
|
||||
|
||||
if err := c.ConfigMapStore.Update(apiObj); err != nil {
|
||||
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
|
||||
}
|
||||
} else if err := c.ConfigMapStore.Add(apiObj); err != nil {
|
||||
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
|
||||
}
|
||||
glog.Infof("Successfully stored uid %v in config map %v", uid, cfgMapKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the cluster UID storing config map.
|
||||
func (c *ConfigMapVault) Delete() error {
|
||||
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||
item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
||||
if err == nil {
|
||||
return c.ConfigMapStore.Delete(item)
|
||||
}
|
||||
glog.Warningf("Couldn't find item %v in vault, unable to delete", cfgMapKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewConfigMapVault creates a config map client.
|
||||
// This client is essentially meant to abstract out the details of
|
||||
// configmaps and the API, and just store/retrieve a single value, the cluster uid.
|
||||
func NewConfigMapVault(c *client.Client, uidNs, uidConfigMapName string) *ConfigMapVault {
|
||||
return &ConfigMapVault{NewConfigMapStore(c), uidNs, uidConfigMapName}
|
||||
}
|
||||
|
||||
// FakeConfigMapStore is an implementation of the ConfigMapStore that doesn't
|
||||
// persist configmaps. Only used in testing.
|
||||
func NewFakeConfigMapVault(ns, name string) *ConfigMapVault {
|
||||
return &ConfigMapVault{cache.NewStore(cache.MetaNamespaceKeyFunc), ns, name}
|
||||
}
|
||||
|
||||
// ConfigMapStore wraps the store interface. Implementations usually persist
|
||||
// contents of the store transparently.
|
||||
type ConfigMapStore interface {
|
||||
cache.Store
|
||||
}
|
||||
|
||||
// ApiServerConfigMapStore only services Add and GetByKey from apiserver.
|
||||
// TODO: Implement all the other store methods and make this a write
|
||||
// through cache.
|
||||
type ApiServerConfigMapStore struct {
|
||||
ConfigMapStore
|
||||
client *client.Client
|
||||
}
|
||||
|
||||
// Add adds the given config map to the apiserver's store.
|
||||
func (a *ApiServerConfigMapStore) Add(obj interface{}) error {
|
||||
cfg := obj.(*api.ConfigMap)
|
||||
_, err := a.client.ConfigMaps(cfg.Namespace).Create(cfg)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update updates the existing config map object.
|
||||
func (a *ApiServerConfigMapStore) Update(obj interface{}) error {
|
||||
cfg := obj.(*api.ConfigMap)
|
||||
_, err := a.client.ConfigMaps(cfg.Namespace).Update(cfg)
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete deletes the existing config map object.
|
||||
func (a *ApiServerConfigMapStore) Delete(obj interface{}) error {
|
||||
cfg := obj.(*api.ConfigMap)
|
||||
return a.client.ConfigMaps(cfg.Namespace).Delete(cfg.Name)
|
||||
}
|
||||
|
||||
// GetByKey returns the config map for a given key.
|
||||
// The key must take the form namespace/name.
|
||||
func (a *ApiServerConfigMapStore) GetByKey(key string) (item interface{}, exists bool, err error) {
|
||||
nsName := strings.Split(key, "/")
|
||||
if len(nsName) != 2 {
|
||||
return nil, false, fmt.Errorf("Failed to get key %v, unexpecte format, expecting ns/name", key)
|
||||
}
|
||||
ns, name := nsName[0], nsName[1]
|
||||
cfg, err := a.client.ConfigMaps(ns).Get(name)
|
||||
if err != nil {
|
||||
// Translate not found errors to found=false, err=nil
|
||||
if errors.IsNotFound(err) {
|
||||
return nil, false, nil
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
return cfg, true, nil
|
||||
}
|
||||
|
||||
// NewConfigMapStore returns a config map store capable of persisting updates
|
||||
// to apiserver.
|
||||
func NewConfigMapStore(c *client.Client) ConfigMapStore {
|
||||
return &ApiServerConfigMapStore{ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), client: c}
|
||||
}
|
54
controllers/gce/storage/configmaps_test.go
Normal file
54
controllers/gce/storage/configmaps_test.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
func TestConfigMapUID(t *testing.T) {
|
||||
vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid")
|
||||
uid := ""
|
||||
k, exists, err := vault.Get()
|
||||
if exists {
|
||||
t.Errorf("Got a key from an empyt vault")
|
||||
}
|
||||
vault.Put(uid)
|
||||
k, exists, err = vault.Get()
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
}
|
||||
if k != "" {
|
||||
t.Errorf("Failed to store empty string as a key in the vault")
|
||||
}
|
||||
vault.Put("newuid")
|
||||
k, exists, err = vault.Get()
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
}
|
||||
if k != "newuid" {
|
||||
t.Errorf("Failed to modify uid")
|
||||
}
|
||||
if err := vault.Delete(); err != nil {
|
||||
t.Errorf("Failed to delete uid %v", err)
|
||||
}
|
||||
if uid, exists, _ := vault.Get(); exists {
|
||||
t.Errorf("Found uid %v, expected none", uid)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue