diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 642c3fd22..5f613fb2c 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/contrib/ingress/controllers/gce/backends" + "k8s.io/contrib/ingress/controllers/gce/firewalls" "k8s.io/contrib/ingress/controllers/gce/healthchecks" "k8s.io/contrib/ingress/controllers/gce/instances" "k8s.io/contrib/ingress/controllers/gce/loadbalancers" @@ -70,6 +71,7 @@ type ClusterManager struct { instancePool instances.NodePool backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool + firewallPool firewalls.SingleFirewallPool } // IsHealthy returns an error if the cluster manager is unhealthy. @@ -92,6 +94,9 @@ func (c *ClusterManager) shutdown() error { if err := c.l7Pool.Shutdown(); err != nil { return err } + if err := c.firewallPool.Shutdown(); err != nil { + return err + } // The backend pool will also delete instance groups. return c.backendPool.Shutdown() } @@ -107,6 +112,17 @@ func (c *ClusterManager) shutdown() error { // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []int64) error { + // Multiple ingress paths can point to the same service (and hence nodePort) + // but each nodePort can only have one set of cloud resources behind it. So + // don't waste time double validating GCE BackendServices. + portMap := map[int64]struct{}{} + for _, p := range nodePorts { + portMap[p] = struct{}{} + } + nodePorts = []int64{} + for p, _ := range portMap { + nodePorts = append(nodePorts, p) + } if err := c.backendPool.Sync(nodePorts); err != nil { return err } @@ -116,6 +132,21 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName if err := c.l7Pool.Sync(lbs); err != nil { return err } + + // TODO: Manage default backend and its firewall rule in a centralized way. + // DefaultBackend is managed in l7 pool, which doesn't understand instances, + // which the firewall rule requires. + fwNodePorts := nodePorts + if len(fwNodePorts) != 0 { + // If there are no Ingresses, we shouldn't be allowing traffic to the + // default backend. Equally importantly if the cluster gets torn down + // we shouldn't leak the firewall rule. + fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort) + } + if err := c.firewallPool.Sync(fwNodePorts, nodeNames); err != nil { + return err + } + return nil } @@ -213,6 +244,6 @@ func NewClusterManager( cluster.defaultBackendNodePort = defaultBackendNodePort cluster.l7Pool = loadbalancers.NewLoadBalancerPool( cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) - + cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer) return &cluster, nil } diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index 83e01cbfe..d752b2a73 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -23,6 +23,7 @@ import ( "time" compute "google.golang.org/api/compute/v1" + "k8s.io/contrib/ingress/controllers/gce/firewalls" "k8s.io/contrib/ingress/controllers/gce/loadbalancers" "k8s.io/contrib/ingress/controllers/gce/utils" "k8s.io/kubernetes/pkg/api" @@ -32,6 +33,7 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" ) const testClusterName = "testcluster" @@ -234,11 +236,27 @@ func TestLbCreateDelete(t *testing.T) { // we shouldn't pull shared backends out from existing loadbalancers. unexpected := []int{pm.portMap["foo2svc"], pm.portMap["bar2svc"]} expected := []int{pm.portMap["foo1svc"], pm.portMap["bar1svc"]} + firewallPorts := sets.NewString() + firewallName := pm.namer.FrName(pm.namer.FrSuffix()) + + if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err != nil { + t.Fatalf("%v", err) + } else { + if len(firewallRule.Allowed) != 1 { + t.Fatalf("Expected a single firewall rule") + } + for _, p := range firewallRule.Allowed[0].Ports { + firewallPorts.Insert(p) + } + } for _, port := range expected { if _, err := cm.backendPool.Get(int64(port)); err != nil { t.Fatalf("%v", err) } + if !firewallPorts.Has(fmt.Sprintf("%v", port)) { + t.Fatalf("Expected a firewall rule for port %v", port) + } } for _, port := range unexpected { if be, err := cm.backendPool.Get(int64(port)); err == nil { @@ -263,6 +281,9 @@ func TestLbCreateDelete(t *testing.T) { t.Fatalf("Found unexpected loadbalandcer %+v: %v", l7, err) } } + if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err == nil { + t.Fatalf("Found unexpected firewall rule %v", firewallRule) + } } func TestLbFaultyUpdate(t *testing.T) { diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index 9b036164b..cc5341714 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -18,6 +18,7 @@ package controller import ( "k8s.io/contrib/ingress/controllers/gce/backends" + "k8s.io/contrib/ingress/controllers/gce/firewalls" "k8s.io/contrib/ingress/controllers/gce/healthchecks" "k8s.io/contrib/ingress/controllers/gce/instances" "k8s.io/contrib/ingress/controllers/gce/loadbalancers" @@ -60,11 +61,13 @@ func NewFakeClusterManager(clusterName string) *fakeClusterManager { testDefaultBeNodePort, namer, ) + frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallRules(), namer) cm := &ClusterManager{ ClusterNamer: namer, instancePool: nodePool, backendPool: backendPool, l7Pool: l7Pool, + firewallPool: frPool, } return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs} } diff --git a/controllers/gce/firewalls/fakes.go b/controllers/gce/firewalls/fakes.go new file mode 100644 index 000000000..236d26e0e --- /dev/null +++ b/controllers/gce/firewalls/fakes.go @@ -0,0 +1,104 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package firewalls + +import ( + "fmt" + + compute "google.golang.org/api/compute/v1" + "k8s.io/contrib/ingress/controllers/gce/utils" + netset "k8s.io/kubernetes/pkg/util/net/sets" +) + +type fakeFirewallRules struct { + fw []*compute.Firewall + namer utils.Namer +} + +func (f *fakeFirewallRules) GetFirewall(name string) (*compute.Firewall, error) { + for _, rule := range f.fw { + if rule.Name == name { + return rule, nil + } + } + return nil, fmt.Errorf("Firewall rule %v not found.", name) +} + +func (f *fakeFirewallRules) CreateFirewall(name, msgTag string, srcRange netset.IPNet, ports []int64, hosts []string) error { + strPorts := []string{} + for _, p := range ports { + strPorts = append(strPorts, fmt.Sprintf("%v", p)) + } + f.fw = append(f.fw, &compute.Firewall{ + // To accurately mimic the cloudprovider we need to add the k8s-fw + // prefix to the given rule name. + Name: f.namer.FrName(name), + SourceRanges: srcRange.StringSlice(), + Allowed: []*compute.FirewallAllowed{&compute.FirewallAllowed{Ports: strPorts}}, + }) + return nil +} + +func (f *fakeFirewallRules) DeleteFirewall(name string) error { + firewalls := []*compute.Firewall{} + exists := false + // We need the full name for the same reason as CreateFirewall. + name = f.namer.FrName(name) + for _, rule := range f.fw { + if rule.Name == name { + exists = true + continue + } + firewalls = append(firewalls, rule) + } + if !exists { + return fmt.Errorf("Failed to find health check %v", name) + } + f.fw = firewalls + return nil +} + +func (f *fakeFirewallRules) UpdateFirewall(name, msgTag string, srcRange netset.IPNet, ports []int64, hosts []string) error { + var exists bool + strPorts := []string{} + for _, p := range ports { + strPorts = append(strPorts, fmt.Sprintf("%v", p)) + } + + // To accurately mimic the cloudprovider we need to add the k8s-fw + // prefix to the given rule name. + name = f.namer.FrName(name) + for i := range f.fw { + if f.fw[i].Name == name { + exists = true + f.fw[i] = &compute.Firewall{ + Name: name, + SourceRanges: srcRange.StringSlice(), + Allowed: []*compute.FirewallAllowed{&compute.FirewallAllowed{Ports: strPorts}}, + } + } + } + if exists { + return nil + } + return fmt.Errorf("Update failed for rule %v, srcRange %v ports %v, rule not found", name, srcRange, ports) +} + +// NewFakeFirewallRules creates a fake for firewall rules. +func NewFakeFirewallRules() *fakeFirewallRules { + return &fakeFirewallRules{fw: []*compute.Firewall{}, namer: utils.Namer{}} +} diff --git a/controllers/gce/firewalls/firewalls.go b/controllers/gce/firewalls/firewalls.go new file mode 100644 index 000000000..b111aed1e --- /dev/null +++ b/controllers/gce/firewalls/firewalls.go @@ -0,0 +1,78 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package firewalls + +import ( + "github.com/golang/glog" + + compute "google.golang.org/api/compute/v1" + "k8s.io/contrib/ingress/controllers/gce/utils" + netset "k8s.io/kubernetes/pkg/util/net/sets" +) + +// Src range from which the GCE L7 performs health checks. +const l7SrcRange = "130.211.0.0/22" + +// FirewallRules manages firewall rules. +type FirewallRules struct { + cloud Firewall + namer utils.Namer + srcRange netset.IPNet +} + +// NewFirewallPool creates a new firewall rule manager. +// cloud: the cloud object implementing Firewall. +// namer: cluster namer. +func NewFirewallPool(cloud Firewall, namer utils.Namer) SingleFirewallPool { + srcNetSet, err := netset.ParseIPNets(l7SrcRange) + if err != nil { + glog.Fatalf("Could not parse L7 src range %v for firewall rule: %v", l7SrcRange, err) + } + return &FirewallRules{cloud: cloud, namer: namer, srcRange: srcNetSet} +} + +// Sync sync firewall rules with the cloud. +func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { + if len(nodePorts) == 0 { + return fr.Shutdown() + } + // Firewall rule prefix must match that inserted by the gce library. + suffix := fr.namer.FrSuffix() + // TODO: Fix upstream gce cloudprovider lib so GET also takes the suffix + // instead of the whole name. + name := fr.namer.FrName(suffix) + rule, _ := fr.cloud.GetFirewall(name) + if rule == nil { + glog.Infof("Creating global l7 firewall rule %v", name) + return fr.cloud.CreateFirewall(suffix, "GCE L7 firewall rule", fr.srcRange, nodePorts, nodeNames) + } + glog.V(3).Infof("Firewall rule already %v exists, verifying for nodeports %v", name, nodePorts) + return fr.cloud.UpdateFirewall(suffix, "GCE L7 firewall rule", fr.srcRange, nodePorts, nodeNames) +} + +// Shutdown shuts down this firewall rules manager. +func (fr *FirewallRules) Shutdown() error { + glog.Infof("Deleting fireawll rule with suffix %v", fr.namer.FrSuffix()) + return fr.cloud.DeleteFirewall(fr.namer.FrSuffix()) +} + +// GetFirewall just returns the firewall object corresponding to the given name. +// TODO: Currently only used in testing. Modify so we don't leak compute +// objects out of this interface by returning just the (src, ports, error). +func (fr *FirewallRules) GetFirewall(name string) (*compute.Firewall, error) { + return fr.cloud.GetFirewall(name) +} diff --git a/controllers/gce/firewalls/interfaces.go b/controllers/gce/firewalls/interfaces.go new file mode 100644 index 000000000..93dac65bb --- /dev/null +++ b/controllers/gce/firewalls/interfaces.go @@ -0,0 +1,39 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package firewalls + +import ( + compute "google.golang.org/api/compute/v1" + netset "k8s.io/kubernetes/pkg/util/net/sets" +) + +// SingleFirewallPool syncs the firewall rule for L7 traffic. +type SingleFirewallPool interface { + // TODO: Take a list of node ports for the firewall. + Sync(nodePorts []int64, nodeNames []string) error + Shutdown() error +} + +// Firewall interfaces with the GCE firewall api. +// This interface is a little different from the rest because it dovetails into +// the same firewall methods used by the TCPLoadBalancer. +type Firewall interface { + CreateFirewall(name, msgTag string, srcRange netset.IPNet, ports []int64, hosts []string) error + GetFirewall(name string) (*compute.Firewall, error) + DeleteFirewall(name string) error + UpdateFirewall(name, msgTag string, srcRange netset.IPNet, ports []int64, hosts []string) error +} diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index a7dd117c7..b0a6c5145 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -54,6 +54,11 @@ const ( // Prefix used for instance groups involved in L7 balancing. igPrefix = "k8s-ig" + // Suffix used in the l7 firewall rule. There is currently only one. + // Note that this name is used by the cloudprovider lib that inserts its + // own k8s-fw prefix. + globalFirewallSuffix = "l7" + // A delimiter used for clarity in naming GCE resources. clusterNameDelimiter = "--" @@ -145,6 +150,22 @@ func (n *Namer) IGName() string { return n.decorateName(igPrefix) } +// FrSuffix constructs the glbc specific suffix for the FirewallRule. +func (n *Namer) FrSuffix() string { + // The entire cluster only needs a single firewall rule. + if n.ClusterName == "" { + return globalFirewallSuffix + } + return n.Truncate(fmt.Sprintf("%v%v%v", globalFirewallSuffix, clusterNameDelimiter, n.ClusterName)) +} + +// FrName constructs the full firewall rule name, this is the name assigned by +// the cloudprovider lib + suffix from glbc, so we don't mix this rule with a +// rule created for L4 loadbalancing. +func (n *Namer) FrName(suffix string) string { + return fmt.Sprintf("k8s-fw-%s", suffix) +} + // LBName constructs a loadbalancer name from the given key. The key is usually // the namespace/name of a Kubernetes Ingress. func (n *Namer) LBName(key string) string {