Merge pull request #278 from csbell/fw-name
Extend ConfigMap to store fwrule names
This commit is contained in:
commit
a6e38221ee
9 changed files with 220 additions and 95 deletions
|
@ -46,6 +46,10 @@ var (
|
||||||
// L7 controller created without specifying the --cluster-uid flag.
|
// L7 controller created without specifying the --cluster-uid flag.
|
||||||
DefaultClusterUID = ""
|
DefaultClusterUID = ""
|
||||||
|
|
||||||
|
// DefaultFirewallName is the name to user for firewall rules created
|
||||||
|
// by an L7 controller when the --fireall-rule is not used.
|
||||||
|
DefaultFirewallName = ""
|
||||||
|
|
||||||
// Frequency to poll on local stores to sync.
|
// Frequency to poll on local stores to sync.
|
||||||
storeSyncPollPeriod = 5 * time.Second
|
storeSyncPollPeriod = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
|
@ -199,7 +199,8 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLbCreateDelete(t *testing.T) {
|
func TestLbCreateDelete(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
testFirewallName := "quux"
|
||||||
|
cm := NewFakeClusterManager(DefaultClusterUID, testFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
inputMap1 := map[string]utils.FakeIngressRuleValueMap{
|
inputMap1 := map[string]utils.FakeIngressRuleValueMap{
|
||||||
"foo.example.com": {
|
"foo.example.com": {
|
||||||
|
@ -240,6 +241,7 @@ func TestLbCreateDelete(t *testing.T) {
|
||||||
unexpected := []int{pm.portMap["foo2svc"], pm.portMap["bar2svc"]}
|
unexpected := []int{pm.portMap["foo2svc"], pm.portMap["bar2svc"]}
|
||||||
expected := []int{pm.portMap["foo1svc"], pm.portMap["bar1svc"]}
|
expected := []int{pm.portMap["foo1svc"], pm.portMap["bar1svc"]}
|
||||||
firewallPorts := sets.NewString()
|
firewallPorts := sets.NewString()
|
||||||
|
pm.namer.SetFirewallName(testFirewallName)
|
||||||
firewallName := pm.namer.FrName(pm.namer.FrSuffix())
|
firewallName := pm.namer.FrName(pm.namer.FrSuffix())
|
||||||
|
|
||||||
if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err != nil {
|
if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err != nil {
|
||||||
|
@ -290,7 +292,7 @@ func TestLbCreateDelete(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLbFaultyUpdate(t *testing.T) {
|
func TestLbFaultyUpdate(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
||||||
"foo.example.com": {
|
"foo.example.com": {
|
||||||
|
@ -327,7 +329,7 @@ func TestLbFaultyUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLbDefaulting(t *testing.T) {
|
func TestLbDefaulting(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
// Make sure the controller plugs in the default values accepted by GCE.
|
// Make sure the controller plugs in the default values accepted by GCE.
|
||||||
ing := newIngress(map[string]utils.FakeIngressRuleValueMap{"": {"": "foo1svc"}})
|
ing := newIngress(map[string]utils.FakeIngressRuleValueMap{"": {"": "foo1svc"}})
|
||||||
|
@ -345,7 +347,7 @@ func TestLbDefaulting(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLbNoService(t *testing.T) {
|
func TestLbNoService(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
||||||
"foo.example.com": {
|
"foo.example.com": {
|
||||||
|
@ -389,7 +391,7 @@ func TestLbNoService(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLbChangeStaticIP(t *testing.T) {
|
func TestLbChangeStaticIP(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
inputMap := map[string]utils.FakeIngressRuleValueMap{
|
||||||
"foo.example.com": {
|
"foo.example.com": {
|
||||||
|
|
|
@ -44,12 +44,12 @@ type fakeClusterManager struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFakeClusterManager creates a new fake ClusterManager.
|
// NewFakeClusterManager creates a new fake ClusterManager.
|
||||||
func NewFakeClusterManager(clusterName string) *fakeClusterManager {
|
func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager {
|
||||||
fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
|
fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName)
|
||||||
fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
|
fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil })
|
||||||
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
|
||||||
fakeHCs := healthchecks.NewFakeHealthChecks()
|
fakeHCs := healthchecks.NewFakeHealthChecks()
|
||||||
namer := utils.NewNamer(clusterName)
|
namer := utils.NewNamer(clusterName, firewallName)
|
||||||
|
|
||||||
nodePool := instances.NewNodePool(fakeIGs)
|
nodePool := instances.NewNodePool(fakeIGs)
|
||||||
nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})
|
nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})
|
||||||
|
|
|
@ -32,7 +32,7 @@ import (
|
||||||
var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC)
|
var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC)
|
||||||
|
|
||||||
func TestZoneListing(t *testing.T) {
|
func TestZoneListing(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
zoneToNode := map[string][]string{
|
zoneToNode := map[string][]string{
|
||||||
"zone-1": {"n1"},
|
"zone-1": {"n1"},
|
||||||
|
@ -57,7 +57,7 @@ func TestZoneListing(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInstancesAddedToZones(t *testing.T) {
|
func TestInstancesAddedToZones(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
zoneToNode := map[string][]string{
|
zoneToNode := map[string][]string{
|
||||||
"zone-1": {"n1", "n2"},
|
"zone-1": {"n1", "n2"},
|
||||||
|
@ -92,7 +92,7 @@ func TestInstancesAddedToZones(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProbeGetter(t *testing.T) {
|
func TestProbeGetter(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
nodePortToHealthCheck := map[int64]string{
|
nodePortToHealthCheck := map[int64]string{
|
||||||
3001: "/healthz",
|
3001: "/healthz",
|
||||||
|
@ -110,7 +110,7 @@ func TestProbeGetter(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProbeGetterNamedPort(t *testing.T) {
|
func TestProbeGetterNamedPort(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
nodePortToHealthCheck := map[int64]string{
|
nodePortToHealthCheck := map[int64]string{
|
||||||
3001: "/healthz",
|
3001: "/healthz",
|
||||||
|
@ -133,7 +133,7 @@ func TestProbeGetterNamedPort(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProbeGetterCrossNamespace(t *testing.T) {
|
func TestProbeGetterCrossNamespace(t *testing.T) {
|
||||||
cm := NewFakeClusterManager(DefaultClusterUID)
|
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
|
||||||
lbc := newLoadBalancerController(t, cm, "")
|
lbc := newLoadBalancerController(t, cm, "")
|
||||||
|
|
||||||
firstPod := &api.Pod{
|
firstPod := &api.Pod{
|
||||||
|
|
|
@ -236,7 +236,8 @@ func TestUpdateUrlMapNoChanges(t *testing.T) {
|
||||||
|
|
||||||
func TestNameParsing(t *testing.T) {
|
func TestNameParsing(t *testing.T) {
|
||||||
clusterName := "123"
|
clusterName := "123"
|
||||||
namer := utils.NewNamer(clusterName)
|
firewallName := clusterName
|
||||||
|
namer := utils.NewNamer(clusterName, firewallName)
|
||||||
fullName := namer.Truncate(fmt.Sprintf("%v-%v", forwardingRulePrefix, namer.LBName("testlb")))
|
fullName := namer.Truncate(fmt.Sprintf("%v-%v", forwardingRulePrefix, namer.LBName("testlb")))
|
||||||
annotationsMap := map[string]string{
|
annotationsMap := map[string]string{
|
||||||
fmt.Sprintf("%v/forwarding-rule", utils.K8sAnnotationPrefix): fullName,
|
fmt.Sprintf("%v/forwarding-rule", utils.K8sAnnotationPrefix): fullName,
|
||||||
|
@ -308,7 +309,7 @@ func TestClusterNameChange(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInvalidClusterNameChange(t *testing.T) {
|
func TestInvalidClusterNameChange(t *testing.T) {
|
||||||
namer := utils.NewNamer("test--123")
|
namer := utils.NewNamer("test--123", "test--123")
|
||||||
if got := namer.GetClusterName(); got != "123" {
|
if got := namer.GetClusterName(); got != "123" {
|
||||||
t.Fatalf("Expected name 123, got %v", got)
|
t.Fatalf("Expected name 123, got %v", got)
|
||||||
}
|
}
|
||||||
|
|
|
@ -215,7 +215,7 @@ func main() {
|
||||||
|
|
||||||
if *inCluster || *useRealCloud {
|
if *inCluster || *useRealCloud {
|
||||||
// Create cluster manager
|
// Create cluster manager
|
||||||
namer, err := newNamer(kubeClient, *clusterName)
|
namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("%v", err)
|
glog.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
|
@ -225,7 +225,7 @@ func main() {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Create fake cluster manager
|
// Create fake cluster manager
|
||||||
clusterManager = controller.NewFakeClusterManager(*clusterName).ClusterManager
|
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start loadbalancer controller
|
// Start loadbalancer controller
|
||||||
|
@ -247,32 +247,100 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, error) {
|
func newNamer(kubeClient client.Interface, clusterName string, fwName string) (*utils.Namer, error) {
|
||||||
name, err := getClusterUID(kubeClient, clusterName)
|
name, err := getClusterUID(kubeClient, clusterName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
fw_name, err := getFirewallName(kubeClient, fwName, name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
namer := utils.NewNamer(name)
|
namer := utils.NewNamer(name, fw_name)
|
||||||
vault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
uidVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||||
|
|
||||||
// Start a goroutine to poll the cluster UID config map
|
// Start a goroutine to poll the cluster UID config map
|
||||||
// We don't watch because we know exactly which configmap we want and this
|
// We don't watch because we know exactly which configmap we want and this
|
||||||
// controller already watches 5 other resources, so it isn't worth the cost
|
// controller already watches 5 other resources, so it isn't worth the cost
|
||||||
// of another connection and complexity.
|
// of another connection and complexity.
|
||||||
go wait.Forever(func() {
|
go wait.Forever(func() {
|
||||||
uid, found, err := vault.Get()
|
for _, key := range [...]string{storage.UidDataKey, storage.ProviderDataKey} {
|
||||||
existing := namer.GetClusterName()
|
val, found, err := uidVault.Get(key)
|
||||||
if found && uid != existing {
|
if err != nil {
|
||||||
glog.Infof("Cluster uid changed from %v -> %v", existing, uid)
|
glog.Errorf("Can't read uidConfigMap %v", uidConfigMapName)
|
||||||
namer.SetClusterName(uid)
|
} else if !found {
|
||||||
} else if err != nil {
|
errmsg := fmt.Sprintf("Can't read %v from uidConfigMap %v", key, uidConfigMapName)
|
||||||
glog.Errorf("Failed to reconcile cluster uid %v, currently set to %v", err, existing)
|
if key == storage.UidDataKey {
|
||||||
|
glog.Errorf(errmsg)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof(errmsg)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
switch key {
|
||||||
|
case storage.UidDataKey:
|
||||||
|
if uid := namer.GetClusterName(); uid != val {
|
||||||
|
glog.Infof("Cluster uid changed from %v -> %v", uid, val)
|
||||||
|
namer.SetClusterName(val)
|
||||||
|
}
|
||||||
|
case storage.ProviderDataKey:
|
||||||
|
if fw_name := namer.GetFirewallName(); fw_name != val {
|
||||||
|
glog.Infof("Cluster firewall name changed from %v -> %v", fw_name, val)
|
||||||
|
namer.SetFirewallName(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, 5*time.Second)
|
}, 5*time.Second)
|
||||||
return namer, nil
|
return namer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// useDefaultOrLookupVault returns either a 'default_name' or if unset, obtains a name from a ConfigMap.
|
||||||
|
// The returned value follows this priority:
|
||||||
|
// If the provided 'default_name' is not empty, that name is used.
|
||||||
|
// This is effectively a client override via a command line flag.
|
||||||
|
// else, check cfgVault with 'cm_key' as a key and if found, use the associated value
|
||||||
|
// else, return an empty 'name' and pass along an error iff the configmap lookup is erroneous.
|
||||||
|
func useDefaultOrLookupVault(cfgVault *storage.ConfigMapVault, cm_key, default_name string) (string, error) {
|
||||||
|
if default_name != "" {
|
||||||
|
glog.Infof("Using user provided %v %v", cm_key, default_name)
|
||||||
|
// Don't save the uid in the vault, so users can rollback through
|
||||||
|
// setting the accompany flag to ""
|
||||||
|
return default_name, nil
|
||||||
|
}
|
||||||
|
val, found, err := cfgVault.Get(cm_key)
|
||||||
|
if err != nil {
|
||||||
|
// This can fail because of:
|
||||||
|
// 1. No such config map - found=false, err=nil
|
||||||
|
// 2. No such key in config map - found=false, err=nil
|
||||||
|
// 3. Apiserver flake - found=false, err!=nil
|
||||||
|
// It is not safe to proceed in 3.
|
||||||
|
return "", fmt.Errorf("Failed to retrieve %v: %v, returning empty name", cm_key, err)
|
||||||
|
} else if !found {
|
||||||
|
// Not found but safe to proceed.
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
glog.Infof("Using %v = %q saved in ConfigMap", cm_key, val)
|
||||||
|
return val, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFirewallName returns the firewall rule name to use for this cluster. For
|
||||||
|
// backwards compatibility, the firewall name will default to the cluster UID.
|
||||||
|
// Use getFlagOrLookupVault to obtain a stored or overridden value for the firewall name.
|
||||||
|
// else, use the cluster UID as a backup (this retains backwards compatibility).
|
||||||
|
func getFirewallName(kubeClient client.Interface, name, cluster_uid string) (string, error) {
|
||||||
|
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||||
|
if fw_name, err := useDefaultOrLookupVault(cfgVault, storage.ProviderDataKey, name); err != nil {
|
||||||
|
return "", err
|
||||||
|
} else if fw_name != "" {
|
||||||
|
return fw_name, cfgVault.Put(storage.ProviderDataKey, fw_name)
|
||||||
|
} else {
|
||||||
|
glog.Infof("Using cluster UID %v as firewall name", cluster_uid)
|
||||||
|
return cluster_uid, cfgVault.Put(storage.ProviderDataKey, cluster_uid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// getClusterUID returns the cluster UID. Rules for UID generation:
|
// getClusterUID returns the cluster UID. Rules for UID generation:
|
||||||
// If the user specifies a --cluster-uid param it overwrites everything
|
// If the user specifies a --cluster-uid param it overwrites everything
|
||||||
// else, check UID config map for a previously recorded uid
|
// else, check UID config map for a previously recorded uid
|
||||||
|
@ -281,26 +349,12 @@ func newNamer(kubeClient client.Interface, clusterName string) (*utils.Namer, er
|
||||||
// else, allocate a new uid
|
// else, allocate a new uid
|
||||||
func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
||||||
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
cfgVault := storage.NewConfigMapVault(kubeClient, api.NamespaceSystem, uidConfigMapName)
|
||||||
if name != "" {
|
if name, err := useDefaultOrLookupVault(cfgVault, storage.UidDataKey, name); err != nil {
|
||||||
glog.Infof("Using user provided cluster uid %v", name)
|
return "", err
|
||||||
// Don't save the uid in the vault, so users can rollback through
|
} else if name != "" {
|
||||||
// --cluster-uid=""
|
|
||||||
return name, nil
|
return name, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
existingUID, found, err := cfgVault.Get()
|
|
||||||
if found {
|
|
||||||
glog.Infof("Using saved cluster uid %q", existingUID)
|
|
||||||
return existingUID, nil
|
|
||||||
} else if err != nil {
|
|
||||||
// This can fail because of:
|
|
||||||
// 1. No such config map - found=false, err=nil
|
|
||||||
// 2. No such key in config map - found=false, err=nil
|
|
||||||
// 3. Apiserver flake - found=false, err!=nil
|
|
||||||
// It is not safe to proceed in 3.
|
|
||||||
return "", fmt.Errorf("Failed to retrieve current uid: %v, using %q as name", err, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the cluster has an Ingress with ip
|
// Check if the cluster has an Ingress with ip
|
||||||
ings, err := kubeClient.Extensions().Ingresses(api.NamespaceAll).List(api.ListOptions{LabelSelector: labels.Everything()})
|
ings, err := kubeClient.Extensions().Ingresses(api.NamespaceAll).List(api.ListOptions{LabelSelector: labels.Everything()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -311,10 +365,10 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
||||||
if len(ing.Status.LoadBalancer.Ingress) != 0 {
|
if len(ing.Status.LoadBalancer.Ingress) != 0 {
|
||||||
c := namer.ParseName(loadbalancers.GCEResourceName(ing.Annotations, "forwarding-rule"))
|
c := namer.ParseName(loadbalancers.GCEResourceName(ing.Annotations, "forwarding-rule"))
|
||||||
if c.ClusterName != "" {
|
if c.ClusterName != "" {
|
||||||
return c.ClusterName, cfgVault.Put(c.ClusterName)
|
return c.ClusterName, cfgVault.Put(storage.UidDataKey, c.ClusterName)
|
||||||
}
|
}
|
||||||
glog.Infof("Found a working Ingress, assuming uid is empty string")
|
glog.Infof("Found a working Ingress, assuming uid is empty string")
|
||||||
return "", cfgVault.Put("")
|
return "", cfgVault.Put(storage.UidDataKey, "")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -329,7 +383,7 @@ func getClusterUID(kubeClient client.Interface, name string) (string, error) {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
uid := fmt.Sprintf("%x", b)
|
uid := fmt.Sprintf("%x", b)
|
||||||
return uid, cfgVault.Put(uid)
|
return uid, cfgVault.Put(storage.UidDataKey, uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getNodePort waits for the Service, and returns it's first node port.
|
// getNodePort waits for the Service, and returns it's first node port.
|
||||||
|
|
|
@ -19,6 +19,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
@ -27,73 +28,86 @@ import (
|
||||||
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
)
|
)
|
||||||
|
|
||||||
// UIDVault stores UIDs.
|
const (
|
||||||
type UIDVault interface {
|
// UidDataKey is the key used in config maps to store the UID.
|
||||||
Get() (string, bool, error)
|
UidDataKey = "uid"
|
||||||
Put(string) error
|
// ProviderDataKey is the key used in config maps to store the Provider
|
||||||
Delete() error
|
// UID which we use to ensure unique firewalls.
|
||||||
}
|
ProviderDataKey = "provider-uid"
|
||||||
|
)
|
||||||
// uidDataKey is the key used in config maps to store the UID.
|
|
||||||
const uidDataKey = "uid"
|
|
||||||
|
|
||||||
// ConfigMapVault stores cluster UIDs in config maps.
|
// ConfigMapVault stores cluster UIDs in config maps.
|
||||||
// It's a layer on top of ConfigMapStore that just implements the utils.uidVault
|
// It's a layer on top of ConfigMapStore that just implements the utils.uidVault
|
||||||
// interface.
|
// interface.
|
||||||
type ConfigMapVault struct {
|
type ConfigMapVault struct {
|
||||||
|
storeLock sync.Mutex
|
||||||
ConfigMapStore cache.Store
|
ConfigMapStore cache.Store
|
||||||
namespace string
|
namespace string
|
||||||
name string
|
name string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get retrieves the cluster UID from the cluster config map.
|
// Get retrieves the value associated to the provided 'key' from the cluster config map.
|
||||||
// If this method returns an error, it's guaranteed to be apiserver flake.
|
// If this method returns an error, it's guaranteed to be apiserver flake.
|
||||||
// If the error is a not found error it sets the boolean to false and
|
// If the error is a not found error it sets the boolean to false and
|
||||||
// returns and error of nil instead.
|
// returns and error of nil instead.
|
||||||
func (c *ConfigMapVault) Get() (string, bool, error) {
|
func (c *ConfigMapVault) Get(key string) (string, bool, error) {
|
||||||
key := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
keyStore := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||||
item, found, err := c.ConfigMapStore.GetByKey(key)
|
item, found, err := c.ConfigMapStore.GetByKey(keyStore)
|
||||||
if err != nil || !found {
|
if err != nil || !found {
|
||||||
return "", false, err
|
return "", false, err
|
||||||
}
|
}
|
||||||
cfg := item.(*api.ConfigMap)
|
data := item.(*api.ConfigMap).Data
|
||||||
if k, ok := cfg.Data[uidDataKey]; ok {
|
c.storeLock.Lock()
|
||||||
|
defer c.storeLock.Unlock()
|
||||||
|
if k, ok := data[key]; ok {
|
||||||
return k, true, nil
|
return k, true, nil
|
||||||
}
|
}
|
||||||
return "", false, fmt.Errorf("Found config map %v but it doesn't contain uid key: %+v", key, cfg.Data)
|
glog.Infof("Found config map %v but it doesn't contain key %v: %+v", keyStore, key, data)
|
||||||
|
return "", false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the given UID in the cluster config map.
|
// Put inserts a key/value pair in the cluster config map.
|
||||||
func (c *ConfigMapVault) Put(uid string) error {
|
// If the key already exists, the value provided is stored.
|
||||||
|
func (c *ConfigMapVault) Put(key, val string) error {
|
||||||
|
c.storeLock.Lock()
|
||||||
|
defer c.storeLock.Unlock()
|
||||||
apiObj := &api.ConfigMap{
|
apiObj := &api.ConfigMap{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
Name: c.name,
|
Name: c.name,
|
||||||
Namespace: c.namespace,
|
Namespace: c.namespace,
|
||||||
},
|
},
|
||||||
Data: map[string]string{uidDataKey: uid},
|
|
||||||
}
|
}
|
||||||
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||||
|
|
||||||
item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
||||||
if err == nil && exists {
|
if err == nil && exists {
|
||||||
data := item.(*api.ConfigMap).Data
|
data := item.(*api.ConfigMap).Data
|
||||||
if k, ok := data[uidDataKey]; ok && k == uid {
|
existingVal, ok := data[key]
|
||||||
|
if ok && existingVal == val {
|
||||||
|
// duplicate, no need to update.
|
||||||
return nil
|
return nil
|
||||||
} else if ok {
|
|
||||||
glog.Infof("Configmap %v has key %v but wrong value %v, updating", cfgMapKey, k, uid)
|
|
||||||
}
|
}
|
||||||
|
data[key] = val
|
||||||
|
apiObj.Data = data
|
||||||
|
if existingVal != val {
|
||||||
|
glog.Infof("Configmap %v has key %v but wrong value %v, updating to %v", cfgMapKey, key, existingVal, val)
|
||||||
|
} else {
|
||||||
|
glog.Infof("Configmap %v will be updated with %v = %v", cfgMapKey, key, val)
|
||||||
|
}
|
||||||
if err := c.ConfigMapStore.Update(apiObj); err != nil {
|
if err := c.ConfigMapStore.Update(apiObj); err != nil {
|
||||||
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
|
return fmt.Errorf("Failed to update %v: %v", cfgMapKey, err)
|
||||||
}
|
}
|
||||||
} else if err := c.ConfigMapStore.Add(apiObj); err != nil {
|
} else {
|
||||||
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
|
apiObj.Data = map[string]string{key: val}
|
||||||
|
if err := c.ConfigMapStore.Add(apiObj); err != nil {
|
||||||
|
return fmt.Errorf("Failed to add %v: %v", cfgMapKey, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
glog.Infof("Successfully stored uid %q in config map %v", uid, cfgMapKey)
|
glog.Infof("Successfully stored key %v = %v in config map %v", key, val, cfgMapKey)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes the cluster UID storing config map.
|
// Delete deletes the ConfigMapStore.
|
||||||
func (c *ConfigMapVault) Delete() error {
|
func (c *ConfigMapVault) Delete() error {
|
||||||
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name)
|
||||||
item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey)
|
||||||
|
@ -108,13 +122,19 @@ func (c *ConfigMapVault) Delete() error {
|
||||||
// This client is essentially meant to abstract out the details of
|
// This client is essentially meant to abstract out the details of
|
||||||
// configmaps and the API, and just store/retrieve a single value, the cluster uid.
|
// configmaps and the API, and just store/retrieve a single value, the cluster uid.
|
||||||
func NewConfigMapVault(c client.Interface, uidNs, uidConfigMapName string) *ConfigMapVault {
|
func NewConfigMapVault(c client.Interface, uidNs, uidConfigMapName string) *ConfigMapVault {
|
||||||
return &ConfigMapVault{NewConfigMapStore(c), uidNs, uidConfigMapName}
|
return &ConfigMapVault{
|
||||||
|
ConfigMapStore: NewConfigMapStore(c),
|
||||||
|
namespace: uidNs,
|
||||||
|
name: uidConfigMapName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFakeConfigMapVault is an implementation of the ConfigMapStore that doesn't
|
// NewFakeConfigMapVault is an implementation of the ConfigMapStore that doesn't
|
||||||
// persist configmaps. Only used in testing.
|
// persist configmaps. Only used in testing.
|
||||||
func NewFakeConfigMapVault(ns, name string) *ConfigMapVault {
|
func NewFakeConfigMapVault(ns, name string) *ConfigMapVault {
|
||||||
return &ConfigMapVault{cache.NewStore(cache.MetaNamespaceKeyFunc), ns, name}
|
return &ConfigMapVault{
|
||||||
|
ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||||
|
namespace: ns,
|
||||||
|
name: name}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConfigMapStore wraps the store interface. Implementations usually persist
|
// ConfigMapStore wraps the store interface. Implementations usually persist
|
||||||
|
|
|
@ -24,31 +24,51 @@ import (
|
||||||
|
|
||||||
func TestConfigMapUID(t *testing.T) {
|
func TestConfigMapUID(t *testing.T) {
|
||||||
vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid")
|
vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid")
|
||||||
uid := ""
|
// Get value from an empty vault.
|
||||||
k, exists, err := vault.Get()
|
val, exists, err := vault.Get(UidDataKey)
|
||||||
if exists {
|
if exists {
|
||||||
t.Errorf("Got a key from an empyt vault")
|
t.Errorf("Got value from an empty vault")
|
||||||
}
|
}
|
||||||
vault.Put(uid)
|
|
||||||
k, exists, err = vault.Get()
|
// Store empty value for UidDataKey.
|
||||||
|
uid := ""
|
||||||
|
vault.Put(UidDataKey, uid)
|
||||||
|
val, exists, err = vault.Get(UidDataKey)
|
||||||
if !exists || err != nil {
|
if !exists || err != nil {
|
||||||
t.Errorf("Failed to retrieve value from vault")
|
t.Errorf("Failed to retrieve value from vault: %v", err)
|
||||||
}
|
}
|
||||||
if k != "" {
|
if val != "" {
|
||||||
t.Errorf("Failed to store empty string as a key in the vault")
|
t.Errorf("Failed to store empty string as a key in the vault")
|
||||||
}
|
}
|
||||||
vault.Put("newuid")
|
|
||||||
k, exists, err = vault.Get()
|
// Store actual value in key.
|
||||||
|
storedVal := "newuid"
|
||||||
|
vault.Put(UidDataKey, storedVal)
|
||||||
|
val, exists, err = vault.Get(UidDataKey)
|
||||||
if !exists || err != nil {
|
if !exists || err != nil {
|
||||||
t.Errorf("Failed to retrieve value from vault")
|
t.Errorf("Failed to retrieve value from vault")
|
||||||
|
} else if val != storedVal {
|
||||||
|
t.Errorf("Failed to store empty string as a key in the vault")
|
||||||
}
|
}
|
||||||
if k != "newuid" {
|
|
||||||
t.Errorf("Failed to modify uid")
|
// Store second value which will have the affect of updating to Store
|
||||||
|
// rather than adding.
|
||||||
|
secondVal := "bar"
|
||||||
|
vault.Put("foo", secondVal)
|
||||||
|
val, exists, err = vault.Get("foo")
|
||||||
|
if !exists || err != nil || val != secondVal {
|
||||||
|
t.Errorf("Failed to retrieve second value from vault")
|
||||||
}
|
}
|
||||||
|
val, exists, err = vault.Get(UidDataKey)
|
||||||
|
if !exists || err != nil || val != storedVal {
|
||||||
|
t.Errorf("Failed to retrieve first value from vault")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete value.
|
||||||
if err := vault.Delete(); err != nil {
|
if err := vault.Delete(); err != nil {
|
||||||
t.Errorf("Failed to delete uid %v", err)
|
t.Errorf("Failed to delete uid %v", err)
|
||||||
}
|
}
|
||||||
if uid, exists, _ := vault.Get(); exists {
|
if _, exists, _ := vault.Get(UidDataKey); exists {
|
||||||
t.Errorf("Found uid %v, expected none", uid)
|
t.Errorf("Found uid but expected none after deletion")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,14 +92,16 @@ const (
|
||||||
|
|
||||||
// Namer handles centralized naming for the cluster.
|
// Namer handles centralized naming for the cluster.
|
||||||
type Namer struct {
|
type Namer struct {
|
||||||
clusterName string
|
clusterName string
|
||||||
nameLock sync.Mutex
|
firewallName string
|
||||||
|
nameLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNamer creates a new namer.
|
// NewNamer creates a new namer with a Cluster and Firewall name.
|
||||||
func NewNamer(clusterName string) *Namer {
|
func NewNamer(clusterName, firewallName string) *Namer {
|
||||||
namer := &Namer{}
|
namer := &Namer{}
|
||||||
namer.SetClusterName(clusterName)
|
namer.SetClusterName(clusterName)
|
||||||
|
namer.SetFirewallName(firewallName)
|
||||||
return namer
|
return namer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +125,16 @@ func (n *Namer) SetClusterName(name string) {
|
||||||
n.clusterName = name
|
n.clusterName = name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetFirewallName sets the firewall name of this cluster.
|
||||||
|
func (n *Namer) SetFirewallName(firewall_name string) {
|
||||||
|
n.nameLock.Lock()
|
||||||
|
defer n.nameLock.Unlock()
|
||||||
|
if n.firewallName != firewall_name {
|
||||||
|
glog.Infof("Changing firewall name from %v to %v", n.firewallName, firewall_name)
|
||||||
|
n.firewallName = firewall_name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetClusterName returns the UID/name of this cluster.
|
// GetClusterName returns the UID/name of this cluster.
|
||||||
func (n *Namer) GetClusterName() string {
|
func (n *Namer) GetClusterName() string {
|
||||||
n.nameLock.Lock()
|
n.nameLock.Lock()
|
||||||
|
@ -130,6 +142,18 @@ func (n *Namer) GetClusterName() string {
|
||||||
return n.clusterName
|
return n.clusterName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetFirewallName returns the firewall name of this cluster.
|
||||||
|
func (n *Namer) GetFirewallName() string {
|
||||||
|
n.nameLock.Lock()
|
||||||
|
defer n.nameLock.Unlock()
|
||||||
|
// Retain backwards compatible behavior where firewallName == clusterName.
|
||||||
|
if n.firewallName == "" {
|
||||||
|
return n.clusterName
|
||||||
|
} else {
|
||||||
|
return n.firewallName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Truncate truncates the given key to a GCE length limit.
|
// Truncate truncates the given key to a GCE length limit.
|
||||||
func (n *Namer) Truncate(key string) string {
|
func (n *Namer) Truncate(key string) string {
|
||||||
if len(key) > nameLenLimit {
|
if len(key) > nameLenLimit {
|
||||||
|
@ -216,12 +240,12 @@ func (n *Namer) IGName() string {
|
||||||
|
|
||||||
// FrSuffix constructs the glbc specific suffix for the FirewallRule.
|
// FrSuffix constructs the glbc specific suffix for the FirewallRule.
|
||||||
func (n *Namer) FrSuffix() string {
|
func (n *Namer) FrSuffix() string {
|
||||||
clusterName := n.GetClusterName()
|
firewallName := n.GetFirewallName()
|
||||||
// The entire cluster only needs a single firewall rule.
|
// The entire cluster only needs a single firewall rule.
|
||||||
if clusterName == "" {
|
if firewallName == "" {
|
||||||
return globalFirewallSuffix
|
return globalFirewallSuffix
|
||||||
}
|
}
|
||||||
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, clusterName))
|
return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, firewallName))
|
||||||
}
|
}
|
||||||
|
|
||||||
// FrName constructs the full firewall rule name, this is the name assigned by
|
// FrName constructs the full firewall rule name, this is the name assigned by
|
||||||
|
|
Loading…
Reference in a new issue