Gracefully handle case of not having firewall create/update/delete

permissions on XPN cluster
This commit is contained in:
Nick Sardo 2017-10-02 16:05:52 -07:00
parent cbfb619ca7
commit 2025675257
10 changed files with 294 additions and 57 deletions

View file

@ -22,6 +22,9 @@ import (
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/tools/record"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/ingress/controllers/gce/backends"
@ -119,7 +122,7 @@ func (c *ClusterManager) shutdown() error {
// Returns the list of all instance groups corresponding to the given loadbalancers.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, currentIngress *extensions.Ingress) ([]*compute.InstanceGroup, error) {
if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
@ -159,7 +162,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
for _, p := range fwNodePorts {
np = append(np, p.Port)
}
if err := c.firewallPool.Sync(np, nodeNames); err != nil {
if err := c.firewallPool.Sync(np, nodeNames, currentIngress); err != nil {
return igs, err
}
@ -233,7 +236,8 @@ func NewClusterManager(
cloud *gce.GCECloud,
namer *utils.Namer,
defaultBackendNodePort backends.ServicePort,
defaultHealthCheckPath string) (*ClusterManager, error) {
defaultHealthCheckPath string,
recorder record.EventRecorder) (*ClusterManager, error) {
// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
cluster := ClusterManager{ClusterNamer: namer}
@ -255,6 +259,11 @@ func NewClusterManager(
// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
eventRaiser := func(ing *extensions.Ingress, reason, msg string) {
recorder.Event(ing, apiv1.EventTypeNormal, reason, msg)
}
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer, eventRaiser)
return &cluster, nil
}

View file

@ -30,8 +30,6 @@ import (
informerv1 "k8s.io/client-go/informers/core/v1"
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -60,17 +58,19 @@ type ControllerContext struct {
ServiceInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EventRecorder record.EventRecorder
// Stop is the stop channel shared among controllers
StopCh chan struct{}
}
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext {
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, recorder record.EventRecorder) *ControllerContext {
return &ControllerContext{
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
StopCh: make(chan struct{}),
EventRecorder: recorder,
}
}
@ -120,17 +120,11 @@ type LoadBalancerController struct {
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
})
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
stopCh: ctx.StopCh,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "loadbalancer-controller"}),
recorder: ctx.EventRecorder,
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
@ -299,6 +293,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
if err != nil {
return err
}
var ing *extensions.Ingress
if ingExists {
// Make a shallow copy of the object
v := *obj.(*extensions.Ingress)
ing = &v
}
// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
@ -320,7 +320,9 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
}()
// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts)
// Although we pass the current ingress, 'Checkpoint' syncs all resources. This ingress is only
// for attaching events.
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, ing)
if err != nil {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
@ -335,8 +337,8 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
if !ingExists {
return syncError
}
ing := *obj.(*extensions.Ingress)
if isGCEMultiClusterIngress(&ing) {
if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
@ -358,13 +360,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return syncError
}
if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
if urlMap, err := lbc.tr.toURLMap(ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
@ -372,7 +374,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)
// Update IP through update/status endpoint

View file

@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string {
// newLoadBalancerController create a loadbalancer controller.
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second)
eventRecorder := utils.NewEventRecorder(kubeClient)
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, eventRecorder)
lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager)
if err != nil {
t.Fatalf("%v", err)
@ -95,10 +96,12 @@ func toIngressRules(hostRules map[string]utils.FakeIngressRuleValueMap) []extens
// newIngress returns a new Ingress with the given path map.
func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions.Ingress {
id := uuid.NewUUID()
return &extensions.Ingress{
ObjectMeta: meta_v1.ObjectMeta{
Name: fmt.Sprintf("%v", uuid.NewUUID()),
Name: fmt.Sprintf("%v", id),
Namespace: api.NamespaceNone,
UID: id,
},
Spec: extensions.IngressSpec{
Backend: &extensions.IngressBackend{

View file

@ -17,7 +17,10 @@ limitations under the License.
package controller
import (
"fmt"
compute "google.golang.org/api/compute/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
@ -65,7 +68,7 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
testDefaultBeNodePort,
namer,
)
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(), namer)
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, fakeRaiseIngressEvent)
cm := &ClusterManager{
ClusterNamer: namer,
instancePool: nodePool,
@ -75,3 +78,12 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
}
return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs}
}
// fakeRaiseIngressEvent is a test stand-in for the event recorder: instead of
// recording an event against the cluster it prints to stdout. A nil ingress is
// tolerated and reported as "nil-ingress".
func fakeRaiseIngressEvent(ing *extensions.Ingress, reason, msg string) {
	name := "nil-ingress"
	if ing != nil {
		name = ing.Name
	}
	fmt.Printf("Ingress %q Event %q: %q\n", name, reason, msg)
}

View file

@ -25,14 +25,21 @@ import (
)
// fakeFirewallsProvider implements the Firewall interface for tests.
// It stores rules in memory and can simulate a shared-VPC (XPN) cluster
// and/or read-only firewall permissions.
// Note: the diff view contained both the old and new field lists; the
// duplicated `fw` field and the stale `networkUrl` field are removed here.
type fakeFirewallsProvider struct {
	fw               map[string]*compute.Firewall // rules keyed by name
	networkProjectID string                       // project that owns the network
	networkURL       string                       // resource URL of the network
	onXPN            bool                         // simulate a shared-VPC cluster
	fwReadOnly       bool                         // simulate missing create/update/delete permission
}
// NewFakeFirewallsProvider creates a fake for firewall rules.
func NewFakeFirewallsProvider() *fakeFirewallsProvider {
func NewFakeFirewallsProvider(onXPN bool, fwReadOnly bool) *fakeFirewallsProvider {
return &fakeFirewallsProvider{
fw: make(map[string]*compute.Firewall),
fw: make(map[string]*compute.Firewall),
networkProjectID: "test-network-project",
networkURL: "/path/to/my-network",
onXPN: onXPN,
fwReadOnly: fwReadOnly,
}
}
@ -44,7 +51,7 @@ func (ff *fakeFirewallsProvider) GetFirewall(name string) (*compute.Firewall, er
return nil, utils.FakeGoogleAPINotFoundErr()
}
func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
func (ff *fakeFirewallsProvider) doCreateFirewall(f *compute.Firewall) error {
if _, exists := ff.fw[f.Name]; exists {
return fmt.Errorf("firewall rule %v already exists", f.Name)
}
@ -52,7 +59,15 @@ func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
return nil
}
func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
// CreateFirewall stores the rule, or returns a fake forbidden (403) error when
// the provider is configured as read-only.
func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
	if !ff.fwReadOnly {
		return ff.doCreateFirewall(f)
	}
	return utils.FakeGoogleAPIForbiddenErr()
}
func (ff *fakeFirewallsProvider) doDeleteFirewall(name string) error {
// We need the full name for the same reason as CreateFirewall.
_, exists := ff.fw[name]
if !exists {
@ -63,7 +78,15 @@ func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
return nil
}
func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
// DeleteFirewall removes the named rule, or returns a fake forbidden (403)
// error when the provider is configured as read-only.
func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
	if !ff.fwReadOnly {
		return ff.doDeleteFirewall(name)
	}
	return utils.FakeGoogleAPIForbiddenErr()
}
func (ff *fakeFirewallsProvider) doUpdateFirewall(f *compute.Firewall) error {
// We need the full name for the same reason as CreateFirewall.
_, exists := ff.fw[f.Name]
if !exists {
@ -74,8 +97,24 @@ func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
return nil
}
// UpdateFirewall replaces the stored rule, or returns a fake forbidden (403)
// error when the provider is configured as read-only.
func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
	if !ff.fwReadOnly {
		return ff.doUpdateFirewall(f)
	}
	return utils.FakeGoogleAPIForbiddenErr()
}
// NetworkProjectID returns the fake ID of the project owning the network.
func (ff *fakeFirewallsProvider) NetworkProjectID() string {
	return ff.networkProjectID
}
// NetworkURL returns the fake resource URL of the network.
// The residual pre-change return of the removed `networkUrl` field (diff
// leftover, unreachable and undefined) is dropped.
func (ff *fakeFirewallsProvider) NetworkURL() string {
	return ff.networkURL
}
// OnXPN reports whether the fake simulates a shared-VPC (XPN) cluster.
func (ff *fakeFirewallsProvider) OnXPN() bool {
	return ff.onXPN
}
func (ff *fakeFirewallsProvider) GetNodeTags(nodeNames []string) ([]string, error) {

View file

@ -17,12 +17,15 @@ limitations under the License.
package firewalls
import (
"fmt"
"strconv"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
netset "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/ingress/controllers/gce/utils"
@ -36,21 +39,28 @@ type FirewallRules struct {
cloud Firewall
namer *utils.Namer
srcRanges []string
recorder utils.IngressEventRecorder
}
// NewFirewallPool creates a new firewall rule manager.
// cloud: the cloud object implementing Firewall.
// namer: cluster namer.
func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool {
// recorder: the func for raising an event.
func NewFirewallPool(cloud Firewall, namer *utils.Namer, recorder utils.IngressEventRecorder) SingleFirewallPool {
_, err := netset.ParseIPNets(l7SrcRanges...)
if err != nil {
glog.Fatalf("Could not parse L7 src ranges %v for firewall rule: %v", l7SrcRanges, err)
}
return &FirewallRules{cloud: cloud, namer: namer, srcRanges: l7SrcRanges}
return &FirewallRules{
cloud: cloud,
namer: namer,
srcRanges: l7SrcRanges,
recorder: recorder,
}
}
// Sync sync firewall rules with the cloud.
func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error {
if len(nodePorts) == 0 {
return fr.Shutdown()
}
@ -68,7 +78,7 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
if rule == nil {
glog.Infof("Creating global l7 firewall rule %v", name)
return fr.cloud.CreateFirewall(firewall)
return fr.createFirewall(firewall, ing)
}
requiredPorts := sets.NewString()
@ -92,15 +102,48 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
return nil
}
glog.V(3).Infof("Firewall %v already exists, updating nodeports %v", name, nodePorts)
return fr.cloud.UpdateFirewall(firewall)
return fr.updateFirewall(firewall, ing)
}
// createFirewall creates the firewall rule via the cloud provider. When the
// controller lacks permission on an XPN (shared-VPC) cluster — i.e. the call
// is forbidden AND the cloud reports OnXPN — the error is swallowed and an
// event carrying the equivalent gcloud command is raised on the ingress so a
// network admin can apply the change manually.
func (fr *FirewallRules) createFirewall(f *compute.Firewall, ing *extensions.Ingress) error {
	err := fr.cloud.CreateFirewall(f)
	if utils.IsForbiddenError(err) && fr.cloud.OnXPN() {
		gcloudCmd := gce.FirewallToGCloudCreateCmd(f, fr.cloud.NetworkProjectID())
		glog.V(3).Infof("Could not create L7 firewall on XPN cluster. Raising event for cmd: %q", gcloudCmd)
		fr.raiseFirewallChangeNeededEvent(ing, gcloudCmd)
		return nil
	}
	return err
}
// updateFirewall updates the firewall rule via the cloud provider. On an XPN
// (shared-VPC) cluster where the controller lacks permission, the forbidden
// error is swallowed and an event with the equivalent gcloud command is
// raised on the ingress instead.
func (fr *FirewallRules) updateFirewall(f *compute.Firewall, ing *extensions.Ingress) error {
	err := fr.cloud.UpdateFirewall(f)
	if err == nil || !utils.IsForbiddenError(err) || !fr.cloud.OnXPN() {
		return err
	}
	cmd := gce.FirewallToGCloudUpdateCmd(f, fr.cloud.NetworkProjectID())
	glog.V(3).Infof("Could not update L7 firewall on XPN cluster. Raising event for cmd: %q", cmd)
	fr.raiseFirewallChangeNeededEvent(ing, cmd)
	return nil
}
// deleteFirewall removes the named rule via the cloud provider. On an XPN
// (shared-VPC) cluster where the controller lacks permission, the forbidden
// error is swallowed and only logged — deletion happens at shutdown, when no
// ingress exists to attach an event to.
func (fr *FirewallRules) deleteFirewall(name string) error {
	err := fr.cloud.DeleteFirewall(name)
	if err == nil || !utils.IsForbiddenError(err) || !fr.cloud.OnXPN() {
		return err
	}
	cmd := gce.FirewallToGCloudDeleteCmd(name, fr.cloud.NetworkProjectID())
	glog.V(3).Infof("Could not delete L7 firewall on XPN cluster. %q needs to be ran.", cmd)
	// An event cannot be raised because there's no ingress for reference.
	return nil
}
// Shutdown shuts down this firewall rules manager.
func (fr *FirewallRules) Shutdown() error {
name := fr.namer.FrName(fr.namer.FrSuffix())
glog.Infof("Deleting firewall %v", name)
err := fr.cloud.DeleteFirewall(name)
if err != nil && utils.IsHTTPErrorCode(err, 404) {
err := fr.deleteFirewall(name)
if utils.IsNotFoundError(err) {
glog.Infof("Firewall with name %v didn't exist at Shutdown", name)
return nil
}
@ -141,3 +184,16 @@ func (fr *FirewallRules) createFirewallObject(firewallName, description string,
TargetTags: targetTags,
}, nil
}
// raiseFirewallChangeNeededEvent attaches a "LoadBalancerManualChange" event
// to the ingress, telling the network admin which gcloud command to run.
// It is a no-op when there is no recorder configured or no ingress to attach
// the event to.
func (fr *FirewallRules) raiseFirewallChangeNeededEvent(ing *extensions.Ingress, cmd string) {
	if fr.recorder == nil || ing == nil {
		return
	}
	fr.recorder(ing, "LoadBalancerManualChange", fmt.Sprintf("Firewall change required by network admin: `%v`", cmd))
}

View file

@ -20,20 +20,22 @@ import (
"strconv"
"testing"
extensions "k8s.io/api/extensions/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress/controllers/gce/utils"
)
func TestSyncFirewallPool(t *testing.T) {
namer := utils.NewNamer("ABC", "XYZ")
fwp := NewFakeFirewallsProvider()
fp := NewFirewallPool(fwp, namer)
fwp := NewFakeFirewallsProvider(false, false)
fp := NewFirewallPool(fwp, namer, nil)
ruleName := namer.FrName(namer.FrSuffix())
// Test creating a firewall rule via Sync
nodePorts := []int64{80, 443, 3000}
nodes := []string{"node-a", "node-b", "node-c"}
err := fp.Sync(nodePorts, nodes)
err := fp.Sync(nodePorts, nodes, nil)
if err != nil {
t.Errorf("unexpected err when syncing firewall, err: %v", err)
}
@ -41,25 +43,25 @@ func TestSyncFirewallPool(t *testing.T) {
// Sync to fewer ports
nodePorts = []int64{80, 443}
err = fp.Sync(nodePorts, nodes)
err = fp.Sync(nodePorts, nodes, nil)
if err != nil {
t.Errorf("unexpected err when syncing firewall, err: %v", err)
}
verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
firewall, err := fp.(*FirewallRules).createFirewallObject(namer.FrName(namer.FrSuffix()), "", nodePorts, nodes)
firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes)
if err != nil {
t.Errorf("unexpected err when creating firewall object, err: %v", err)
}
err = fwp.UpdateFirewall(firewall)
err = fwp.doUpdateFirewall(firewall)
if err != nil {
t.Errorf("failed to update firewall rule, err: %v", err)
}
verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
// Run Sync and expect l7 src ranges to be returned
err = fp.Sync(nodePorts, nodes)
err = fp.Sync(nodePorts, nodes, nil)
if err != nil {
t.Errorf("unexpected err when syncing firewall, err: %v", err)
}
@ -68,7 +70,7 @@ func TestSyncFirewallPool(t *testing.T) {
// Add node and expect firewall to remain the same
// NOTE: See computeHostTag(..) in gce cloudprovider
nodes = []string{"node-a", "node-b", "node-c", "node-d"}
err = fp.Sync(nodePorts, nodes)
err = fp.Sync(nodePorts, nodes, nil)
if err != nil {
t.Errorf("unexpected err when syncing firewall, err: %v", err)
}
@ -76,7 +78,7 @@ func TestSyncFirewallPool(t *testing.T) {
// Remove all ports and expect firewall rule to disappear
nodePorts = []int64{}
err = fp.Sync(nodePorts, nodes)
err = fp.Sync(nodePorts, nodes, nil)
if err != nil {
t.Errorf("unexpected err when syncing firewall, err: %v", err)
}
@ -87,6 +89,87 @@ func TestSyncFirewallPool(t *testing.T) {
}
}
// TestSyncOnXPNWithPermission tests that firewall sync continues to work when OnXPN=true
// and the controller still has permission to write firewall rules.
func TestSyncOnXPNWithPermission(t *testing.T) {
	namer := utils.NewNamer("ABC", "XYZ")
	// onXPN=true, fwReadOnly=false: writes are permitted.
	fwp := NewFakeFirewallsProvider(true, false)
	fp := NewFirewallPool(fwp, namer, nil)
	ruleName := namer.FrName(namer.FrSuffix())

	// Test creating a firewall rule via Sync
	nodePorts := []int64{80, 443, 3000}
	nodes := []string{"node-a", "node-b", "node-c"}
	err := fp.Sync(nodePorts, nodes, nil)
	if err != nil {
		t.Errorf("unexpected err when syncing firewall, err: %v", err)
	}
	verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
}
// TestSyncOnXPNReadOnly tests that controller behavior is accurate when the controller
// does not have permission to create/update/delete firewall rules.
// Sync should NOT return an error. An event should be raised.
func TestSyncOnXPNReadOnly(t *testing.T) {
	ing := &extensions.Ingress{ObjectMeta: meta_v1.ObjectMeta{Name: "xpn-ingress"}}
	// Capture raised event messages instead of recording real events.
	var events []string
	eventer := func(ing *extensions.Ingress, reason, msg string) {
		events = append(events, msg)
	}

	namer := utils.NewNamer("ABC", "XYZ")
	// onXPN=true, fwReadOnly=true: every write returns a forbidden error.
	fwp := NewFakeFirewallsProvider(true, true)
	fp := NewFirewallPool(fwp, namer, eventer)
	ruleName := namer.FrName(namer.FrSuffix())

	// Test creating a firewall rule via Sync
	nodePorts := []int64{80, 443, 3000}
	nodes := []string{"node-a", "node-b", "node-c"}
	err := fp.Sync(nodePorts, nodes, ing)
	if err != nil {
		t.Errorf("unexpected err when syncing firewall, err: %v", err)
	}

	// Expect an event saying a firewall needs to be created
	if len(events) != 1 {
		t.Errorf("expected %v events but received %v: %+v", 1, len(events), events)
	}

	// Clear events
	events = events[:0]

	// Manually create the firewall (bypassing the read-only gate) so the pool
	// sees an up-to-date rule on the next sync.
	firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes)
	if err != nil {
		t.Errorf("unexpected err when creating firewall object, err: %v", err)
	}
	err = fwp.doCreateFirewall(firewall)
	if err != nil {
		t.Errorf("unexpected err when creating firewall, err: %v", err)
	}

	// Run sync again with same state - expect no event
	err = fp.Sync(nodePorts, nodes, ing)
	if err != nil {
		t.Errorf("unexpected err when syncing firewall, err: %v", err)
	}
	if len(events) > 0 {
		t.Errorf("received unexpected event(s): %+v", events)
	}

	// Modify nodePorts to cause an event
	nodePorts = append(nodePorts, 3001)

	// Run sync again with changed ports - an update is attempted and forbidden
	err = fp.Sync(nodePorts, nodes, ing)
	if err != nil {
		t.Errorf("unexpected err when syncing firewall, err: %v", err)
	}

	// Expect an event saying a firewall needs to be updated
	if len(events) != 1 {
		t.Errorf("expected %v events but received %v: %+v", 1, len(events), events)
	}
}
func verifyFirewallRule(fwp *fakeFirewallsProvider, ruleName string, expectedPorts []int64, expectedNodes, expectedCIDRs []string, t *testing.T) {
var strPorts []string
for _, v := range expectedPorts {

View file

@ -18,12 +18,13 @@ package firewalls
import (
compute "google.golang.org/api/compute/v1"
extensions "k8s.io/api/extensions/v1beta1"
)
// SingleFirewallPool syncs the firewall rule for L7 traffic.
// Note: the diff view contained both the old and new Sync declarations; only
// the ingress-taking form is kept.
type SingleFirewallPool interface {
	// TODO: Take a list of node ports for the firewall.
	// Sync reconciles the firewall rule for the given node ports and nodes.
	// The ingress (may be nil) is used only for attaching events.
	Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error
	// Shutdown deletes the firewall rule.
	Shutdown() error
}
@ -36,5 +37,7 @@ type Firewall interface {
DeleteFirewall(name string) error
UpdateFirewall(f *compute.Firewall) error
GetNodeTags(nodeNames []string) ([]string, error)
NetworkProjectID() string
NetworkURL() string
OnXPN() bool
}

View file

@ -33,7 +33,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
flag "github.com/spf13/pflag"
"k8s.io/api/core/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@ -129,7 +129,7 @@ var (
`Path used to health-check a backend service. All Services must serve
a 200 page on this path. Currently this is only configurable globally.`)
watchNamespace = flags.String("watch-namespace", v1.NamespaceAll,
watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll,
`Namespace to watch for Ingress/Services/Endpoints.`)
verbose = flags.Bool("verbose", false,
@ -158,7 +158,6 @@ func registerHandlers(lbc *controller.LoadBalancerController) {
// TODO: Retry failures during shutdown.
lbc.Stop(true)
})
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil))
}
@ -249,6 +248,8 @@ func main() {
SvcPort: intstr.FromInt(int(port)),
}
eventRecorder := utils.NewEventRecorder(kubeClient)
var cloud *gce.GCECloud
if *inCluster || *useRealCloud {
// Create cluster manager
@ -278,7 +279,7 @@ func main() {
glog.Infof("Created GCE client without a config file")
}
clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath)
clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath, eventRecorder)
if err != nil {
glog.Fatalf("%v", err)
}
@ -287,14 +288,13 @@ func main() {
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod)
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, eventRecorder)
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
if err != nil {
glog.Fatalf("%v", err)
}
if clusterManager.ClusterNamer.GetClusterName() != "" {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
}
@ -453,7 +453,7 @@ func getClusterUID(kubeClient kubernetes.Interface, name string) (string, error)
// getNodePort waits for the Service, and returns it's first node port.
func getNodePort(client kubernetes.Interface, ns, name string) (port, nodePort int32, err error) {
var svc *v1.Service
var svc *apiv1.Service
glog.V(3).Infof("Waiting for %v/%v", ns, name)
wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) {
svc, err = client.Core().Services(ns).Get(name, metav1.GetOptions{})

View file

@ -24,6 +24,13 @@ import (
"strings"
"sync"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
@ -309,9 +316,14 @@ func (g GCEURLMap) PutDefaultBackend(d *compute.BackendService) {
}
}
// FakeGoogleAPIForbiddenErr creates a Forbidden (HTTP 403) error with type googleapi.Error.
func FakeGoogleAPIForbiddenErr() *googleapi.Error {
	return &googleapi.Error{Code: http.StatusForbidden}
}
// FakeGoogleAPINotFoundErr creates a NotFound (HTTP 404) error with type googleapi.Error.
// The residual pre-change return using the magic number 404 (diff leftover,
// unreachable) is dropped in favor of the named constant.
func FakeGoogleAPINotFoundErr() *googleapi.Error {
	return &googleapi.Error{Code: http.StatusNotFound}
}
// IsHTTPErrorCode checks if the given error matches the given HTTP Error code.
@ -344,6 +356,11 @@ func IsNotFoundError(err error) bool {
return IsHTTPErrorCode(err, http.StatusNotFound)
}
// IsForbiddenError returns true if the operation was forbidden (HTTP 403).
func IsForbiddenError(err error) bool {
	return IsHTTPErrorCode(err, http.StatusForbidden)
}
// CompareLinks returns true if the 2 self links are equal.
func CompareLinks(l1, l2 string) bool {
// TODO: These can be partial links
@ -359,3 +376,16 @@ func GetNamedPort(port int64) *compute.NamedPort {
// TODO: move port naming to namer
return &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
}
// NewEventRecorder returns an event recorder given a kubernetes client.
// The broadcaster logs events via glog and records them to the cluster's
// core/v1 Events API under the "loadbalancer-controller" component.
func NewEventRecorder(kubeClient kubernetes.Interface) record.EventRecorder {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
		Interface: kubeClient.Core().Events(""),
	})
	return eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"})
}
// IngressEventRecorder raises an event with the given reason and message,
// attached to the provided ingress.
type IngressEventRecorder func(ing *extensions.Ingress, reason, msg string)