Merge 2025675257
into 51248f87f3
This commit is contained in:
commit
11bdd39b57
10 changed files with 294 additions and 57 deletions
|
@ -22,6 +22,9 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/backends"
|
||||
|
@ -119,7 +122,7 @@ func (c *ClusterManager) shutdown() error {
|
|||
// Returns the list of all instance groups corresponding to the given loadbalancers.
|
||||
// If in performing the checkpoint the cluster manager runs out of quota, a
|
||||
// googleapi 403 is returned.
|
||||
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
|
||||
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, currentIngress *extensions.Ingress) ([]*compute.InstanceGroup, error) {
|
||||
if len(namedPorts) != 0 {
|
||||
// Add the default backend node port to the list of named ports for instance groups.
|
||||
namedPorts = append(namedPorts, c.defaultBackendNodePort)
|
||||
|
@ -159,7 +162,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
|
|||
for _, p := range fwNodePorts {
|
||||
np = append(np, p.Port)
|
||||
}
|
||||
if err := c.firewallPool.Sync(np, nodeNames); err != nil {
|
||||
if err := c.firewallPool.Sync(np, nodeNames, currentIngress); err != nil {
|
||||
return igs, err
|
||||
}
|
||||
|
||||
|
@ -233,7 +236,8 @@ func NewClusterManager(
|
|||
cloud *gce.GCECloud,
|
||||
namer *utils.Namer,
|
||||
defaultBackendNodePort backends.ServicePort,
|
||||
defaultHealthCheckPath string) (*ClusterManager, error) {
|
||||
defaultHealthCheckPath string,
|
||||
recorder record.EventRecorder) (*ClusterManager, error) {
|
||||
|
||||
// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
|
||||
cluster := ClusterManager{ClusterNamer: namer}
|
||||
|
@ -255,6 +259,11 @@ func NewClusterManager(
|
|||
|
||||
// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
|
||||
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
|
||||
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer)
|
||||
|
||||
eventRaiser := func(ing *extensions.Ingress, reason, msg string) {
|
||||
recorder.Event(ing, apiv1.EventTypeNormal, reason, msg)
|
||||
}
|
||||
|
||||
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer, eventRaiser)
|
||||
return &cluster, nil
|
||||
}
|
||||
|
|
|
@ -30,8 +30,6 @@ import (
|
|||
informerv1 "k8s.io/client-go/informers/core/v1"
|
||||
informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
scheme "k8s.io/client-go/kubernetes/scheme"
|
||||
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
@ -60,17 +58,19 @@ type ControllerContext struct {
|
|||
ServiceInformer cache.SharedIndexInformer
|
||||
PodInformer cache.SharedIndexInformer
|
||||
NodeInformer cache.SharedIndexInformer
|
||||
EventRecorder record.EventRecorder
|
||||
// Stop is the stop channel shared among controllers
|
||||
StopCh chan struct{}
|
||||
}
|
||||
|
||||
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext {
|
||||
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, recorder record.EventRecorder) *ControllerContext {
|
||||
return &ControllerContext{
|
||||
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
|
||||
StopCh: make(chan struct{}),
|
||||
EventRecorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,17 +120,11 @@ type LoadBalancerController struct {
|
|||
// required for L7 loadbalancing.
|
||||
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
|
||||
func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
|
||||
Interface: kubeClient.Core().Events(""),
|
||||
})
|
||||
lbc := LoadBalancerController{
|
||||
client: kubeClient,
|
||||
CloudClusterManager: clusterManager,
|
||||
stopCh: ctx.StopCh,
|
||||
recorder: eventBroadcaster.NewRecorder(scheme.Scheme,
|
||||
apiv1.EventSource{Component: "loadbalancer-controller"}),
|
||||
recorder: ctx.EventRecorder,
|
||||
}
|
||||
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
|
||||
lbc.ingQueue = NewTaskQueue(lbc.sync)
|
||||
|
@ -299,6 +293,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var ing *extensions.Ingress
|
||||
if ingExists {
|
||||
// Make a shallow copy of the object
|
||||
v := *obj.(*extensions.Ingress)
|
||||
ing = &v
|
||||
}
|
||||
|
||||
// This performs a 2 phase checkpoint with the cloud:
|
||||
// * Phase 1 creates/verifies resources are as expected. At the end of a
|
||||
|
@ -320,7 +320,9 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
}()
|
||||
// Record any errors during sync and throw a single error at the end. This
|
||||
// allows us to free up associated cloud resources ASAP.
|
||||
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts)
|
||||
// Although we pass the current ingress, 'Checkpoint' syncs all resources. This ingress is only
|
||||
// for attaching events.
|
||||
igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, ing)
|
||||
if err != nil {
|
||||
// TODO: Implement proper backoff for the queue.
|
||||
eventMsg := "GCE"
|
||||
|
@ -335,8 +337,8 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
if !ingExists {
|
||||
return syncError
|
||||
}
|
||||
ing := *obj.(*extensions.Ingress)
|
||||
if isGCEMultiClusterIngress(&ing) {
|
||||
|
||||
if isGCEMultiClusterIngress(ing) {
|
||||
// Add instance group names as annotation on the ingress.
|
||||
if ing.Annotations == nil {
|
||||
ing.Annotations = map[string]string{}
|
||||
|
@ -358,13 +360,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
return syncError
|
||||
}
|
||||
|
||||
if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
|
||||
if urlMap, err := lbc.tr.toURLMap(ing); err != nil {
|
||||
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
|
||||
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
|
||||
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
|
||||
lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
|
||||
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
|
||||
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
|
||||
lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
|
||||
lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
|
||||
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
|
||||
}
|
||||
return syncError
|
||||
|
@ -372,7 +374,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
|||
|
||||
// updateIngressStatus updates the IP and annotations of a loadbalancer.
|
||||
// The annotations are parsed by kubectl describe.
|
||||
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
|
||||
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
|
||||
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)
|
||||
|
||||
// Update IP through update/status endpoint
|
||||
|
|
|
@ -53,7 +53,8 @@ func defaultBackendName(clusterName string) string {
|
|||
// newLoadBalancerController create a loadbalancer controller.
|
||||
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
|
||||
kubeClient := fake.NewSimpleClientset()
|
||||
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second)
|
||||
eventRecorder := utils.NewEventRecorder(kubeClient)
|
||||
ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, eventRecorder)
|
||||
lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager)
|
||||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
|
@ -95,10 +96,12 @@ func toIngressRules(hostRules map[string]utils.FakeIngressRuleValueMap) []extens
|
|||
|
||||
// newIngress returns a new Ingress with the given path map.
|
||||
func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions.Ingress {
|
||||
id := uuid.NewUUID()
|
||||
return &extensions.Ingress{
|
||||
ObjectMeta: meta_v1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%v", uuid.NewUUID()),
|
||||
Name: fmt.Sprintf("%v", id),
|
||||
Namespace: api.NamespaceNone,
|
||||
UID: id,
|
||||
},
|
||||
Spec: extensions.IngressSpec{
|
||||
Backend: &extensions.IngressBackend{
|
||||
|
|
|
@ -17,7 +17,10 @@ limitations under the License.
|
|||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
|
@ -65,7 +68,7 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
|
|||
testDefaultBeNodePort,
|
||||
namer,
|
||||
)
|
||||
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(), namer)
|
||||
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, fakeRaiseIngressEvent)
|
||||
cm := &ClusterManager{
|
||||
ClusterNamer: namer,
|
||||
instancePool: nodePool,
|
||||
|
@ -75,3 +78,12 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
|
|||
}
|
||||
return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs}
|
||||
}
|
||||
|
||||
func fakeRaiseIngressEvent(ing *extensions.Ingress, reason, msg string) {
|
||||
ingName := "nil-ingress"
|
||||
if ing != nil {
|
||||
ingName = ing.Name
|
||||
}
|
||||
|
||||
fmt.Printf("Ingress %q Event %q: %q\n", ingName, reason, msg)
|
||||
}
|
||||
|
|
|
@ -25,14 +25,21 @@ import (
|
|||
)
|
||||
|
||||
type fakeFirewallsProvider struct {
|
||||
fw map[string]*compute.Firewall
|
||||
networkUrl string
|
||||
fw map[string]*compute.Firewall
|
||||
networkProjectID string
|
||||
networkURL string
|
||||
onXPN bool
|
||||
fwReadOnly bool
|
||||
}
|
||||
|
||||
// NewFakeFirewallsProvider creates a fake for firewall rules.
|
||||
func NewFakeFirewallsProvider() *fakeFirewallsProvider {
|
||||
func NewFakeFirewallsProvider(onXPN bool, fwReadOnly bool) *fakeFirewallsProvider {
|
||||
return &fakeFirewallsProvider{
|
||||
fw: make(map[string]*compute.Firewall),
|
||||
fw: make(map[string]*compute.Firewall),
|
||||
networkProjectID: "test-network-project",
|
||||
networkURL: "/path/to/my-network",
|
||||
onXPN: onXPN,
|
||||
fwReadOnly: fwReadOnly,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,7 +51,7 @@ func (ff *fakeFirewallsProvider) GetFirewall(name string) (*compute.Firewall, er
|
|||
return nil, utils.FakeGoogleAPINotFoundErr()
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
|
||||
func (ff *fakeFirewallsProvider) doCreateFirewall(f *compute.Firewall) error {
|
||||
if _, exists := ff.fw[f.Name]; exists {
|
||||
return fmt.Errorf("firewall rule %v already exists", f.Name)
|
||||
}
|
||||
|
@ -52,7 +59,15 @@ func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
|
||||
func (ff *fakeFirewallsProvider) CreateFirewall(f *compute.Firewall) error {
|
||||
if ff.fwReadOnly {
|
||||
return utils.FakeGoogleAPIForbiddenErr()
|
||||
}
|
||||
|
||||
return ff.doCreateFirewall(f)
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) doDeleteFirewall(name string) error {
|
||||
// We need the full name for the same reason as CreateFirewall.
|
||||
_, exists := ff.fw[name]
|
||||
if !exists {
|
||||
|
@ -63,7 +78,15 @@ func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
|
||||
func (ff *fakeFirewallsProvider) DeleteFirewall(name string) error {
|
||||
if ff.fwReadOnly {
|
||||
return utils.FakeGoogleAPIForbiddenErr()
|
||||
}
|
||||
|
||||
return ff.doDeleteFirewall(name)
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) doUpdateFirewall(f *compute.Firewall) error {
|
||||
// We need the full name for the same reason as CreateFirewall.
|
||||
_, exists := ff.fw[f.Name]
|
||||
if !exists {
|
||||
|
@ -74,8 +97,24 @@ func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) UpdateFirewall(f *compute.Firewall) error {
|
||||
if ff.fwReadOnly {
|
||||
return utils.FakeGoogleAPIForbiddenErr()
|
||||
}
|
||||
|
||||
return ff.doUpdateFirewall(f)
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) NetworkProjectID() string {
|
||||
return ff.networkProjectID
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) NetworkURL() string {
|
||||
return ff.networkUrl
|
||||
return ff.networkURL
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) OnXPN() bool {
|
||||
return ff.onXPN
|
||||
}
|
||||
|
||||
func (ff *fakeFirewallsProvider) GetNodeTags(nodeNames []string) ([]string, error) {
|
||||
|
|
|
@ -17,12 +17,15 @@ limitations under the License.
|
|||
package firewalls
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
netset "k8s.io/kubernetes/pkg/util/net/sets"
|
||||
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
|
@ -36,21 +39,28 @@ type FirewallRules struct {
|
|||
cloud Firewall
|
||||
namer *utils.Namer
|
||||
srcRanges []string
|
||||
recorder utils.IngressEventRecorder
|
||||
}
|
||||
|
||||
// NewFirewallPool creates a new firewall rule manager.
|
||||
// cloud: the cloud object implementing Firewall.
|
||||
// namer: cluster namer.
|
||||
func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool {
|
||||
// recorder: the func for raising an event.
|
||||
func NewFirewallPool(cloud Firewall, namer *utils.Namer, recorder utils.IngressEventRecorder) SingleFirewallPool {
|
||||
_, err := netset.ParseIPNets(l7SrcRanges...)
|
||||
if err != nil {
|
||||
glog.Fatalf("Could not parse L7 src ranges %v for firewall rule: %v", l7SrcRanges, err)
|
||||
}
|
||||
return &FirewallRules{cloud: cloud, namer: namer, srcRanges: l7SrcRanges}
|
||||
return &FirewallRules{
|
||||
cloud: cloud,
|
||||
namer: namer,
|
||||
srcRanges: l7SrcRanges,
|
||||
recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
// Sync sync firewall rules with the cloud.
|
||||
func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
|
||||
func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error {
|
||||
if len(nodePorts) == 0 {
|
||||
return fr.Shutdown()
|
||||
}
|
||||
|
@ -68,7 +78,7 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
|
|||
|
||||
if rule == nil {
|
||||
glog.Infof("Creating global l7 firewall rule %v", name)
|
||||
return fr.cloud.CreateFirewall(firewall)
|
||||
return fr.createFirewall(firewall, ing)
|
||||
}
|
||||
|
||||
requiredPorts := sets.NewString()
|
||||
|
@ -92,15 +102,48 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error {
|
|||
return nil
|
||||
}
|
||||
glog.V(3).Infof("Firewall %v already exists, updating nodeports %v", name, nodePorts)
|
||||
return fr.cloud.UpdateFirewall(firewall)
|
||||
return fr.updateFirewall(firewall, ing)
|
||||
}
|
||||
|
||||
func (fr *FirewallRules) createFirewall(f *compute.Firewall, ing *extensions.Ingress) error {
|
||||
err := fr.cloud.CreateFirewall(f)
|
||||
if utils.IsForbiddenError(err) && fr.cloud.OnXPN() {
|
||||
gcloudCmd := gce.FirewallToGCloudCreateCmd(f, fr.cloud.NetworkProjectID())
|
||||
glog.V(3).Infof("Could not create L7 firewall on XPN cluster. Raising event for cmd: %q", gcloudCmd)
|
||||
fr.raiseFirewallChangeNeededEvent(ing, gcloudCmd)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (fr *FirewallRules) updateFirewall(f *compute.Firewall, ing *extensions.Ingress) error {
|
||||
err := fr.cloud.UpdateFirewall(f)
|
||||
if utils.IsForbiddenError(err) && fr.cloud.OnXPN() {
|
||||
gcloudCmd := gce.FirewallToGCloudUpdateCmd(f, fr.cloud.NetworkProjectID())
|
||||
glog.V(3).Infof("Could not update L7 firewall on XPN cluster. Raising event for cmd: %q", gcloudCmd)
|
||||
fr.raiseFirewallChangeNeededEvent(ing, gcloudCmd)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (fr *FirewallRules) deleteFirewall(name string) error {
|
||||
err := fr.cloud.DeleteFirewall(name)
|
||||
if utils.IsForbiddenError(err) && fr.cloud.OnXPN() {
|
||||
gcloudCmd := gce.FirewallToGCloudDeleteCmd(name, fr.cloud.NetworkProjectID())
|
||||
glog.V(3).Infof("Could not delete L7 firewall on XPN cluster. %q needs to be ran.", gcloudCmd)
|
||||
// An event cannot be raised because there's no ingress for reference.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown shuts down this firewall rules manager.
|
||||
func (fr *FirewallRules) Shutdown() error {
|
||||
name := fr.namer.FrName(fr.namer.FrSuffix())
|
||||
glog.Infof("Deleting firewall %v", name)
|
||||
err := fr.cloud.DeleteFirewall(name)
|
||||
if err != nil && utils.IsHTTPErrorCode(err, 404) {
|
||||
err := fr.deleteFirewall(name)
|
||||
if utils.IsNotFoundError(err) {
|
||||
glog.Infof("Firewall with name %v didn't exist at Shutdown", name)
|
||||
return nil
|
||||
}
|
||||
|
@ -141,3 +184,16 @@ func (fr *FirewallRules) createFirewallObject(firewallName, description string,
|
|||
TargetTags: targetTags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (fr *FirewallRules) raiseFirewallChangeNeededEvent(ing *extensions.Ingress, cmd string) {
|
||||
if fr.recorder == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Cannot attach an event to a nil object
|
||||
if ing == nil {
|
||||
return
|
||||
}
|
||||
|
||||
fr.recorder(ing, "LoadBalancerManualChange", fmt.Sprintf("Firewall change required by network admin: `%v`", cmd))
|
||||
}
|
||||
|
|
|
@ -20,20 +20,22 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/ingress/controllers/gce/utils"
|
||||
)
|
||||
|
||||
func TestSyncFirewallPool(t *testing.T) {
|
||||
namer := utils.NewNamer("ABC", "XYZ")
|
||||
fwp := NewFakeFirewallsProvider()
|
||||
fp := NewFirewallPool(fwp, namer)
|
||||
fwp := NewFakeFirewallsProvider(false, false)
|
||||
fp := NewFirewallPool(fwp, namer, nil)
|
||||
ruleName := namer.FrName(namer.FrSuffix())
|
||||
|
||||
// Test creating a firewall rule via Sync
|
||||
nodePorts := []int64{80, 443, 3000}
|
||||
nodes := []string{"node-a", "node-b", "node-c"}
|
||||
err := fp.Sync(nodePorts, nodes)
|
||||
err := fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
|
@ -41,25 +43,25 @@ func TestSyncFirewallPool(t *testing.T) {
|
|||
|
||||
// Sync to fewer ports
|
||||
nodePorts = []int64{80, 443}
|
||||
err = fp.Sync(nodePorts, nodes)
|
||||
err = fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
|
||||
|
||||
firewall, err := fp.(*FirewallRules).createFirewallObject(namer.FrName(namer.FrSuffix()), "", nodePorts, nodes)
|
||||
firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when creating firewall object, err: %v", err)
|
||||
}
|
||||
|
||||
err = fwp.UpdateFirewall(firewall)
|
||||
err = fwp.doUpdateFirewall(firewall)
|
||||
if err != nil {
|
||||
t.Errorf("failed to update firewall rule, err: %v", err)
|
||||
}
|
||||
verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
|
||||
|
||||
// Run Sync and expect l7 src ranges to be returned
|
||||
err = fp.Sync(nodePorts, nodes)
|
||||
err = fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
|
@ -68,7 +70,7 @@ func TestSyncFirewallPool(t *testing.T) {
|
|||
// Add node and expect firewall to remain the same
|
||||
// NOTE: See computeHostTag(..) in gce cloudprovider
|
||||
nodes = []string{"node-a", "node-b", "node-c", "node-d"}
|
||||
err = fp.Sync(nodePorts, nodes)
|
||||
err = fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
|
@ -76,7 +78,7 @@ func TestSyncFirewallPool(t *testing.T) {
|
|||
|
||||
// Remove all ports and expect firewall rule to disappear
|
||||
nodePorts = []int64{}
|
||||
err = fp.Sync(nodePorts, nodes)
|
||||
err = fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
|
@ -87,6 +89,87 @@ func TestSyncFirewallPool(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestSyncOnXPNWithPermission tests that firwall sync continues to work when OnXPN=true
|
||||
func TestSyncOnXPNWithPermission(t *testing.T) {
|
||||
namer := utils.NewNamer("ABC", "XYZ")
|
||||
fwp := NewFakeFirewallsProvider(true, false)
|
||||
fp := NewFirewallPool(fwp, namer, nil)
|
||||
ruleName := namer.FrName(namer.FrSuffix())
|
||||
|
||||
// Test creating a firewall rule via Sync
|
||||
nodePorts := []int64{80, 443, 3000}
|
||||
nodes := []string{"node-a", "node-b", "node-c"}
|
||||
err := fp.Sync(nodePorts, nodes, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
verifyFirewallRule(fwp, ruleName, nodePorts, nodes, l7SrcRanges, t)
|
||||
}
|
||||
|
||||
// TestSyncOnXPNReadOnly tests that controller behavior is accurate when the controller
|
||||
// does not have permission to create/update/delete firewall rules.
|
||||
// Sync should NOT return an error. An event should be raised.
|
||||
func TestSyncOnXPNReadOnly(t *testing.T) {
|
||||
ing := &extensions.Ingress{ObjectMeta: meta_v1.ObjectMeta{Name: "xpn-ingress"}}
|
||||
var events []string
|
||||
eventer := func(ing *extensions.Ingress, reason, msg string) {
|
||||
events = append(events, msg)
|
||||
}
|
||||
|
||||
namer := utils.NewNamer("ABC", "XYZ")
|
||||
fwp := NewFakeFirewallsProvider(true, true)
|
||||
fp := NewFirewallPool(fwp, namer, eventer)
|
||||
ruleName := namer.FrName(namer.FrSuffix())
|
||||
|
||||
// Test creating a firewall rule via Sync
|
||||
nodePorts := []int64{80, 443, 3000}
|
||||
nodes := []string{"node-a", "node-b", "node-c"}
|
||||
err := fp.Sync(nodePorts, nodes, ing)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
|
||||
// Expect an event saying a firewall needs to be created
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected %v events but received %v: %+v", 1, len(events), events)
|
||||
}
|
||||
|
||||
// Clear events
|
||||
events = events[:0]
|
||||
|
||||
// Manually create the firewall
|
||||
firewall, err := fp.(*FirewallRules).createFirewallObject(ruleName, "", nodePorts, nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when creating firewall object, err: %v", err)
|
||||
}
|
||||
err = fwp.doCreateFirewall(firewall)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when creating firewall, err: %v", err)
|
||||
}
|
||||
|
||||
// Run sync again with same state - expect no event
|
||||
err = fp.Sync(nodePorts, nodes, ing)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
if len(events) > 0 {
|
||||
t.Errorf("received unexpected event(s): %+v", events)
|
||||
}
|
||||
|
||||
// Modify nodePorts to cause an event
|
||||
nodePorts = append(nodePorts, 3001)
|
||||
|
||||
// Run sync again with same state - expect no event
|
||||
err = fp.Sync(nodePorts, nodes, ing)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err when syncing firewall, err: %v", err)
|
||||
}
|
||||
// Expect an event saying a firewall needs to be created
|
||||
if len(events) != 1 {
|
||||
t.Errorf("expected %v events but received %v: %+v", 1, len(events), events)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyFirewallRule(fwp *fakeFirewallsProvider, ruleName string, expectedPorts []int64, expectedNodes, expectedCIDRs []string, t *testing.T) {
|
||||
var strPorts []string
|
||||
for _, v := range expectedPorts {
|
||||
|
|
|
@ -18,12 +18,13 @@ package firewalls
|
|||
|
||||
import (
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
)
|
||||
|
||||
// SingleFirewallPool syncs the firewall rule for L7 traffic.
|
||||
type SingleFirewallPool interface {
|
||||
// TODO: Take a list of node ports for the firewall.
|
||||
Sync(nodePorts []int64, nodeNames []string) error
|
||||
Sync(nodePorts []int64, nodeNames []string, ing *extensions.Ingress) error
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
|
@ -36,5 +37,7 @@ type Firewall interface {
|
|||
DeleteFirewall(name string) error
|
||||
UpdateFirewall(f *compute.Firewall) error
|
||||
GetNodeTags(nodeNames []string) ([]string, error)
|
||||
NetworkProjectID() string
|
||||
NetworkURL() string
|
||||
OnXPN() bool
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import (
|
|||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
flag "github.com/spf13/pflag"
|
||||
"k8s.io/api/core/v1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
@ -129,7 +129,7 @@ var (
|
|||
`Path used to health-check a backend service. All Services must serve
|
||||
a 200 page on this path. Currently this is only configurable globally.`)
|
||||
|
||||
watchNamespace = flags.String("watch-namespace", v1.NamespaceAll,
|
||||
watchNamespace = flags.String("watch-namespace", apiv1.NamespaceAll,
|
||||
`Namespace to watch for Ingress/Services/Endpoints.`)
|
||||
|
||||
verbose = flags.Bool("verbose", false,
|
||||
|
@ -158,7 +158,6 @@ func registerHandlers(lbc *controller.LoadBalancerController) {
|
|||
// TODO: Retry failures during shutdown.
|
||||
lbc.Stop(true)
|
||||
})
|
||||
|
||||
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil))
|
||||
}
|
||||
|
||||
|
@ -249,6 +248,8 @@ func main() {
|
|||
SvcPort: intstr.FromInt(int(port)),
|
||||
}
|
||||
|
||||
eventRecorder := utils.NewEventRecorder(kubeClient)
|
||||
|
||||
var cloud *gce.GCECloud
|
||||
if *inCluster || *useRealCloud {
|
||||
// Create cluster manager
|
||||
|
@ -278,7 +279,7 @@ func main() {
|
|||
glog.Infof("Created GCE client without a config file")
|
||||
}
|
||||
|
||||
clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath)
|
||||
clusterManager, err = controller.NewClusterManager(cloud, namer, defaultBackendNodePort, *healthCheckPath, eventRecorder)
|
||||
if err != nil {
|
||||
glog.Fatalf("%v", err)
|
||||
}
|
||||
|
@ -287,14 +288,13 @@ func main() {
|
|||
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
|
||||
}
|
||||
|
||||
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod)
|
||||
ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, eventRecorder)
|
||||
|
||||
// Start loadbalancer controller
|
||||
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
|
||||
if err != nil {
|
||||
glog.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
if clusterManager.ClusterNamer.GetClusterName() != "" {
|
||||
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
|
||||
}
|
||||
|
@ -453,7 +453,7 @@ func getClusterUID(kubeClient kubernetes.Interface, name string) (string, error)
|
|||
|
||||
// getNodePort waits for the Service, and returns it's first node port.
|
||||
func getNodePort(client kubernetes.Interface, ns, name string) (port, nodePort int32, err error) {
|
||||
var svc *v1.Service
|
||||
var svc *apiv1.Service
|
||||
glog.V(3).Infof("Waiting for %v/%v", ns, name)
|
||||
wait.Poll(1*time.Second, 5*time.Minute, func() (bool, error) {
|
||||
svc, err = client.Core().Services(ns).Get(name, metav1.GetOptions{})
|
||||
|
|
|
@ -24,6 +24,13 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
scheme "k8s.io/client-go/kubernetes/scheme"
|
||||
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"github.com/golang/glog"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"google.golang.org/api/googleapi"
|
||||
|
@ -309,9 +316,14 @@ func (g GCEURLMap) PutDefaultBackend(d *compute.BackendService) {
|
|||
}
|
||||
}
|
||||
|
||||
// FakeGoogleAPIForbiddenErr creates a Forbidden error with type googleapi.Error
|
||||
func FakeGoogleAPIForbiddenErr() *googleapi.Error {
|
||||
return &googleapi.Error{Code: http.StatusForbidden}
|
||||
}
|
||||
|
||||
// FakeGoogleAPINotFoundErr creates a NotFound error with type googleapi.Error
|
||||
func FakeGoogleAPINotFoundErr() *googleapi.Error {
|
||||
return &googleapi.Error{Code: 404}
|
||||
return &googleapi.Error{Code: http.StatusNotFound}
|
||||
}
|
||||
|
||||
// IsHTTPErrorCode checks if the given error matches the given HTTP Error code.
|
||||
|
@ -344,6 +356,11 @@ func IsNotFoundError(err error) bool {
|
|||
return IsHTTPErrorCode(err, http.StatusNotFound)
|
||||
}
|
||||
|
||||
// IsForbiddenError returns true if the operation was forbidden
|
||||
func IsForbiddenError(err error) bool {
|
||||
return IsHTTPErrorCode(err, http.StatusForbidden)
|
||||
}
|
||||
|
||||
// CompareLinks returns true if the 2 self links are equal.
|
||||
func CompareLinks(l1, l2 string) bool {
|
||||
// TODO: These can be partial links
|
||||
|
@ -359,3 +376,16 @@ func GetNamedPort(port int64) *compute.NamedPort {
|
|||
// TODO: move port naming to namer
|
||||
return &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
|
||||
}
|
||||
|
||||
// NewEventRecorder returns an event recorder given a kubernetes client
|
||||
func NewEventRecorder(kubeClient kubernetes.Interface) record.EventRecorder {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
|
||||
Interface: kubeClient.Core().Events(""),
|
||||
})
|
||||
return eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"})
|
||||
|
||||
}
|
||||
|
||||
type IngressEventRecorder func(ing *extensions.Ingress, reason, msg string)
|
||||
|
|
Loading…
Reference in a new issue