diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go
index e987711b9..613ece598 100644
--- a/controllers/gce/controller/cluster_manager.go
+++ b/controllers/gce/controller/cluster_manager.go
@@ -119,7 +119,7 @@ func (c *ClusterManager) shutdown() error {
 // Returns the list of all instance groups corresponding to the given loadbalancers.
 // If in performing the checkpoint the cluster manager runs out of quota, a
 // googleapi 403 is returned.
-func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
+func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, firewallPorts []int64) ([]*compute.InstanceGroup, error) {
 	if len(namedPorts) != 0 {
 		// Add the default backend node port to the list of named ports for instance groups.
 		namedPorts = append(namedPorts, c.defaultBackendNodePort)
@@ -144,22 +144,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
 		return igs, err
 	}
 
-	// TODO: Manage default backend and its firewall rule in a centralized way.
-	// DefaultBackend is managed in l7 pool, which doesn't understand instances,
-	// which the firewall rule requires.
-	fwNodePorts := backendServicePorts
-	if len(lbs) != 0 {
-		// If there are no Ingresses, we shouldn't be allowing traffic to the
-		// default backend. Equally importantly if the cluster gets torn down
-		// we shouldn't leak the firewall rule.
-		fwNodePorts = append(fwNodePorts, c.defaultBackendNodePort)
-	}
-
-	var np []int64
-	for _, p := range fwNodePorts {
-		np = append(np, p.Port)
-	}
-	if err := c.firewallPool.Sync(np, nodeNames); err != nil {
+	if err := c.firewallPool.Sync(firewallPorts, nodeNames); err != nil {
 		return igs, err
 	}
 
diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go
index b289e4153..22844d11c 100644
--- a/controllers/gce/controller/controller.go
+++ b/controllers/gce/controller/controller.go
@@ -58,15 +58,18 @@ var (
 type LoadBalancerController struct {
 	client kubernetes.Interface
 
-	ingressSynced cache.InformerSynced
-	serviceSynced cache.InformerSynced
-	podSynced     cache.InformerSynced
-	nodeSynced    cache.InformerSynced
-	ingLister     StoreToIngressLister
-	nodeLister    StoreToNodeLister
-	svcLister     StoreToServiceLister
+	ingressSynced  cache.InformerSynced
+	serviceSynced  cache.InformerSynced
+	podSynced      cache.InformerSynced
+	nodeSynced     cache.InformerSynced
+	endpointSynced cache.InformerSynced
+	ingLister      StoreToIngressLister
+	nodeLister     StoreToNodeLister
+	svcLister      StoreToServiceLister
 	// Health checks are the readiness probes of containers on pods.
 	podLister StoreToPodLister
+	// endpointLister is needed when translating a service target port to real endpoint target ports.
+	endpointLister StoreToEndpointLister
 	// TODO: Watch secrets
 	CloudClusterManager *ClusterManager
 	recorder            record.EventRecorder
@@ -115,11 +118,17 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *utils.Contr
 	lbc.serviceSynced = ctx.ServiceInformer.HasSynced
 	lbc.podSynced = ctx.PodInformer.HasSynced
 	lbc.nodeSynced = ctx.NodeInformer.HasSynced
+	lbc.endpointSynced = func() bool { return true }
 
 	lbc.ingLister.Store = ctx.IngressInformer.GetStore()
 	lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer()
 	lbc.podLister.Indexer = ctx.PodInformer.GetIndexer()
 	lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer()
+	if negEnabled {
+		lbc.endpointSynced = ctx.EndpointInformer.HasSynced
+		lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer()
+	}
+
 	// ingress event handler
 	ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
@@ -239,7 +248,8 @@ func (lbc *LoadBalancerController) storesSynced() bool {
 		// group just because we don't realize there are nodes in that zone.
 		lbc.nodeSynced() &&
 		// Wait for ingresses as a safety measure. We don't really need this.
-		lbc.ingressSynced())
+		lbc.ingressSynced() &&
+		lbc.endpointSynced())
 }
 
 // sync manages Ingress create/updates/deletes.
@@ -294,7 +304,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
 	}()
 	// Record any errors during sync and throw a single error at the end. This
 	// allows us to free up associated cloud resources ASAP.
-	igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts)
+	igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.gatherFirewallPorts(gceNodePorts, len(lbs) > 0))
 	if err != nil {
 		// TODO: Implement proper backoff for the queue.
 		eventMsg := "GCE"
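A note on the `lbc.endpointSynced = func() bool { return true }` default above: when NEG is disabled the endpoint informer is never started, so `storesSynced` must not wait on it. A minimal, self-contained sketch of that conditional-informer pattern (the types here are stand-ins, not the controller's real ones):

```go
package main

import "fmt"

// fakeInformer stands in for a shared informer; only HasSynced matters here.
type fakeInformer struct{ synced bool }

func (i *fakeInformer) HasSynced() bool { return i.synced }

func main() {
	negEnabled := false
	endpointInformer := &fakeInformer{synced: false}

	// Default to an always-true check so the sync gate never blocks on an
	// informer that was never started.
	endpointSynced := func() bool { return true }
	if negEnabled {
		endpointSynced = endpointInformer.HasSynced
	}

	fmt.Println(endpointSynced()) // true: with NEG off, the stub short-circuits
}
```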
diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go
index d313097f5..b315cb01c 100644
--- a/controllers/gce/controller/utils.go
+++ b/controllers/gce/controller/utils.go
@@ -278,6 +278,11 @@ type StoreToPodLister struct {
 	cache.Indexer
 }
 
+// StoreToEndpointLister makes a Store that lists Endpoints.
+type StoreToEndpointLister struct {
+	cache.Indexer
+}
+
 // List returns a list of all pods based on selector
 func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api_v1.Pod, err error) {
 	err = ListAll(s.Indexer, selector, func(m interface{}) {
@@ -358,6 +363,42 @@ IngressLoop:
 	return
 }
 
+func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int {
+	// if targetPort is an integer, no translation to endpoint ports is needed
+	i, _ := strconv.Atoi(targetPort)
+	if i != 0 {
+		return []int{i}
+	}
+
+	ep, exists, err := s.Indexer.Get(
+		&api_v1.Endpoints{
+			ObjectMeta: meta_v1.ObjectMeta{
+				Name:      name,
+				Namespace: namespace,
+			},
+		},
+	)
+	ret := []int{}
+
+	if err != nil {
+		glog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err)
+		return ret
+	}
+	if !exists {
+		glog.Errorf("Endpoint object %v/%v does not exist.", namespace, name)
+		return ret
+	}
+
+	for _, subset := range ep.(*api_v1.Endpoints).Subsets {
+		for _, port := range subset.Ports {
+			if port.Protocol == api_v1.ProtocolTCP && port.Name == targetPort {
+				ret = append(ret, int(port.Port))
+			}
+		}
+	}
+	return ret
+}
+
 // GCETranslator helps with kubernetes -> gce api conversion.
 type GCETranslator struct {
 	*LoadBalancerController
@@ -489,10 +530,12 @@ PortLoop:
 	}
 
 	p := backends.ServicePort{
-		Port:     int64(port.NodePort),
-		Protocol: proto,
-		SvcName:  types.NamespacedName{Namespace: namespace, Name: be.ServiceName},
-		SvcPort:  be.ServicePort,
+		Port:          int64(port.NodePort),
+		Protocol:      proto,
+		SvcName:       types.NamespacedName{Namespace: namespace, Name: be.ServiceName},
+		SvcPort:       be.ServicePort,
+		SvcTargetPort: port.TargetPort.String(),
+		NEGEnabled:    t.negEnabled && utils.NEGEnabled(svc.Annotations),
 	}
 	return p, nil
 }
@@ -623,6 +666,38 @@ func (t *GCETranslator) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOr
 	return nil, nil
 }
 
+// gatherFirewallPorts returns all ports that need to be open for the ingress.
+// It gathers both node ports (for IG backends) and target ports (for NEG backends).
+func (t *GCETranslator) gatherFirewallPorts(svcPorts []backends.ServicePort, includeDefaultBackend bool) []int64 {
+	// TODO: Manage default backend and its firewall rule in a centralized way.
+	// DefaultBackend is managed in l7 pool, which doesn't understand instances,
+	// which the firewall rule requires.
+	if includeDefaultBackend {
+		svcPorts = append(svcPorts, t.CloudClusterManager.defaultBackendNodePort)
+	}
+	portMap := map[int64]bool{}
+	for _, p := range svcPorts {
+		if p.NEGEnabled {
+			// For a NEG backend, the firewall needs to open all endpoint target ports.
+			// TODO(mixia): refactor firewall syncing into a separate goroutine with a different trigger.
+			// With NEG, endpoint changes may cause the firewall ports to differ if the user specifies inconsistent backends.
+			endpointPorts := t.endpointLister.ListEndpointTargetPorts(p.SvcName.Namespace, p.SvcName.Name, p.SvcTargetPort)
+			for _, ep := range endpointPorts {
+				portMap[int64(ep)] = true
+			}
+		} else {
+			// For an IG backend, the firewall needs to open the service node port.
+			portMap[p.Port] = true
+		}
+	}
+
+	var np []int64
+	for p := range portMap {
+		np = append(np, p)
+	}
+	return np
+}
+
 // isSimpleHTTPProbe returns true if the given Probe is:
 // - an HTTPGet probe, as opposed to a tcp or exec probe
 // - has no special host or headers fields, except for possibly an HTTP Host header
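To make `ListEndpointTargetPorts` concrete: a named service target port can map to different container ports across endpoint subsets, and every one of them needs a firewall opening. A self-contained sketch of that fan-out, using simplified stand-ins for the `api_v1` types:

```go
package main

import "fmt"

// endpointPort and subset are simplified stand-ins for api_v1.EndpointPort
// and api_v1.EndpointSubset.
type endpointPort struct {
	name string
	port int
}

type subset struct{ ports []endpointPort }

// resolve mirrors the inner loop of ListEndpointTargetPorts: collect every
// numeric port whose name matches the named target port.
func resolve(subsets []subset, targetPort string) []int {
	var out []int
	for _, s := range subsets {
		for _, p := range s.ports {
			if p.name == targetPort {
				out = append(out, p.port)
			}
		}
	}
	return out
}

func main() {
	subsets := []subset{
		{ports: []endpointPort{{name: "named-port", port: 8080}}},
		{ports: []endpointPort{{name: "named-port", port: 8081}}},
	}
	fmt.Println(resolve(subsets, "named-port")) // [8080 8081]
}
```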
diff --git a/controllers/gce/controller/utils_test.go b/controllers/gce/controller/utils_test.go
index 944a60fc6..7eae43a35 100644
--- a/controllers/gce/controller/utils_test.go
+++ b/controllers/gce/controller/utils_test.go
@@ -25,6 +25,7 @@ import (
 
 	api_v1 "k8s.io/api/core/v1"
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/ingress/controllers/gce/backends"
@@ -273,7 +274,7 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
 	}{
 		{
 			// Single zone.
-			[]*compute.InstanceGroup{&compute.InstanceGroup{
+			[]*compute.InstanceGroup{{
 				Name: "ig-name",
 				Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
 			}},
@@ -282,11 +283,11 @@
 		{
 			// Multiple zones.
 			[]*compute.InstanceGroup{
-				&compute.InstanceGroup{
+				{
 					Name: "ig-name-1",
 					Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
 				},
-				&compute.InstanceGroup{
+				{
 					Name: "ig-name-2",
 					Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-a",
 				},
@@ -305,3 +306,100 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
 		}
 	}
 }
+
+func TestGatherFirewallPorts(t *testing.T) {
+	cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName)
+	lbc := newLoadBalancerController(t, cm)
+	lbc.CloudClusterManager.defaultBackendNodePort.Port = int64(30000)
+
+	ep1 := "ep1"
+	ep2 := "ep2"
+
+	svcPorts := []backends.ServicePort{
+		{Port: int64(30001)},
+		{Port: int64(30002)},
+		{
+			SvcName: types.NamespacedName{
+				Namespace: "ns",
+				Name:      ep1,
+			},
+			Port:          int64(30003),
+			NEGEnabled:    true,
+			SvcTargetPort: "80",
+		},
+		{
+			SvcName: types.NamespacedName{
+				Namespace: "ns",
+				Name:      ep2,
+			},
+			Port:          int64(30004),
+			NEGEnabled:    true,
+			SvcTargetPort: "named-port",
+		},
+	}
+
+	lbc.endpointLister.Indexer.Add(getDefaultEndpoint(ep1))
+	lbc.endpointLister.Indexer.Add(getDefaultEndpoint(ep2))
+
+	res := lbc.Translator.gatherFirewallPorts(svcPorts, true)
+	expect := map[int64]bool{
+		int64(30000): true,
+		int64(30001): true,
+		int64(30002): true,
+		int64(80):    true,
+		int64(8080):  true,
+		int64(8081):  true,
+	}
+	if len(res) != len(expect) {
+		t.Errorf("Expected firewall ports %v, but got %v", expect, res)
+	}
+	for _, p := range res {
+		if _, ok := expect[p]; !ok {
+			t.Errorf("Expected firewall port %v, but it was not found.", p)
+		}
+	}
+}
+
+func getDefaultEndpoint(name string) *api_v1.Endpoints {
+	return &api_v1.Endpoints{
+		ObjectMeta: meta_v1.ObjectMeta{
+			Name:      name,
+			Namespace: "ns",
+		},
+		Subsets: []api_v1.EndpointSubset{
+			{
+				Ports: []api_v1.EndpointPort{
+					{
+						Name:     "",
+						Port:     int32(80),
+						Protocol: api_v1.ProtocolTCP,
+					},
+					{
+						Name:     "named-port",
+						Port:     int32(8080),
+						Protocol: api_v1.ProtocolTCP,
+					},
+				},
+			},
+			{
+				Ports: []api_v1.EndpointPort{
+					{
+						Name:     "named-port",
+						Port:     int32(80),
+						Protocol: api_v1.ProtocolTCP,
+					},
+				},
+			},
+			{
+				Ports: []api_v1.EndpointPort{
+					{
+						Name:     "named-port",
+						Port:     int32(8081),
+						Protocol: api_v1.ProtocolTCP,
+					},
+				},
+			},
+		},
+	}
+}
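The `expect` map in `TestGatherFirewallPorts` follows from the fixture: `ep1` uses the numeric target port `"80"`, which is returned directly without an endpoint lookup, while `ep2`'s `"named-port"` resolves to 8080, 80, and 8081 across the three subsets. Port 80 therefore appears twice but survives only once, thanks to `gatherFirewallPorts`' `portMap` set. A toy sketch of that deduplication (values taken from the fixture above):

```go
package main

import "fmt"

func main() {
	// Node ports plus resolved target ports from the fixture; 80 shows up
	// from both ep1 (numeric target port) and ep2 (second subset).
	ports := []int64{30000, 30001, 30002, 80, 8080, 80, 8081}
	set := map[int64]bool{}
	for _, p := range ports {
		set[p] = true
	}
	fmt.Println(len(set)) // 6 distinct ports, matching len(expect)
}
```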
diff --git a/controllers/gce/main.go b/controllers/gce/main.go
index 8d05f583e..cf5a79397 100644
--- a/controllers/gce/main.go
+++ b/controllers/gce/main.go
@@ -288,7 +288,7 @@ func main() {
 		// Create fake cluster manager
 		clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
 	}
-
+	ctx := utils.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
 	// Start loadbalancer controller
 	lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
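Taken together, the patch splits firewall ports along backend type: IG backends contribute their service node port, while NEG backends contribute the resolved endpoint target ports. A self-contained sketch of that split (`servicePort` is a stand-in for `backends.ServicePort`, and the target ports are assumed to have already been resolved by the endpoint lister):

```go
package main

import "fmt"

// servicePort is a stand-in for backends.ServicePort.
type servicePort struct {
	nodePort    int64
	negEnabled  bool
	targetPorts []int64 // pretend these came from ListEndpointTargetPorts
}

// firewallPorts mirrors gatherFirewallPorts: NEG backends open endpoint
// target ports, IG backends open node ports, deduplicated via a set.
func firewallPorts(svcPorts []servicePort) []int64 {
	set := map[int64]bool{}
	for _, p := range svcPorts {
		if p.negEnabled {
			for _, tp := range p.targetPorts {
				set[tp] = true
			}
		} else {
			set[p.nodePort] = true
		}
	}
	out := make([]int64, 0, len(set))
	for p := range set {
		out = append(out, p)
	}
	return out
}

func main() {
	fmt.Println(firewallPorts([]servicePort{
		{nodePort: 30001},
		{nodePort: 30003, negEnabled: true, targetPorts: []int64{80, 8080}},
	})) // 30001, 80, 8080 in some order (map iteration is unordered)
}
```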