First stab at extending the "uid" configmap to store firewall
rule information.
This commit is contained in:
parent
a8b89677d6
commit
85a34516b3
4 changed files with 200 additions and 74 deletions
|
@ -120,6 +120,9 @@ var (
|
|||
|
||||
healthzPort = flags.Int("healthz-port", lbApiPort,
|
||||
`Port to run healthz server. Must match the health check port in yaml.`)
|
||||
|
||||
firewallName = flags.String("firewall-name", "",
|
||||
`Name to use for firewall rule names in lieu of cluster-uid.`)
|
||||
)
|
||||
|
||||
func registerHandlers(lbc *controller.LoadBalancerController) {
|
||||
|
@ -213,7 +216,7 @@ func main() {
|
|||
|
||||
if *inCluster || *useRealCloud {
|
||||
// Create cluster manager
|
||||
namer, err := newNamer(kubeClient, *clusterName)
|
||||
namer, err := newNamer(kubeClient, *clusterName, *firewallName)
|
||||
if err != nil {
|
||||
glog.Fatalf("%v", err)
|
||||
}
|
||||
|
@ -245,32 +248,98 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, error) {
|
||||
func newNamer(kubeClient client.Interface, clusterName string, fwName string) (*utils.Namer, error) {
|
||||
name, err := getClusterUID(kubeClient, clusterName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fw_name, err := getFirewallName(kubeClient, fwName, clusterName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
namer := utils.NewNamer(name)
|
||||
vault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||
namer := utils.NewNamerWithFirewall(name, fw_name)
|
||||
uid_vault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||
|
||||
// Start a goroutine to poll the cluster UID config map
|
||||
// We don't watch because we know exactly which configmap we want and this
|
||||
// controller already watches 5 other resources, so it isn't worth the cost
|
||||
// of another connection and complexity.
|
||||
go wait.Forever(func() {
|
||||
uid, found, err := vault.Get()
|
||||
existing := namer.GetClusterName()
|
||||
if found && uid != existing {
|
||||
glog.Infof("Cluster uid changed from %v -> %v", existing, uid)
|
||||
namer.SetClusterName(uid)
|
||||
} else if err != nil {
|
||||
glog.Errorf("Failed to reconcile cluster uid %v, currently set to %v", err, existing)
|
||||
// First look for changes in firewallConfigMap.
|
||||
vaultmap, found, err := uid_vault.Get()
|
||||
if !found || err != nil {
|
||||
glog.Errorf("Can't read uidConfigMap %v", uidConfigMapName)
|
||||
return
|
||||
}
|
||||
|
||||
for key, val := range vaultmap {
|
||||
switch key {
|
||||
case storage.UidDataKey:
|
||||
if uid := namer.GetClusterName(); uid != val {
|
||||
glog.Infof("Cluster uid changed from %v -> %v", val, uid)
|
||||
namer.SetClusterName(uid)
|
||||
}
|
||||
case storage.FirewallRuleKey:
|
||||
if fw_name := namer.GetFirewallName(); fw_name != val {
|
||||
glog.Infof("Cluster firewall name changed from %v -> %v", val, fw_name)
|
||||
namer.SetClusterFirewallName(fw_name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 5*time.Second)
|
||||
return namer, nil
|
||||
}
|
||||
|
||||
// getFlagOrLookupVault returns the name to use associated to a flag and configmap.
|
||||
// The returned value follows this priority:
|
||||
// If the provided 'name' is not empty, that name is used.
|
||||
// This is effectively a client override via a command line flag.
|
||||
// else, check configmap under 'configmap_name' as a key and if found, use the associated value
|
||||
// else, return an empty 'name' and pass along an error iff the configmap lookup is erroneous.
|
||||
func getFlagOrLookupVault(cfgVault *storage.ConfigMapVault, cm_key string, name string) (string, error) {
|
||||
if name != "" {
|
||||
glog.Infof("Using user provided %v %v", cm_key, name)
|
||||
// Don't save the uid in the vault, so users can rollback through
|
||||
// setting the accompany flag to ""
|
||||
return name, nil
|
||||
}
|
||||
vaultmap, found, err := cfgVault.Get()
|
||||
if found {
|
||||
if val, found := vaultmap[cm_key]; found {
|
||||
glog.Infof("Using saved %v %q", cm_key, val)
|
||||
return val, nil
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// This can fail because of:
|
||||
// 1. No such config map - found=false, err=nil
|
||||
// 2. No such key in config map - found=false, err=nil
|
||||
// 3. Apiserver flake - found=false, err!=nil
|
||||
// It is not safe to proceed in 3.
|
||||
return "", fmt.Errorf("Failed to retrieve %v: %v, using %q as name", cm_key, err, name)
|
||||
}
|
||||
// Not found but safe to proceed.
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// getFirewallName returns the firewall rule name to use for this cluster. For
|
||||
// backwards compatibility, the firewall name will default to the cluster UID.
|
||||
// Use getFlagOrLookupVault to obtain a stored or overridden value for the firewall name.
|
||||
// else, use the cluster UID as a backup (this retains backwards compatibility).
|
||||
func getFirewallName(kubeClient client.Interface, name string, cluster_uid string) (string, error) {
|
||||
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||
if fw_name, err := getFlagOrLookupVault(cfgVault, storage.FirewallRuleKey, name); err != nil {
|
||||
return "", err
|
||||
} else if fw_name != "" {
|
||||
return fw_name, nil
|
||||
} else {
|
||||
glog.Infof("Using cluster UID %v as firewall name", cluster_uid)
|
||||
return cluster_uid, nil
|
||||
}
|
||||
}
|
||||
|
||||
// getClusterUID returns the cluster UID. Rules for UID generation:
|
||||
// If the user specifies a --cluster-uid param it overwrites everything
|
||||
// else, check UID config map for a previously recorded uid
|
||||
|
@ -279,26 +348,12 @@ func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, er
|
|||
// else, allocate a new uid
|
||||
func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
||||
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||
if name != "" {
|
||||
glog.Infof("Using user provided cluster uid %v", name)
|
||||
// Don't save the uid in the vault, so users can rollback through
|
||||
// --cluster-uid=""
|
||||
if name, err := getFlagOrLookupVault(cfgVault, storage.UidDataKey, name); err != nil {
|
||||
return "", err
|
||||
} else if name != "" {
|
||||
return name, nil
|
||||
}
|
||||
|
||||
existingUID, found, err := cfgVault.Get()
|
||||
if found {
|
||||
glog.Infof("Using saved cluster uid %q", existingUID)
|
||||
return existingUID, nil
|
||||
} else if err != nil {
|
||||
// This can fail because of:
|
||||
// 1. No such config map - found=false, err=nil
|
||||
// 2. No such key in config map - found=false, err=nil
|
||||
// 3. Apiserver flake - found=false, err!=nil
|
||||
// It is not safe to proceed in 3.
|
||||
return "", fmt.Errorf("Failed to retrieve current uid: %v, using %q as name", err, name)
|
||||
}
|
||||
|
||||
// Check if the cluster has an Ingress with ip
|
||||
ings, err := kubeClient.Extensions().Ingresses(api.NamespaceAll).List(api.ListOptions{LabelSelector: labels.Everything()})
|
||||
if err != nil {
|
||||
|
@ -309,10 +364,10 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
|||
if len(ing.Status.LoadBalancer.Ingress) != 0 {
|
||||
c := namer.ParseName(loadbalancers.GCEResourceName(ing.Annotations, "forwarding-rule"))
|
||||
if c.ClusterName != "" {
|
||||
return c.ClusterName, cfgVault.Put(c.ClusterName)
|
||||
return c.ClusterName, cfgVault.Put(map[string]string{storage.UidDataKey: c.ClusterName})
|
||||
}
|
||||
glog.Infof("Found a working Ingress, assuming uid is empty string")
|
||||
return "", cfgVault.Put("")
|
||||
return "", cfgVault.Put(map[string]string{storage.UidDataKey: ""})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -327,7 +382,7 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
uid := fmt.Sprintf("%x", b)
|
||||
return uid, cfgVault.Put(uid)
|
||||
return uid, cfgVault.Put(map[string]string{storage.UidDataKey: uid})
|
||||
}
|
||||
|
||||
// getNodePort waits for the Service, and returns it's first node port.
|
||||
|
|
|
@ -27,15 +27,11 @@ import (
|
|||
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
)
|
||||
|
||||
// UIDVault stores UIDs.
|
||||
type UIDVault interface {
|
||||
Get() (string, bool, error)
|
||||
Put(string) error
|
||||
Delete() error
|
||||
}
|
||||
|
||||
// uidDataKey is the key used in config maps to store the UID.
|
||||
const uidDataKey = "uid"
|
||||
const (
|
||||
// UidDataKey is the key used in config maps to store the UID.
|
||||
UidDataKey = "uid"
|
||||
FirewallRuleKey = "fwName"
|
||||
)
|
||||
|
||||
// ConfigMapVault stores cluster UIDs in config maps.
|
||||
// It's a layer on top of ConfigMapStore that just implements the utils.uidVault
|
||||
|
@ -50,46 +46,59 @@ type ConfigMapVault struct {
|
|||
// If this method returns an error, it's guaranteed to be apiserver flake.
|
||||
// If the error is a not found error it sets the boolean to false and
|
||||
// returns and error of nil instead.
|
||||
func (c *ConfigMapVault) Get() (string, bool, error) {
|
||||
func (c *ConfigMapVault) Get() (map[string]string, bool, error) {
|
||||
key := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||
item, found, err := c.ConfigMapStore.GetByKey(key)
|
||||
if err != nil || !found {
|
||||
return "", false, err
|
||||
return nil, false, err
|
||||
}
|
||||
cfg := item.(*api.ConfigMap)
|
||||
if k, ok := cfg.Data[uidDataKey]; ok {
|
||||
return k, true, nil
|
||||
}
|
||||
return "", false, fmt.Errorf("Found config map %v but it doesn't contain uid key: %+v", key, cfg.Data)
|
||||
return cfg.Data, true, nil
|
||||
}
|
||||
|
||||
// Put stores the given UID in the cluster config map.
|
||||
func (c *ConfigMapVault) Put(uid string) error {
|
||||
// Put stores the given UID and EUID in the cluster config map.
|
||||
func (c *ConfigMapVault) Put(keyvals map[string]string) error {
|
||||
if len(keyvals) == 0 {
|
||||
return nil
|
||||
}
|
||||
apiObj := &api.ConfigMap{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: c.name,
|
||||
Namespace: c.namespace,
|
||||
},
|
||||
Data: map[string]string{uidDataKey: uid},
|
||||
Data: keyvals,
|
||||
}
|
||||
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||
|
||||
item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
||||
if err == nil && exists {
|
||||
data := item.(*api.ConfigMap).Data
|
||||
if k, ok := data[uidDataKey]; ok && k == uid {
|
||||
return nil
|
||||
} else if ok {
|
||||
glog.Infof("Configmap %v has key %v but wrong value %v, updating", cfgMapKey, k, uid)
|
||||
|
||||
updated_needed := false
|
||||
|
||||
// Dump everything from keyvals into data.
|
||||
for key, val := range keyvals {
|
||||
if val_stored, ok := data[key]; !ok {
|
||||
glog.Infof("Configmap %v will be updated with %v = %v", cfgMapKey, key, val)
|
||||
data[key] = val
|
||||
updated_needed = true
|
||||
} else if val_stored != val {
|
||||
glog.Infof("Configmap %v has key %v but wrong value %v, updating to %v", cfgMapKey, key, val_stored, val)
|
||||
data[key] = val
|
||||
updated_needed = true
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.ConfigMapStore.Update(apiObj); err != nil {
|
||||
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
|
||||
if updated_needed {
|
||||
if err := c.ConfigMapStore.Update(apiObj); err != nil {
|
||||
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
|
||||
}
|
||||
}
|
||||
|
||||
} else if err := c.ConfigMapStore.Add(apiObj); err != nil {
|
||||
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
|
||||
}
|
||||
glog.Infof("Successfully stored uid %q in config map %v", uid, cfgMapKey)
|
||||
glog.Infof("Successfully stored map in config map %v", cfgMapKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package storage
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -24,31 +25,61 @@ import (
|
|||
|
||||
func TestConfigMapUID(t *testing.T) {
|
||||
vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid")
|
||||
uid := ""
|
||||
k, exists, err := vault.Get()
|
||||
//uid := ""
|
||||
keyvals, exists, err := vault.Get()
|
||||
if exists {
|
||||
t.Errorf("Got a key from an empyt vault")
|
||||
t.Errorf("Got keyvals from an empty vault")
|
||||
}
|
||||
vault.Put(uid)
|
||||
k, exists, err = vault.Get()
|
||||
|
||||
// Store empty value for UidDataKey.
|
||||
uidmap := map[string]string{UidDataKey: ""}
|
||||
vault.Put(uidmap)
|
||||
keyvals, exists, err = vault.Get()
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
}
|
||||
if k != "" {
|
||||
if val, ok := keyvals[UidDataKey]; !ok {
|
||||
t.Errorf("Failed to retried UidDataKey from vault")
|
||||
} else if val != "" {
|
||||
t.Errorf("Failed to store empty string as a key in the vault")
|
||||
}
|
||||
vault.Put("newuid")
|
||||
k, exists, err = vault.Get()
|
||||
|
||||
// Store actual value in key.
|
||||
uidmap[UidDataKey] = "newuid"
|
||||
vault.Put(uidmap)
|
||||
keyvals, exists, err = vault.Get()
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
} else if val, ok := keyvals[UidDataKey]; !ok {
|
||||
t.Errorf("Failed to retried UidDataKey from vault")
|
||||
} else if val != "newuid" {
|
||||
t.Errorf("Failed to store empty string as a key in the vault")
|
||||
}
|
||||
if k != "newuid" {
|
||||
t.Errorf("Failed to modify uid")
|
||||
}
|
||||
|
||||
// Delete value.
|
||||
if err := vault.Delete(); err != nil {
|
||||
t.Errorf("Failed to delete uid %v", err)
|
||||
}
|
||||
if uid, exists, _ := vault.Get(); exists {
|
||||
t.Errorf("Found uid %v, expected none", uid)
|
||||
if keyvals, exists, _ = vault.Get(); exists {
|
||||
t.Errorf("Found uid but expected none after deletion")
|
||||
}
|
||||
|
||||
// Ensure Keystore is not wiped on second update.
|
||||
uidmap[UidDataKey] = "newuid"
|
||||
uidmap[FirewallRuleKey] = "fwrule"
|
||||
vault.Put(uidmap)
|
||||
keyvals, exists, err = vault.Get()
|
||||
if !exists || err != nil || len(keyvals) != 2 {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
}
|
||||
uidmap[UidDataKey] = "newnewuid"
|
||||
vault.Put(uidmap)
|
||||
keyvals, exists, err = vault.Get()
|
||||
if !exists || err != nil || len(keyvals) != 2 {
|
||||
t.Errorf("Failed to retrieve value from vault")
|
||||
}
|
||||
if !reflect.DeepEqual(keyvals, uidmap) {
|
||||
t.Errorf("Failed to provide equal maps from vault after a partial update")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -92,8 +92,9 @@ const (
|
|||
|
||||
// Namer handles centralized naming for the cluster.
|
||||
type Namer struct {
|
||||
clusterName string
|
||||
nameLock sync.Mutex
|
||||
clusterName string
|
||||
firewallName string
|
||||
nameLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewNamer creates a new namer.
|
||||
|
@ -103,6 +104,14 @@ func NewNamer(clusterName string) *Namer {
|
|||
return namer
|
||||
}
|
||||
|
||||
// NewNamer creates a new namer with a Firewall Name
|
||||
func NewNamerWithFirewall(clusterName string, firewallName string) *Namer {
|
||||
namer := &Namer{}
|
||||
namer.SetClusterName(clusterName)
|
||||
namer.SetClusterFirewallName(firewallName)
|
||||
return namer
|
||||
}
|
||||
|
||||
// NameComponents is a struct representing the components of a a GCE resource
|
||||
// name constructed by the namer. The format of such a name is:
|
||||
// k8s-resource-<metadata, eg port>--uid
|
||||
|
@ -123,6 +132,14 @@ func (n *Namer) SetClusterName(name string) {
|
|||
n.clusterName = name
|
||||
}
|
||||
|
||||
func (n *Namer) SetClusterFirewallName(firewall_name string) {
|
||||
// Retain backwards compatible behavior where firewallName == clusterName.
|
||||
n.nameLock.Lock()
|
||||
defer n.nameLock.Unlock()
|
||||
glog.Infof("Changing cluster %v firewall name to %v", n.clusterName, firewall_name)
|
||||
n.firewallName = firewall_name
|
||||
}
|
||||
|
||||
// GetClusterName returns the UID/name of this cluster.
|
||||
func (n *Namer) GetClusterName() string {
|
||||
n.nameLock.Lock()
|
||||
|
@ -130,6 +147,20 @@ func (n *Namer) GetClusterName() string {
|
|||
return n.clusterName
|
||||
}
|
||||
|
||||
// GetClusterNameAndFirewall returns the UID/name and firewall/name of this cluster.
|
||||
func (n *Namer) GetClusterAndFirewallNames() (string, string) {
|
||||
n.nameLock.Lock()
|
||||
defer n.nameLock.Unlock()
|
||||
return n.clusterName, n.firewallName
|
||||
}
|
||||
|
||||
// GetFirewallName returns the firewall name of this cluster.
|
||||
func (n *Namer) GetFirewallName() string {
|
||||
n.nameLock.Lock()
|
||||
defer n.nameLock.Unlock()
|
||||
return n.firewallName
|
||||
}
|
||||
|
||||
// Truncate truncates the given key to a GCE length limit.
|
||||
func (n *Namer) Truncate(key string) string {
|
||||
if len(key) > nameLenLimit {
|
||||
|
@ -216,12 +247,12 @@ func (n *Namer) IGName() string {
|
|||
|
||||
// FrSuffix constructs the glbc specific suffix for the FirewallRule.
|
||||
func (n *Namer) FrSuffix() string {
|
||||
clusterName := n.GetClusterName()
|
||||
firewallName := n.GetFirewallName()
|
||||
// The entire cluster only needs a single firewall rule.
|
||||
if clusterName == "" {
|
||||
if firewallName == "" {
|
||||
return globalFirewallSuffix
|
||||
}
|
||||
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, clusterName))
|
||||
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, firewallName))
|
||||
}
|
||||
|
||||
// FrName constructs the full firewall rule name, this is the name assigned by
|
||||
|
|
Loading…
Reference in a new issue