Handle Firewall for NEG

This commit is contained in:
Minhan Xia 2017-10-02 15:37:11 -07:00
parent a6993bb449
commit 5eab04a21f
5 changed files with 203 additions and 34 deletions

View file

@ -119,7 +119,7 @@ func (c *ClusterManager) shutdown() error {
// Returns the list of all instance groups corresponding to the given loadbalancers.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, firewallPorts []int64) ([]*compute.InstanceGroup, error) {
if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
@ -144,22 +144,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
return igs, err
}
// TODO: Manage default backend and its firewall rule in a centralized way.
// DefaultBackend is managed in l7 pool, which doesn't understand instances,
// which the firewall rule requires.
fwNodePorts := backendServicePorts
if len(lbs) != 0 {
// If there are no Ingresses, we shouldn't be allowing traffic to the
// default backend. Equally importantly if the cluster gets torn down
// we shouldn't leak the firewall rule.
fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort)
}
var np []int64
for _, p := range fwNodePorts {
np = append(np, p.Port)
}
if err := c.firewallPool.Sync(np, nodeNames); err != nil {
if err := c.firewallPool.Sync(firewallPorts, nodeNames); err != nil {
return igs, err
}

View file

@ -58,15 +58,18 @@ var (
type LoadBalancerController struct {
client kubernetes.Interface
ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
podSynced cache.InformerSynced
nodeSynced cache.InformerSynced
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
podSynced cache.InformerSynced
nodeSynced cache.InformerSynced
endpointSynced cache.InformerSynced
ingLister StoreToIngressLister
nodeLister StoreToNodeLister
svcLister StoreToServiceLister
// Health checks are the readiness probes of containers on pods.
podLister StoreToPodLister
// endpoint lister is needed when translating service target port to real endpoint target ports.
endpointLister StoreToEndpointLister
// TODO: Watch secrets
CloudClusterManager *ClusterManager
recorder record.EventRecorder
@ -115,11 +118,17 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *utils.Contr
lbc.serviceSynced = ctx.ServiceInformer.HasSynced
lbc.podSynced = ctx.PodInformer.HasSynced
lbc.nodeSynced = ctx.NodeInformer.HasSynced
lbc.endpointSynced = func() bool { return true }
lbc.ingLister.Store = ctx.IngressInformer.GetStore()
lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer()
lbc.podLister.Indexer = ctx.PodInformer.GetIndexer()
lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer()
if negEnabled {
lbc.endpointSynced = ctx.EndpointInformer.HasSynced
lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer()
}
// ingress event handler
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -239,7 +248,8 @@ func (lbc *LoadBalancerController) storesSynced() bool {
// group just because we don't realize there are nodes in that zone.
lbc.nodeSynced() &&
// Wait for ingresses as a safety measure. We don't really need this.
lbc.ingressSynced())
lbc.ingressSynced() &&
lbc.endpointSynced())
}
// sync manages Ingress create/updates/deletes.
@ -294,7 +304,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
}()
// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts)
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.gatherFirewallPorts(gceNodePorts, len(lbs) > 0))
if err != nil {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"

View file

@ -278,6 +278,11 @@ type StoreToPodLister struct {
cache.Indexer
}
// StoreToPodLister makes a Store that lists Endpoints.
type StoreToEndpointLister struct {
cache.Indexer
}
// List returns a list of all pods based on selector
func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
@ -358,6 +363,42 @@ IngressLoop:
return
}
func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int {
// if targetPort is integer, no need to translate to endpoint ports
i, _ := strconv.Atoi(targetPort)
if i != 0 {
return []int{i}
}
ep, exists, err := s.Indexer.Get(
&api_v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
},
)
ret := []int{}
if !exists {
glog.Errorf("Endpoint object %v/%v does not exists.", namespace, name)
return ret
}
if err != nil {
glog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err)
return ret
}
for _, subset := range ep.(*api_v1.Endpoints).Subsets {
for _, port := range subset.Ports {
if port.Protocol == api_v1.ProtocolTCP && port.Name == targetPort {
ret = append(ret, int(port.Port))
}
}
}
return ret
}
// GCETranslator helps with kubernetes -> gce api conversion.
type GCETranslator struct {
*LoadBalancerController
@ -489,10 +530,12 @@ PortLoop:
}
p := backends.ServicePort{
Port: int64(port.NodePort),
Protocol: proto,
SvcName: types.NamespacedName{Namespace: namespace, Name: be.ServiceName},
SvcPort: be.ServicePort,
Port: int64(port.NodePort),
Protocol: proto,
SvcName: types.NamespacedName{Namespace: namespace, Name: be.ServiceName},
SvcPort: be.ServicePort,
SvcTargetPort: port.TargetPort.String(),
NEGEnabled: t.negEnabled && utils.NEGEnabled(svc.Annotations),
}
return p, nil
}
@ -623,6 +666,38 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr
return nil, nil
}
// gatherFirewallPorts returns all ports needed for open for ingress.
// It gathers both node ports (for IG backends) and target ports (for NEG backends).
func (t *GCETranslator) gatherFirewallPorts(svcPorts []backends.ServicePort, includeDefaultBackend bool) []int64 {
// TODO: Manage default backend and its firewall rule in a centralized way.
// DefaultBackend is managed in l7 pool, which doesn't understand instances,
// which the firewall rule requires.
if includeDefaultBackend {
svcPorts = append(svcPorts, t.CloudClusterManager.defaultBackendNodePort)
}
portMap := map[int64]bool{}
for _, p := range svcPorts {
if p.NEGEnabled {
// For NEG backend, need to open firewall to all endpoint target ports
// TODO(mixia): refactor firewall syncing into a separate go routine with different trigger.
// With NEG, endpoint changes may cause firewall ports to be different if user specifies inconsistent backends.
endpointPorts := t.endpointLister.ListEndpointTargetPorts(p.SvcName.Namespace, p.SvcName.Name, p.SvcTargetPort)
for _, ep := range endpointPorts {
portMap[int64(ep)] = true
}
} else {
// For IG backend, need to open service node port.
portMap[p.Port] = true
}
}
var np []int64
for p := range portMap {
np = append(np, p)
}
return np
}
// isSimpleHTTPProbe returns true if the given Probe is:
// - an HTTPGet probe, as opposed to a tcp or exec probe
// - has no special host or headers fields, except for possibly an HTTP Host header

View file

@ -25,6 +25,7 @@ import (
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress/controllers/gce/backends"
@ -273,7 +274,7 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
}{
{
// Single zone.
[]*compute.InstanceGroup{&compute.InstanceGroup{
[]*compute.InstanceGroup{{
Name: "ig-name",
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
}},
@ -282,11 +283,11 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
{
// Multiple zones.
[]*compute.InstanceGroup{
&compute.InstanceGroup{
{
Name: "ig-name-1",
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
},
&compute.InstanceGroup{
{
Name: "ig-name-2",
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a",
},
@ -305,3 +306,101 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
}
}
}
func TestGatherFirewallPorts(t *testing.T) {
cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
lbc := newLoadBalancerController(t, cm)
lbc.CloudClusterManager.defaultBackendNodePort.Port = int64(30000)
ep1 := "ep1"
ep2 := "ep2"
svcPorts := []backends.ServicePort{
{Port: int64(30001)},
{Port: int64(30002)},
{
SvcName: types.NamespacedName{
"ns",
ep1,
},
Port: int64(30003),
NEGEnabled: true,
SvcTargetPort: "80",
},
{
SvcName: types.NamespacedName{
"ns",
ep2,
},
Port: int64(30004),
NEGEnabled: true,
SvcTargetPort: "named-port",
},
}
lbc.endpointLister.Indexer.Add(getDefaultEndpoint(ep1))
lbc.endpointLister.Indexer.Add(getDefaultEndpoint(ep2))
res := lbc.Translator.gatherFirewallPorts(svcPorts, true)
expect := map[int64]bool{
int64(30000): true,
int64(30001): true,
int64(30002): true,
int64(80): true,
int64(8080): true,
int64(8081): true,
}
if len(res) != len(expect) {
t.Errorf("Expect firewall ports %v, but got %v", expect, res)
}
for _, p := range res {
if _, ok := expect[p]; !ok {
t.Errorf("Expect firewall port %v, but not found.", p)
}
}
}
func getDefaultEndpoint(name string) *api_v1.Endpoints {
return &api_v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: "ns",
},
Subsets: []api_v1.EndpointSubset{
{
Ports: []api_v1.EndpointPort{
{
Name: "",
Port: int32(80),
Protocol: api_v1.ProtocolTCP,
},
{
Name: "named-port",
Port: int32(8080),
Protocol: api_v1.ProtocolTCP,
},
},
},
{
Ports: []api_v1.EndpointPort{
{
Name: "named-port",
Port: int32(80),
Protocol: api_v1.ProtocolTCP,
},
},
},
{
Ports: []api_v1.EndpointPort{
{
Name: "named-port",
Port: int32(8081),
Protocol: api_v1.ProtocolTCP,
},
},
},
},
}
}

View file

@ -288,7 +288,7 @@ func main() {
// Create fake cluster manager
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}
ctx := utils.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))