Handle linking backend service with NEG
This commit is contained in:
parent
5eab04a21f
commit
3e761e4491
4 changed files with 99 additions and 0 deletions
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||||
compute "google.golang.org/api/compute/v1"
|
compute "google.golang.org/api/compute/v1"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
@ -211,6 +212,54 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic
|
||||||
return b.Get(namedPort.Port)
|
return b.Get(namedPort.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Backends) Link(port ServicePort, zones []string) error {
|
||||||
|
if !port.NEGEnabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort)
|
||||||
|
|
||||||
|
negs := []*computealpha.NetworkEndpointGroup{}
|
||||||
|
for _, zone := range zones {
|
||||||
|
neg, err := b.cloud.GetNetworkEndpointGroup(negName, zone)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
negs = append(negs, neg)
|
||||||
|
}
|
||||||
|
|
||||||
|
backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
targetBackends := getBackendsForNEGs(negs)
|
||||||
|
|
||||||
|
needToUpdate := false
|
||||||
|
for _, backend := range backendService.Backends {
|
||||||
|
found := false
|
||||||
|
for _, negBackend := range targetBackends {
|
||||||
|
// Warnning: Group link includes the api version.
|
||||||
|
// Backend service and NEG API need to have matching API version.
|
||||||
|
// Otherwise, it will always be different
|
||||||
|
if negBackend.Group == backend.Group {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
needToUpdate = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if needToUpdate {
|
||||||
|
backendService.Backends = targetBackends
|
||||||
|
return b.cloud.UpdateAlphaGlobalBackendService(backendService)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Add will get or create a Backend for the given port.
|
// Add will get or create a Backend for the given port.
|
||||||
// Uses the given instance groups if non-nil, else creates instance groups.
|
// Uses the given instance groups if non-nil, else creates instance groups.
|
||||||
func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error {
|
func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error {
|
||||||
|
@ -339,6 +388,19 @@ func getBackendsForIGs(igs []*compute.InstanceGroup, bm BalancingMode) []*comput
|
||||||
return backends
|
return backends
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getBackendsForNEGs(negs []*computealpha.NetworkEndpointGroup) []*computealpha.Backend {
|
||||||
|
var backends []*computealpha.Backend
|
||||||
|
for _, neg := range negs {
|
||||||
|
b := &computealpha.Backend{
|
||||||
|
Group: neg.SelfLink,
|
||||||
|
BalancingMode: string(Rate),
|
||||||
|
MaxRate: maxRPS,
|
||||||
|
}
|
||||||
|
backends = append(backends, b)
|
||||||
|
}
|
||||||
|
return backends
|
||||||
|
}
|
||||||
|
|
||||||
// edgeHop checks the links of the given backend by executing an edge hop.
|
// edgeHop checks the links of the given backend by executing an edge hop.
|
||||||
// It fixes broken links.
|
// It fixes broken links.
|
||||||
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
|
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package backends
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||||
compute "google.golang.org/api/compute/v1"
|
compute "google.golang.org/api/compute/v1"
|
||||||
api_v1 "k8s.io/api/core/v1"
|
api_v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
|
@ -62,6 +63,10 @@ func (f *FakeBackendServices) GetGlobalBackendService(name string) (*compute.Bac
|
||||||
return nil, fmt.Errorf("backend service %v not found", name)
|
return nil, fmt.Errorf("backend service %v not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FakeBackendServices) GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CreateGlobalBackendService fakes backend service creation.
|
// CreateGlobalBackendService fakes backend service creation.
|
||||||
func (f *FakeBackendServices) CreateGlobalBackendService(be *compute.BackendService) error {
|
func (f *FakeBackendServices) CreateGlobalBackendService(be *compute.BackendService) error {
|
||||||
if f.errFunc != nil {
|
if f.errFunc != nil {
|
||||||
|
@ -108,6 +113,15 @@ func (f *FakeBackendServices) UpdateGlobalBackendService(be *compute.BackendServ
|
||||||
return f.backendServices.Update(be)
|
return f.backendServices.Update(be)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateGlobalBackendService fakes updating a backend service.
|
||||||
|
func (f *FakeBackendServices) UpdateAlphaGlobalBackendService(be *computealpha.BackendService) error {
|
||||||
|
return f.backendServices.Update(be)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeBackendServices) GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetGlobalBackendServiceHealth fakes getting backend service health.
|
// GetGlobalBackendServiceHealth fakes getting backend service health.
|
||||||
func (f *FakeBackendServices) GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
|
func (f *FakeBackendServices) GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
|
||||||
be, err := f.GetGlobalBackendService(name)
|
be, err := f.GetGlobalBackendService(name)
|
||||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package backends
|
package backends
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||||
compute "google.golang.org/api/compute/v1"
|
compute "google.golang.org/api/compute/v1"
|
||||||
api_v1 "k8s.io/api/core/v1"
|
api_v1 "k8s.io/api/core/v1"
|
||||||
)
|
)
|
||||||
|
@ -38,14 +39,20 @@ type BackendPool interface {
|
||||||
Shutdown() error
|
Shutdown() error
|
||||||
Status(name string) string
|
Status(name string) string
|
||||||
List() ([]interface{}, error)
|
List() ([]interface{}, error)
|
||||||
|
Link(port ServicePort, zones []string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackendServices is an interface for managing gce backend services.
|
// BackendServices is an interface for managing gce backend services.
|
||||||
type BackendServices interface {
|
type BackendServices interface {
|
||||||
GetGlobalBackendService(name string) (*compute.BackendService, error)
|
GetGlobalBackendService(name string) (*compute.BackendService, error)
|
||||||
|
GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error)
|
||||||
UpdateGlobalBackendService(bg *compute.BackendService) error
|
UpdateGlobalBackendService(bg *compute.BackendService) error
|
||||||
|
UpdateAlphaGlobalBackendService(bg *computealpha.BackendService) error
|
||||||
CreateGlobalBackendService(bg *compute.BackendService) error
|
CreateGlobalBackendService(bg *compute.BackendService) error
|
||||||
DeleteGlobalBackendService(name string) error
|
DeleteGlobalBackendService(name string) error
|
||||||
ListGlobalBackendServices() (*compute.BackendServiceList, error)
|
ListGlobalBackendServices() (*compute.BackendServiceList, error)
|
||||||
GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error)
|
GetGlobalBackendServiceHealth(name, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error)
|
||||||
|
|
||||||
|
// TODO: move this out of BackendService
|
||||||
|
GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -335,6 +335,22 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if lbc.negEnabled {
|
||||||
|
svcPorts := lbc.Translator.toNodePorts(&extensions.IngressList{Items: []extensions.Ingress{ing}})
|
||||||
|
for _, svcPort := range svcPorts {
|
||||||
|
if svcPort.NEGEnabled {
|
||||||
|
|
||||||
|
zones, err := lbc.Translator.ListZones()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := lbc.CloudClusterManager.backendPool.Link(svcPort, zones); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update the UrlMap of the single loadbalancer that came through the watch.
|
// Update the UrlMap of the single loadbalancer that came through the watch.
|
||||||
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
|
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue