diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 6effeae97..f81fc1f8d 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" v1 "k8s.io/api/core/v1" @@ -211,6 +212,54 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic return b.Get(namedPort.Port) } +func (b *Backends) Link(port ServicePort, zones []string) error { + if !port.NEGEnabled { + return nil + } + + negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort) + + negs := []*computealpha.NetworkEndpointGroup{} + for _, zone := range zones { + neg, err := b.cloud.GetNetworkEndpointGroup(negName, zone) + if err != nil { + return err + } + negs = append(negs, neg) + } + + backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port)) + if err != nil { + return err + } + + targetBackends := getBackendsForNEGs(negs) + + needToUpdate := false + for _, backend := range backendService.Backends { + found := false + for _, negBackend := range targetBackends { + // Warnning: Group link includes the api version. + // Backend service and NEG API need to have matching API version. + // Otherwise, it will always be different + if negBackend.Group == backend.Group { + found = true + break + } + } + if !found { + needToUpdate = true + break + } + } + + if needToUpdate { + backendService.Backends = targetBackends + return b.cloud.UpdateAlphaGlobalBackendService(backendService) + } + return nil +} + // Add will get or create a Backend for the given port. // Uses the given instance groups if non-nil, else creates instance groups. func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error { @@ -339,6 +388,19 @@ func getBackendsForIGs(igs []*compute.InstanceGroup, bm BalancingMode) []*comput return backends } +func getBackendsForNEGs(negs []*computealpha.NetworkEndpointGroup) []*computealpha.Backend { + var backends []*computealpha.Backend + for _, neg := range negs { + b := &computealpha.Backend{ + Group: neg.SelfLink, + BalancingMode: string(Rate), + MaxRate: maxRPS, + } + backends = append(backends, b) + } + return backends +} + // edgeHop checks the links of the given backend by executing an edge hop. // It fixes broken links. func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error { diff --git a/controllers/gce/backends/fakes.go b/controllers/gce/backends/fakes.go index a054d8d2a..aa1198e8c 100644 --- a/controllers/gce/backends/fakes.go +++ b/controllers/gce/backends/fakes.go @@ -19,6 +19,7 @@ package backends import ( "fmt" + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" api_v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -62,6 +63,10 @@ func (f *FakeBackendServices) GetGlobalBackendService(name string) (*compute.Bac return nil, fmt.Errorf("backend service %v not found", name) } +func (f *FakeBackendServices) GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) { + return nil, nil +} + // CreateGlobalBackendService fakes backend service creation. func (f *FakeBackendServices) CreateGlobalBackendService(be *compute.BackendService) error { if f.errFunc != nil { @@ -108,6 +113,15 @@ func (f *FakeBackendServices) UpdateGlobalBackendService(be *compute.BackendServ return f.backendServices.Update(be) } +// UpdateGlobalBackendService fakes updating a backend service. +func (f *FakeBackendServices) UpdateAlphaGlobalBackendService(be *computealpha.BackendService) error { + return f.backendServices.Update(be) +} + +func (f *FakeBackendServices) GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) { + return nil, nil +} + // GetGlobalBackendServiceHealth fakes getting backend service health. func (f *FakeBackendServices) GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { be, err := f.GetGlobalBackendService(name) diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 586ceb17c..a06ef04d4 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package backends import ( + computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" api_v1 "k8s.io/api/core/v1" ) @@ -38,14 +39,20 @@ type BackendPool interface { Shutdown() error Status(name string) string List() ([]interface{}, error) + Link(port ServicePort, zones []string) error } // BackendServices is an interface for managing gce backend services. type BackendServices interface { GetGlobalBackendService(name string) (*compute.BackendService, error) + GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) UpdateGlobalBackendService(bg *compute.BackendService) error + UpdateAlphaGlobalBackendService(bg *computealpha.BackendService) error CreateGlobalBackendService(bg *compute.BackendService) error DeleteGlobalBackendService(name string) error ListGlobalBackendServices() (*compute.BackendServiceList, error) GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) + + // TODO: move this out of BackendService + GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) } diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 22844d11c..01b9bbd6d 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -335,6 +335,22 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return nil } + if lbc.negEnabled { + svcPorts := lbc.Translator.toNodePorts(&extensions.IngressList{Items: []extensions.Ingress{ing}}) + for _, svcPort := range svcPorts { + if svcPort.NEGEnabled { + + zones, err := lbc.Translator.ListZones() + if err != nil { + return err + } + if err := lbc.CloudClusterManager.backendPool.Link(svcPort, zones); err != nil { + return err + } + } + } + } + // Update the UrlMap of the single loadbalancer that came through the watch. l7, err := lbc.CloudClusterManager.l7Pool.Get(key) if err != nil {