diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index e987711b9..76c657047 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -22,6 +22,9 @@ import ( "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/client-go/tools/record" gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/ingress/controllers/gce/backends" @@ -119,7 +122,7 @@ func (c *ClusterManager) shutdown() error { // Returns the list of all instance groups corresponding to the given loadbalancers. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, currentIngress *extensions.Ingress) ([]*compute.InstanceGroup, error) { if len(namedPorts) != 0 { // Add the default backend node port to the list of named ports for instance groups. namedPorts = append(namedPorts, c.defaultBackendNodePort) @@ -159,7 +162,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName for _, p := range fwNodePorts { np = append(np, p.Port) } - if err := c.firewallPool.Sync(np, nodeNames); err != nil { + if err := c.firewallPool.Sync(np, nodeNames, currentIngress); err != nil { return igs, err } @@ -233,7 +236,8 @@ func NewClusterManager( cloud *gce.GCECloud, namer *utils.Namer, defaultBackendNodePort backends.ServicePort, - defaultHealthCheckPath string) (*ClusterManager, error) { + defaultHealthCheckPath string, + recorder record.EventRecorder) (*ClusterManager, error) { // Names are fundamental to the cluster, the uid allocator makes sure names don't collide. cluster := ClusterManager{ClusterNamer: namer} @@ -255,6 +259,11 @@ func NewClusterManager( // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer) - cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer) + + eventRaiser := func(ing *extensions.Ingress, reason, msg string) { + recorder.Event(ing, apiv1.EventTypeNormal, reason, msg) + } + + cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer, eventRaiser) return &cluster, nil } diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index e0a8de1fa..137357014 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -30,8 +30,6 @@ import ( informerv1 "k8s.io/client-go/informers/core/v1" informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" - scheme "k8s.io/client-go/kubernetes/scheme" - unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -60,17 +58,19 @@ type ControllerContext struct { ServiceInformer cache.SharedIndexInformer PodInformer cache.SharedIndexInformer NodeInformer cache.SharedIndexInformer + EventRecorder record.EventRecorder // Stop is the stop channel shared among controllers StopCh chan struct{} } -func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext { +func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, recorder record.EventRecorder) *ControllerContext { return &ControllerContext{ IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), StopCh: make(chan struct{}), + EventRecorder: recorder, } } @@ -120,17 +120,11 @@ type LoadBalancerController struct { // required for L7 loadbalancing. // - resyncPeriod: Watchers relist from the Kubernetes API server this often. func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ - Interface: kubeClient.Core().Events(""), - }) lbc := LoadBalancerController{ client: kubeClient, CloudClusterManager: clusterManager, stopCh: ctx.StopCh, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, - apiv1.EventSource{Component: "loadbalancer-controller"}), + recorder: ctx.EventRecorder, } lbc.nodeQueue = NewTaskQueue(lbc.syncNodes) lbc.ingQueue = NewTaskQueue(lbc.sync) @@ -299,6 +293,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if err != nil { return err } + var ing *extensions.Ingress + if ingExists { + // Make a shallow copy of the object + v := *obj.(*extensions.Ingress) + ing = &v + } // This performs a 2 phase checkpoint with the cloud: // * Phase 1 creates/verifies resources are as expected. At the end of a @@ -320,7 +320,9 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { }() // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. - igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts) + // Although we pass the current ingress, 'Checkpoint' syncs all resources. This ingress is only + // for attaching events. + igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, ing) if err != nil { // TODO: Implement proper backoff for the queue. eventMsg := "GCE" @@ -335,8 +337,8 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if !ingExists { return syncError } - ing := *obj.(*extensions.Ingress) - if isGCEMultiClusterIngress(&ing) { + + if isGCEMultiClusterIngress(ing) { // Add instance group names as annotation on the ingress. if ing.Annotations == nil { ing.Annotations = map[string]string{} @@ -358,13 +360,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return syncError } - if urlMap, err := lbc.tr.toURLMap(&ing); err != nil { + if urlMap, err := lbc.tr.toURLMap(ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) + lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) syncError = fmt.Errorf("%v, update url map error: %v", syncError, err) } else if err := lbc.updateIngressStatus(l7, ing); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error()) + lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } return syncError @@ -372,7 +374,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { // updateIngressStatus updates the IP and annotations of a loadbalancer. // The annotations are parsed by kubectl describe. -func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error { +func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error { ingClient := lbc.client.Extensions().Ingresses(ing.Namespace) // Update IP through update/status endpoint diff --git a/controllers/gce/controller/controller_test.go b/controllers/gce/controller/controller_test.go index b12d01f1d..21465842c 100644 --- a/controllers/gce/controller/controller_test.go +++ b/controllers/gce/controller/controller_test.go @@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string { // newLoadBalancerController create a loadbalancer controller. func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController { kubeClient := fake.NewSimpleClientset() - ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second) + eventRecorder := utils.NewEventRecorder(kubeClient) + ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, eventRecorder) lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager) if err != nil { t.Fatalf("%v", err) @@ -95,10 +96,12 @@ func toIngressRules(hostRules map[string]utils.FakeIngressRuleValueMap) []extens // newIngress returns a new Ingress with the given path map. func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions.Ingress { + id := uuid.NewUUID() return &extensions.Ingress{ ObjectMeta: meta_v1.ObjectMeta{ - Name: fmt.Sprintf("%v", uuid.NewUUID()), + Name: fmt.Sprintf("%v", id), Namespace: api.NamespaceNone, + UID: id, }, Spec: extensions.IngressSpec{ Backend: &extensions.IngressBackend{ diff --git a/controllers/gce/controller/fakes.go b/controllers/gce/controller/fakes.go index 113460e12..cba1abbea 100644 --- a/controllers/gce/controller/fakes.go +++ b/controllers/gce/controller/fakes.go @@ -17,7 +17,10 @@ limitations under the License. package controller import ( + "fmt" + compute "google.golang.org/api/compute/v1" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" @@ -65,7 +68,7 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager testDefaultBeNodePort, namer, ) - frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(), namer) + frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, fakeRaiseIngressEvent) cm := &ClusterManager{ ClusterNamer: namer, instancePool: nodePool, @@ -75,3 +78,12 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager } return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs} } + +func fakeRaiseIngressEvent(ing *extensions.Ingress, reason, msg string) { + ingName := "nil-ingress" + if ing != nil { + ingName = ing.Name + } + + fmt.Printf("Ingress %q Event %q: %q\n", ingName, reason, msg) +} diff --git a/controllers/gce/firewalls/fakes.go b/controllers/gce/firewalls/fakes.go index 8ee43ad3b..969e74aa9 100644 --- a/controllers/gce/firewalls/fakes.go +++ b/controllers/gce/firewalls/fakes.go @@ -25,14 +25,21 @@ import ( ) type fakeFirewallsProvider struct { - fw map[string]*compute.Firewall - networkUrl string + fw map[string]*compute.Firewall + networkProjectID string + networkURL string + onXPN bool + fwReadOnly bool } // NewFakeFirewallsProvider creates a fake for firewall rules. -func NewFakeFirewallsProvider() *fakeFirewallsProvider { +func NewFakeFirewallsProvider(onXPN bool, fwReadOnly bool) *fakeFirewallsProvider { return &fakeFirewallsProvider{ - fw: make(map[string]*compute.Firewall), + fw: make(map[string]*compute.Firewall), + networkProjectID: "test-network-project", + networkURL: "/path/to/my-network", + onXPN: onXPN, + fwReadOnly: fwReadOnly, } } @@ -44,7 +51,7 @@ func (ff *fakeFirewallsProvider) GetFirewall(name string) (*compute.Firewall, er return nil, utils.FakeGoogleAPINotFoundErr() } -func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error { +func (ff *fakeFirewallsProvider) doCreateFirewall(f *compute.Firewall) error { if _, exists := ff.fw[f.Name]; exists { return fmt.Errorf("firewall rule %v already exists", f.Name) } @@ -52,7 +59,15 @@ func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error { return nil } -func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error { +func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error { + if ff.fwReadOnly { + return utils.FakeGoogleAPIForbiddenErr() + } + + return ff.doCreateFirewall(f) +} + +func (ff *fakeFirewallsProvider) doDeleteFirewall(name string) error { // We need the full name for the same reason as CreateFirewall. _, exists := ff.fw[name] if !exists { @@ -63,7 +78,15 @@ func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error { return nil } -func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error { +func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error { + if ff.fwReadOnly { + return utils.FakeGoogleAPIForbiddenErr() + } + + return ff.doDeleteFirewall(name) +} + +func (ff *fakeFirewallsProvider) doUpdateFirewall(f *compute.Firewall) error { // We need the full name for the same reason as CreateFirewall. _, exists := ff.fw[f.Name] if !exists { @@ -74,8 +97,24 @@ func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error { return nil } +func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error { + if ff.fwReadOnly { + return utils.FakeGoogleAPIForbiddenErr() + } + + return ff.doUpdateFirewall(f) +} + +func (ff *fakeFirewallsProvider) NetworkProjectID() string { + return ff.networkProjectID +} + func (ff *fakeFirewallsProvider) NetworkURL() string { - return ff.networkUrl + return ff.networkURL +} + +func (ff *fakeFirewallsProvider) OnXPN() bool { + return ff.onXPN } func (ff *fakeFirewallsProvider) GetNodeTags(nodeNames []string) ([]string, error) { diff --git a/controllers/gce/firewalls/firewalls.go b/controllers/gce/firewalls/firewalls.go index a8aceca39..b63ce037d 100644 --- a/controllers/gce/firewalls/firewalls.go +++ b/controllers/gce/firewalls/firewalls.go @@ -17,12 +17,15 @@ limitations under the License. package firewalls import ( + "fmt" "strconv" "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" netset "k8s.io/kubernetes/pkg/util/net/sets" "k8s.io/ingress/controllers/gce/utils" @@ -36,21 +39,28 @@ type FirewallRules struct { cloud Firewall namer *utils.Namer srcRanges []string + recorder utils.IngressEventRecorder } // NewFirewallPool creates a new firewall rule manager. // cloud: the cloud object implementing Firewall. // namer: cluster namer. -func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool { +// recorder: the func for raising an event. +func NewFirewallPool(cloud Firewall, namer *utils.Namer, recorder utils.IngressEventRecorder) SingleFirewallPool { _, err := netset.ParseIPNets(l7SrcRanges...) if err != nil { glog.Fatalf("Could not parse L7 src ranges %v for firewall rule: %v", l7SrcRanges, err) } - return &FirewallRules{cloud: cloud, namer: namer, srcRanges: l7SrcRanges} + return &FirewallRules{ + cloud: cloud, + namer: namer, + srcRanges: l7SrcRanges, + recorder: recorder, + } } // Sync sync firewall rules with the cloud. -func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { +func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error { if len(nodePorts) == 0 { return fr.Shutdown() } @@ -68,7 +78,7 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { if rule == nil { glog.Infof("Creating global l7 firewall rule %v", name) - return fr.cloud.CreateFirewall(firewall) + return fr.createFirewall(firewall, ing) } requiredPorts := sets.NewString() @@ -92,15 +102,48 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { return nil } glog.V(3).Infof("Firewall %v already exists, updating nodeports %v", name, nodePorts) - return fr.cloud.UpdateFirewall(firewall) + return fr.updateFirewall(firewall, ing) +} + +func (fr *FirewallRules) createFirewall(f *compute.Firewall, ing *extensions.Ingress) error { + err := fr.cloud.CreateFirewall(f) + if utils.IsForbiddenError(err) && fr.cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudCreateCmd(f, fr.cloud.NetworkProjectID()) + glog.V(3).Infof("Could not create L7 firewall on XPN cluster. Raising event for cmd: %q", gcloudCmd) + fr.raiseFirewallChangeNeededEvent(ing, gcloudCmd) + return nil + } + return err +} + +func (fr *FirewallRules) updateFirewall(f *compute.Firewall, ing *extensions.Ingress) error { + err := fr.cloud.UpdateFirewall(f) + if utils.IsForbiddenError(err) && fr.cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudUpdateCmd(f, fr.cloud.NetworkProjectID()) + glog.V(3).Infof("Could not update L7 firewall on XPN cluster. Raising event for cmd: %q", gcloudCmd) + fr.raiseFirewallChangeNeededEvent(ing, gcloudCmd) + return nil + } + return err +} + +func (fr *FirewallRules) deleteFirewall(name string) error { + err := fr.cloud.DeleteFirewall(name) + if utils.IsForbiddenError(err) && fr.cloud.OnXPN() { + gcloudCmd := gce.FirewallToGCloudDeleteCmd(name, fr.cloud.NetworkProjectID()) + glog.V(3).Infof("Could not delete L7 firewall on XPN cluster. %q needs to be ran.", gcloudCmd) + // An event cannot be raised because there's no ingress for reference. + return nil + } + return err } // Shutdown shuts down this firewall rules manager. func (fr *FirewallRules) Shutdown() error { name := fr.namer.FrName(fr.namer.FrSuffix()) glog.Infof("Deleting firewall %v", name) - err := fr.cloud.DeleteFirewall(name) - if err != nil && utils.IsHTTPErrorCode(err, 404) { + err := fr.deleteFirewall(name) + if utils.IsNotFoundError(err) { glog.Infof("Firewall with name %v didn't exist at Shutdown", name) return nil } @@ -141,3 +184,16 @@ func (fr *FirewallRules) createFirewallObject(firewallName, description string, TargetTags: targetTags, }, nil } + +func (fr *FirewallRules) raiseFirewallChangeNeededEvent(ing *extensions.Ingress, cmd string) { + if fr.recorder == nil { + return + } + + // Cannot attach an event to a nil object + if ing == nil { + return + } + + fr.recorder(ing, "LoadBalancerManualChange", fmt.Sprintf("Firewall change required by network admin: `%v`", cmd)) +} diff --git a/controllers/gce/firewalls/firewalls_test.go b/controllers/gce/firewalls/firewalls_test.go index 484ad90be..54eab2264 100644 --- a/controllers/gce/firewalls/firewalls_test.go +++ b/controllers/gce/firewalls/firewalls_test.go @@ -20,20 +20,22 @@ import ( "strconv" "testing" + extensions "k8s.io/api/extensions/v1beta1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress/controllers/gce/utils" ) func TestSyncFirewallPool(t *testing.T) { namer := utils.NewNamer("ABC", "XYZ") - fwp := NewFakeFirewallsProvider() - fp := NewFirewallPool(fwp, namer) + fwp := NewFakeFirewallsProvider(false, false) + fp := NewFirewallPool(fwp, namer, nil) ruleName := namer.FrName(namer.FrSuffix()) // Test creating a firewall rule via Sync nodePorts := []int64{80, 443, 3000} nodes := []string{"node-a", "node-b", "node-c"} - err := fp.Sync(nodePorts, nodes) + err := fp.Sync(nodePorts, nodes, nil) if err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } @@ -41,25 +43,25 @@ func TestSyncFirewallPool(t *testing.T) { // Sync to fewer ports nodePorts = []int64{80, 443} - err = fp.Sync(nodePorts, nodes) + err = fp.Sync(nodePorts, nodes, nil) if err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t) - firewall, err := fp.(*FirewallRules).createFirewallObject(namer.FrName(namer.FrSuffix()), "", nodePorts, nodes) + firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes) if err != nil { t.Errorf("unexpected err when creating firewall object, err: %v", err) } - err = fwp.UpdateFirewall(firewall) + err = fwp.doUpdateFirewall(firewall) if err != nil { t.Errorf("failed to update firewall rule, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t) // Run Sync and expect l7 src ranges to be returned - err = fp.Sync(nodePorts, nodes) + err = fp.Sync(nodePorts, nodes, nil) if err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } @@ -68,7 +70,7 @@ func TestSyncFirewallPool(t *testing.T) { // Add node and expect firewall to remain the same // NOTE: See computeHostTag(..) in gce cloudprovider nodes = []string{"node-a", "node-b", "node-c", "node-d"} - err = fp.Sync(nodePorts, nodes) + err = fp.Sync(nodePorts, nodes, nil) if err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } @@ -76,7 +78,7 @@ func TestSyncFirewallPool(t *testing.T) { // Remove all ports and expect firewall rule to disappear nodePorts = []int64{} - err = fp.Sync(nodePorts, nodes) + err = fp.Sync(nodePorts, nodes, nil) if err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } @@ -87,6 +89,87 @@ func TestSyncFirewallPool(t *testing.T) { } } +// TestSyncOnXPNWithPermission tests that firwall sync continues to work when OnXPN=true +func TestSyncOnXPNWithPermission(t *testing.T) { + namer := utils.NewNamer("ABC", "XYZ") + fwp := NewFakeFirewallsProvider(true, false) + fp := NewFirewallPool(fwp, namer, nil) + ruleName := namer.FrName(namer.FrSuffix()) + + // Test creating a firewall rule via Sync + nodePorts := []int64{80, 443, 3000} + nodes := []string{"node-a", "node-b", "node-c"} + err := fp.Sync(nodePorts, nodes, nil) + if err != nil { + t.Errorf("unexpected err when syncing firewall, err: %v", err) + } + verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t) +} + +// TestSyncOnXPNReadOnly tests that controller behavior is accurate when the controller +// does not have permission to create/update/delete firewall rules. +// Sync should NOT return an error. An event should be raised. +func TestSyncOnXPNReadOnly(t *testing.T) { + ing := &extensions.Ingress{ObjectMeta: meta_v1.ObjectMeta{Name: "xpn-ingress"}} + var events []string + eventer := func(ing *extensions.Ingress, reason, msg string) { + events = append(events, msg) + } + + namer := utils.NewNamer("ABC", "XYZ") + fwp := NewFakeFirewallsProvider(true, true) + fp := NewFirewallPool(fwp, namer, eventer) + ruleName := namer.FrName(namer.FrSuffix()) + + // Test creating a firewall rule via Sync + nodePorts := []int64{80, 443, 3000} + nodes := []string{"node-a", "node-b", "node-c"} + err := fp.Sync(nodePorts, nodes, ing) + if err != nil { + t.Errorf("unexpected err when syncing firewall, err: %v", err) + } + + // Expect an event saying a firewall needs to be created + if len(events) != 1 { + t.Errorf("expected %v events but received %v: %+v", 1, len(events), events) + } + + // Clear events + events = events[:0] + + // Manually create the firewall + firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes) + if err != nil { + t.Errorf("unexpected err when creating firewall object, err: %v", err) + } + err = fwp.doCreateFirewall(firewall) + if err != nil { + t.Errorf("unexpected err when creating firewall, err: %v", err) + } + + // Run sync again with same state - expect no event + err = fp.Sync(nodePorts, nodes, ing) + if err != nil { + t.Errorf("unexpected err when syncing firewall, err: %v", err) + } + if len(events) > 0 { + t.Errorf("received unexpected event(s): %+v", events) + } + + // Modify nodePorts to cause an event + nodePorts = append(nodePorts, 3001) + + // Run sync again with same state - expect no event + err = fp.Sync(nodePorts, nodes, ing) + if err != nil { + t.Errorf("unexpected err when syncing firewall, err: %v", err) + } + // Expect an event saying a firewall needs to be created + if len(events) != 1 { + t.Errorf("expected %v events but received %v: %+v", 1, len(events), events) + } +} + func verifyFirewallRule(fwp *fakeFirewallsProvider, ruleName string, expectedPorts []int64, expectedNodes, expectedCIDRs []string, t *testing.T) { var strPorts []string for _, v := range expectedPorts { diff --git a/controllers/gce/firewalls/interfaces.go b/controllers/gce/firewalls/interfaces.go index 97fc6a74d..cb1c9c7fb 100644 --- a/controllers/gce/firewalls/interfaces.go +++ b/controllers/gce/firewalls/interfaces.go @@ -18,12 +18,13 @@ package firewalls import ( compute "google.golang.org/api/compute/v1" + extensions "k8s.io/api/extensions/v1beta1" ) // SingleFirewallPool syncs the firewall rule for L7 traffic. type SingleFirewallPool interface { // TODO: Take a list of node ports for the firewall. - Sync(nodePorts []int64, nodeNames []string) error + Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error Shutdown() error } @@ -36,5 +37,7 @@ type Firewall interface { DeleteFirewall(name string) error UpdateFirewall(f *compute.Firewall) error GetNodeTags(nodeNames []string) ([]string, error) + NetworkProjectID() string NetworkURL() string + OnXPN() bool } diff --git a/controllers/gce/main.go b/controllers/gce/main.go index 35d9145bd..05242c095 100644 --- a/controllers/gce/main.go +++ b/controllers/gce/main.go @@ -33,7 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" flag "github.com/spf13/pflag" - "k8s.io/api/core/v1" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -129,7 +129,7 @@ var ( `Path used to health-check a backend service. All Services must serve a 200 page on this path. Currently this is only configurable globally.`) - watchNamespace = flags.String("watch-namespace", v1.NamespaceAll, + watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll, `Namespace to watch for Ingress/Services/Endpoints.`) verbose = flags.Bool("verbose", false, @@ -158,7 +158,6 @@ func registerHandlers(lbc *controller.LoadBalancerController) { // TODO: Retry failures during shutdown. lbc.Stop(true) }) - glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil)) } @@ -249,6 +248,8 @@ func main() { SvcPort: intstr.FromInt(int(port)), } + eventRecorder := utils.NewEventRecorder(kubeClient) + var cloud *gce.GCECloud if *inCluster || *useRealCloud { // Create cluster manager @@ -278,7 +279,7 @@ func main() { glog.Infof("Created GCE client without a config file") } - clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath) + clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath, eventRecorder) if err != nil { glog.Fatalf("%v", err) } @@ -287,14 +288,13 @@ func main() { clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager } - ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod) + ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, eventRecorder) // Start loadbalancer controller lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager) if err != nil { glog.Fatalf("%v", err) } - if clusterManager.ClusterNamer.GetClusterName() != "" { glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName()) } @@ -453,7 +453,7 @@ func getClusterUID(kubeClient kubernetes.Interface, name string) (string, error) // getNodePort waits for the Service, and returns it's first node port. func getNodePort(client kubernetes.Interface, ns, name string) (port, nodePort int32, err error) { - var svc *v1.Service + var svc *apiv1.Service glog.V(3).Infof("Waiting for %v/%v", ns, name) wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) { svc, err = client.Core().Services(ns).Get(name, metav1.GetOptions{}) diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index c0486f557..bc45dc3e1 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -24,6 +24,13 @@ import ( "strings" "sync" + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/client-go/kubernetes" + scheme "k8s.io/client-go/kubernetes/scheme" + unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "github.com/golang/glog" compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" @@ -309,9 +316,14 @@ func (g GCEURLMap) PutDefaultBackend(d *compute.BackendService) { } } +// FakeGoogleAPIForbiddenErr creates a Forbidden error with type googleapi.Error +func FakeGoogleAPIForbiddenErr() *googleapi.Error { + return &googleapi.Error{Code: http.StatusForbidden} +} + // FakeGoogleAPINotFoundErr creates a NotFound error with type googleapi.Error func FakeGoogleAPINotFoundErr() *googleapi.Error { - return &googleapi.Error{Code: 404} + return &googleapi.Error{Code: http.StatusNotFound} } // IsHTTPErrorCode checks if the given error matches the given HTTP Error code. @@ -344,6 +356,11 @@ func IsNotFoundError(err error) bool { return IsHTTPErrorCode(err, http.StatusNotFound) } +// IsForbiddenError returns true if the operation was forbidden +func IsForbiddenError(err error) bool { + return IsHTTPErrorCode(err, http.StatusForbidden) +} + // CompareLinks returns true if the 2 self links are equal. func CompareLinks(l1, l2 string) bool { // TODO: These can be partial links @@ -359,3 +376,16 @@ func GetNamedPort(port int64) *compute.NamedPort { // TODO: move port naming to namer return &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} } + +// NewEventRecorder returns an event recorder given a kubernetes client +func NewEventRecorder(kubeClient kubernetes.Interface) record.EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ + Interface: kubeClient.Core().Events(""), + }) + return eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}) + +} + +type IngressEventRecorder func(ing *extensions.Ingress, reason, msg string)