DO NOT MERGE update gce cloud provider for development

This commit is contained in:
Minhan Xia 2017-08-22 17:11:59 -07:00
parent 7f2591f5f6
commit b75a9b33f2
29 changed files with 23786 additions and 2008 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,5 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
@ -15,6 +13,7 @@ go_library(
"gce.go",
"gce_addresses.go",
"gce_addresses_fakes.go",
"gce_alpha.go",
"gce_annotations.go",
"gce_backendservice.go",
"gce_cert.go",
@ -31,6 +30,7 @@ go_library(
"gce_loadbalancer_external.go",
"gce_loadbalancer_internal.go",
"gce_loadbalancer_naming.go",
"gce_networkendpointgroup.go",
"gce_op.go",
"gce_routes.go",
"gce_targetpool.go",
@ -41,7 +41,6 @@ go_library(
"metrics.go",
"token_source.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1/service:go_default_library",
"//pkg/cloudprovider:go_default_library",
@ -85,12 +84,16 @@ go_test(
"gce_healthchecks_test.go",
"gce_loadbalancer_external_test.go",
"gce_test.go",
"metrics_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/compute/v0.alpha:go_default_library",
"//vendor/google.golang.org/api/compute/v0.beta:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/google.golang.org/api/googleapi:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

View file

@ -16,4 +16,4 @@ limitations under the License.
// Package gce is an implementation of Interface, LoadBalancer
// and Instances for Google Compute Engine.
package gce
package gce // import "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

View file

@ -80,6 +80,11 @@ const (
gceComputeAPIEndpoint = "https://www.googleapis.com/compute/v1/"
)
// gceObject is an abstraction of all GCE API object in go client
type gceObject interface {
MarshalJSON() ([]byte, error)
}
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
// ClusterID contains functionality for getting (and initializing) the ingress-uid. Call GCECloud.Initialize()
@ -113,6 +118,11 @@ type GCECloud struct {
// lock to prevent shared resources from being prematurely deleted while the operation is
// in progress.
sharedResourceLock sync.Mutex
// AlphaFeatureGate gates gce alpha features in GCECloud instance.
// Related wrapper functions that interacts with gce alpha api should examine whether
// the corresponding api is enabled.
// If not enabled, it should return error.
AlphaFeatureGate *AlphaFeatureGate
}
type ServiceManager interface {
@ -133,7 +143,7 @@ type GCEServiceManager struct {
gce *GCECloud
}
type Config struct {
type ConfigFile struct {
Global struct {
TokenURL string `gcfg:"token-url"`
TokenBody string `gcfg:"token-body"`
@ -145,9 +155,29 @@ type Config struct {
Multizone bool `gcfg:"multizone"`
// Specifying ApiEndpoint will override the default GCE compute API endpoint.
ApiEndpoint string `gcfg:"api-endpoint"`
LocalZone string `gcfg:"local-zone"`
// Possible values: List of api names separated by comma. Default to none.
// For example: MyFeatureFlag
AlphaFeatures []string `gcfg:"alpha-features"`
}
}
// CloudConfig includes all the necessary configuration for creating GCECloud
type CloudConfig struct {
ApiEndpoint string
ProjectID string
Region string
Zone string
ManagedZones []string
NetworkURL string
SubnetworkURL string
NodeTags []string
NodeInstancePrefix string
TokenSource oauth2.TokenSource
UseMetadataServer bool
AlphaFeatureGate *AlphaFeatureGate
}
func init() {
cloudprovider.RegisterCloudProvider(
ProviderName,
@ -172,77 +202,28 @@ func (g *GCECloud) GetProjectID() string {
}
// newGCECloud creates a new instance of GCECloud.
func newGCECloud(config io.Reader) (*GCECloud, error) {
apiEndpoint := ""
projectID, zone, err := getProjectAndZone()
if err != nil {
return nil, err
}
func newGCECloud(config io.Reader) (gceCloud *GCECloud, err error) {
var cloudConfig *CloudConfig
var configFile *ConfigFile
region, err := GetGCERegion(zone)
if err != nil {
return nil, err
}
networkName, err := getNetworkNameViaMetadata()
if err != nil {
return nil, err
}
networkURL := gceNetworkURL("", projectID, networkName)
subnetworkURL := ""
// By default, Kubernetes clusters only run against one zone
managedZones := []string{zone}
tokenSource := google.ComputeTokenSource("")
var nodeTags []string
var nodeInstancePrefix string
if config != nil {
cfg, err := readConfig(config)
configFile, err = readConfig(config)
if err != nil {
return nil, err
}
glog.Infof("Using GCE provider config %+v", cfg)
if cfg.Global.ApiEndpoint != "" {
apiEndpoint = cfg.Global.ApiEndpoint
}
if cfg.Global.ProjectID != "" {
projectID = cfg.Global.ProjectID
}
if cfg.Global.NetworkName != "" {
if strings.Contains(cfg.Global.NetworkName, "/") {
networkURL = cfg.Global.NetworkName
} else {
networkURL = gceNetworkURL(apiEndpoint, projectID, cfg.Global.NetworkName)
}
}
if cfg.Global.SubnetworkName != "" {
if strings.Contains(cfg.Global.SubnetworkName, "/") {
subnetworkURL = cfg.Global.SubnetworkName
} else {
subnetworkURL = gceSubnetworkURL(apiEndpoint, projectID, region, cfg.Global.SubnetworkName)
}
}
if cfg.Global.TokenURL != "" {
tokenSource = NewAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody)
}
nodeTags = cfg.Global.NodeTags
nodeInstancePrefix = cfg.Global.NodeInstancePrefix
if cfg.Global.Multizone {
managedZones = nil // Use all zones in region
}
glog.Infof("Using GCE provider config %+v", configFile)
}
return CreateGCECloud(apiEndpoint, projectID, region, zone, managedZones, networkURL, subnetworkURL,
nodeTags, nodeInstancePrefix, tokenSource, true /* useMetadataServer */)
cloudConfig, err = generateCloudConfig(configFile)
if err != nil {
return nil, err
}
return CreateGCECloud(cloudConfig)
}
func readConfig(reader io.Reader) (*Config, error) {
cfg := &Config{}
func readConfig(reader io.Reader) (*ConfigFile, error) {
cfg := &ConfigFile{}
if err := gcfg.FatalOnly(gcfg.ReadInto(cfg, reader)); err != nil {
glog.Errorf("Couldn't read config: %v", err)
return nil, err
@ -250,14 +231,98 @@ func readConfig(reader io.Reader) (*Config, error) {
return cfg, nil
}
func generateCloudConfig(configFile *ConfigFile) (cloudConfig *CloudConfig, err error) {
cloudConfig = &CloudConfig{}
// By default, fetch token from GCE metadata server
cloudConfig.TokenSource = google.ComputeTokenSource("")
cloudConfig.UseMetadataServer = true
if configFile != nil {
if configFile.Global.ApiEndpoint != "" {
cloudConfig.ApiEndpoint = configFile.Global.ApiEndpoint
}
if configFile.Global.TokenURL != "" {
// if tokenURL is nil, set tokenSource to nil. This will force the OAuth client to fall
// back to use DefaultTokenSource. This allows running gceCloud remotely.
if configFile.Global.TokenURL == "nil" {
cloudConfig.TokenSource = nil
} else {
cloudConfig.TokenSource = NewAltTokenSource(configFile.Global.TokenURL, configFile.Global.TokenBody)
}
}
cloudConfig.NodeTags = configFile.Global.NodeTags
cloudConfig.NodeInstancePrefix = configFile.Global.NodeInstancePrefix
alphaFeatureGate, err := NewAlphaFeatureGate(configFile.Global.AlphaFeatures)
if err != nil {
glog.Errorf("Encountered error for creating alpha feature gate: %v", err)
}
cloudConfig.AlphaFeatureGate = alphaFeatureGate
}
// retrieve projectID and zone
if configFile == nil || configFile.Global.ProjectID == "" || configFile.Global.LocalZone == "" {
cloudConfig.ProjectID, cloudConfig.Zone, err = getProjectAndZone()
if err != nil {
return nil, err
}
}
if configFile != nil {
if configFile.Global.ProjectID != "" {
cloudConfig.ProjectID = configFile.Global.ProjectID
}
if configFile.Global.LocalZone != "" {
cloudConfig.Zone = configFile.Global.LocalZone
}
}
// retrieve region
cloudConfig.Region, err = GetGCERegion(cloudConfig.Zone)
if err != nil {
return nil, err
}
// generate managedZones
cloudConfig.ManagedZones = []string{cloudConfig.Zone}
if configFile != nil && configFile.Global.Multizone {
cloudConfig.ManagedZones = nil // Use all zones in region
}
// generate networkURL
if configFile != nil && configFile.Global.NetworkName != "" {
if strings.Contains(configFile.Global.NetworkName, "/") {
cloudConfig.NetworkURL = configFile.Global.NetworkName
} else {
cloudConfig.NetworkURL = gceNetworkURL(cloudConfig.ApiEndpoint, cloudConfig.ProjectID, configFile.Global.NetworkName)
}
} else {
networkName, err := getNetworkNameViaMetadata()
if err != nil {
return nil, err
}
cloudConfig.NetworkURL = gceNetworkURL("", cloudConfig.ProjectID, networkName)
}
// generate subnetworkURL
if configFile != nil && configFile.Global.SubnetworkName != "" {
if strings.Contains(configFile.Global.SubnetworkName, "/") {
cloudConfig.SubnetworkURL = configFile.Global.SubnetworkName
} else {
cloudConfig.SubnetworkURL = gceSubnetworkURL(cloudConfig.ApiEndpoint, cloudConfig.ProjectID, cloudConfig.Region, configFile.Global.SubnetworkName)
}
}
return cloudConfig, err
}
// Creates a GCECloud object using the specified parameters.
// If no networkUrl is specified, loads networkName via rest call.
// If no tokenSource is specified, uses oauth2.DefaultTokenSource.
// If managedZones is nil / empty all zones in the region will be managed.
func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []string, networkURL, subnetworkURL string, nodeTags []string,
nodeInstancePrefix string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) {
func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
client, err := newOauthClient(tokenSource)
client, err := newOauthClient(config.TokenSource)
if err != nil {
return nil, err
}
@ -266,7 +331,7 @@ func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []
return nil, err
}
client, err = newOauthClient(tokenSource)
client, err = newOauthClient(config.TokenSource)
if err != nil {
return nil, err
}
@ -275,7 +340,7 @@ func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []
return nil, err
}
client, err = newOauthClient(tokenSource)
client, err = newOauthClient(config.TokenSource)
if err != nil {
return nil, err
}
@ -288,10 +353,10 @@ func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []
// Generate alpha and beta api endpoints based on override v1 api endpoint.
// For example,
// staging API endpoint: https://www.googleapis.com/compute/staging_v1/
if apiEndpoint != "" {
service.BasePath = fmt.Sprintf("%sprojects/", apiEndpoint)
serviceBeta.BasePath = fmt.Sprintf("%sprojects/", strings.Replace(apiEndpoint, "v1", "beta", 0))
serviceAlpha.BasePath = fmt.Sprintf("%sprojects/", strings.Replace(apiEndpoint, "v1", "alpha", 0))
if config.ApiEndpoint != "" {
service.BasePath = fmt.Sprintf("%sprojects/", config.ApiEndpoint)
serviceBeta.BasePath = fmt.Sprintf("%sprojects/", strings.Replace(config.ApiEndpoint, "v1", "beta", -1))
serviceAlpha.BasePath = fmt.Sprintf("%sprojects/", strings.Replace(config.ApiEndpoint, "v1", "alpha", -1))
}
containerService, err := container.New(client)
@ -304,28 +369,28 @@ func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []
return nil, err
}
if networkURL == "" {
networkName, err := getNetworkNameViaAPICall(service, projectID)
if config.NetworkURL == "" {
networkName, err := getNetworkNameViaAPICall(service, config.ProjectID)
if err != nil {
return nil, err
}
networkURL = gceNetworkURL(apiEndpoint, projectID, networkName)
config.NetworkURL = gceNetworkURL(config.ApiEndpoint, config.ProjectID, networkName)
}
networkProjectID, err := getProjectIDInURL(networkURL)
networkProjectID, err := getProjectIDInURL(config.NetworkURL)
if err != nil {
return nil, err
}
onXPN := networkProjectID != projectID
onXPN := networkProjectID != config.ProjectID
if len(managedZones) == 0 {
managedZones, err = getZonesForRegion(service, projectID, region)
if len(config.ManagedZones) == 0 {
config.ManagedZones, err = getZonesForRegion(service, config.ProjectID, config.Region)
if err != nil {
return nil, err
}
}
if len(managedZones) != 1 {
glog.Infof("managing multiple zones: %v", managedZones)
if len(config.ManagedZones) != 1 {
glog.Infof("managing multiple zones: %v", config.ManagedZones)
}
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
@ -336,18 +401,19 @@ func CreateGCECloud(apiEndpoint, projectID, region, zone string, managedZones []
serviceBeta: serviceBeta,
containerService: containerService,
cloudkmsService: cloudkmsService,
projectID: projectID,
projectID: config.ProjectID,
networkProjectID: networkProjectID,
onXPN: onXPN,
region: region,
localZone: zone,
managedZones: managedZones,
networkURL: networkURL,
subnetworkURL: subnetworkURL,
nodeTags: nodeTags,
nodeInstancePrefix: nodeInstancePrefix,
useMetadataServer: useMetadataServer,
region: config.Region,
localZone: config.Zone,
managedZones: config.ManagedZones,
networkURL: config.NetworkURL,
subnetworkURL: config.SubnetworkURL,
nodeTags: config.NodeTags,
nodeInstancePrefix: config.NodeInstancePrefix,
useMetadataServer: config.UseMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
AlphaFeatureGate: config.AlphaFeatureGate,
}
gce.manager = &GCEServiceManager{gce}
@ -424,6 +490,11 @@ func (gce *GCECloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []
return nameservers, srchOut
}
// HasClusterID returns true if the cluster has a clusterID
func (gce *GCECloud) HasClusterID() bool {
return true
}
// GCECloud implements cloudprovider.Interface.
var _ cloudprovider.Interface = (*GCECloud)(nil)

View file

@ -17,16 +17,19 @@ limitations under the License.
package gce
import (
"time"
"fmt"
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
)
func newAddressMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"address_" + request, region, unusedMetricLabel},
}
return newAddressMetricContextWithVersion(request, region, computeV1Version)
}
func newAddressMetricContextWithVersion(request, region, version string) *metricContext {
return newGenericMetricContext("address", request, region, unusedMetricLabel, version)
}
// ReserveGlobalAddress creates a global address.
@ -69,6 +72,16 @@ func (gce *GCECloud) ReserveRegionAddress(addr *compute.Address, region string)
return gce.waitForRegionOp(op, region, mc)
}
// ReserveAlphaRegionAddress creates an Alpha, regional address.
func (gce *GCECloud) ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error {
mc := newAddressMetricContextWithVersion("reserve", region, computeAlphaVersion)
op, err := gce.serviceAlpha.Addresses.Insert(gce.projectID, region, addr).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// DeleteRegionAddress deletes a region address by name.
func (gce *GCECloud) DeleteRegionAddress(name, region string) error {
mc := newAddressMetricContext("delete", region)
@ -85,3 +98,37 @@ func (gce *GCECloud) GetRegionAddress(name, region string) (*compute.Address, er
v, err := gce.service.Addresses.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// GetAlphaRegionAddress returns the Alpha, regional address by name.
func (gce *GCECloud) GetAlphaRegionAddress(name, region string) (*computealpha.Address, error) {
mc := newAddressMetricContextWithVersion("get", region, computeAlphaVersion)
v, err := gce.serviceAlpha.Addresses.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// GetRegionAddressByIP returns the regional address matching the given IP
// address.
func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) {
mc := newAddressMetricContext("list", region)
addrs, err := gce.service.Addresses.List(gce.projectID, region).Filter("address eq " + ipAddress).Do()
// Record the metrics for the call.
mc.Observe(err)
if err != nil {
return nil, err
}
if len(addrs.Items) > 1 {
// We don't expect more than one match.
addrsToPrint := []compute.Address{}
for _, addr := range addrs.Items {
addrsToPrint = append(addrsToPrint, *addr)
}
glog.Errorf("More than one addresses matching the IP %q: %+v", ipAddress, addrsToPrint)
}
for _, addr := range addrs.Items {
if addr.Address == ipAddress {
return addr, nil
}
}
return nil, makeGoogleAPINotFoundError(fmt.Sprintf("Address with IP %q was not found in region %q", ipAddress, region))
}

View file

@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
)
@ -34,6 +35,9 @@ type FakeCloudAddressService struct {
addrsByRegionAndName map[string]map[string]*compute.Address
}
// FakeCloudAddressService Implements CloudAddressService
var _ CloudAddressService = &FakeCloudAddressService{}
func NewFakeCloudAddressService() *FakeCloudAddressService {
return &FakeCloudAddressService{
reservedAddrs: make(map[string]bool),
@ -41,6 +45,18 @@ func NewFakeCloudAddressService() *FakeCloudAddressService {
}
}
// SetRegionalAddresses populates the addresses of the region with the name to
// IP map.
func (cas *FakeCloudAddressService) SetRegionalAddresses(region string, ipList map[string]string) {
// Reset addresses in the region.
cas.addrsByRegionAndName[region] = make(map[string]*compute.Address)
for name, ip := range ipList {
cas.reservedAddrs[ip] = true
cas.addrsByRegionAndName[region][name] = &compute.Address{Name: name, Address: ip}
}
}
func (cas *FakeCloudAddressService) ReserveRegionAddress(addr *compute.Address, region string) error {
if addr.Address == "" {
addr.Address = fmt.Sprintf("1.2.3.%d", cas.count)
@ -66,12 +82,33 @@ func (cas *FakeCloudAddressService) ReserveRegionAddress(addr *compute.Address,
func (cas *FakeCloudAddressService) GetRegionAddress(name, region string) (*compute.Address, error) {
if _, exists := cas.addrsByRegionAndName[region]; !exists {
return nil, &googleapi.Error{Code: http.StatusNotFound}
return nil, makeGoogleAPINotFoundError("")
}
if addr, exists := cas.addrsByRegionAndName[region][name]; !exists {
return nil, &googleapi.Error{Code: http.StatusNotFound}
return nil, makeGoogleAPINotFoundError("")
} else {
return addr, nil
}
}
func (cas *FakeCloudAddressService) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) {
if _, exists := cas.addrsByRegionAndName[region]; !exists {
return nil, makeGoogleAPINotFoundError("")
}
for _, addr := range cas.addrsByRegionAndName[region] {
if addr.Address == ipAddress {
return addr, nil
}
}
return nil, makeGoogleAPINotFoundError("")
}
func (cas *FakeCloudAddressService) GetAlphaRegionAddress(name, region string) (*computealpha.Address, error) {
return nil, fmt.Errorf("not implemented")
}
func (cas *FakeCloudAddressService) ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error {
return fmt.Errorf("not implemented")
}

View file

@ -0,0 +1,61 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"fmt"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)
const (
AlphaFeatureNetworkEndpointGroup = "NetworkEndpointGroup"
notEnabledErrorTemplate = "alpha feature %q is not enabled."
)
// All known alpha features
var knownAlphaFeatures = map[string]bool{
AlphaFeatureNetworkEndpointGroup: true,
}
type AlphaFeatureGate struct {
features map[string]bool
}
func (af *AlphaFeatureGate) Enabled(key string) bool {
return af.features[key]
}
func NewAlphaFeatureGate(features []string) (*AlphaFeatureGate, error) {
errList := []error{}
featureMap := make(map[string]bool)
for _, name := range features {
if _, ok := knownAlphaFeatures[name]; !ok {
errList = append(errList, fmt.Errorf("alpha feature %q is not supported.", name))
} else {
featureMap[name] = true
}
}
return &AlphaFeatureGate{featureMap}, utilerrors.NewAggregate(errList)
}
func (gce *GCECloud) alphaFeatureEnabled(feature string) error {
if !gce.AlphaFeatureGate.Enabled(feature) {
return fmt.Errorf(notEnabledErrorTemplate, feature)
}
return nil
}

View file

@ -16,7 +16,10 @@ limitations under the License.
package gce
import "k8s.io/api/core/v1"
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
)
type LoadBalancerType string
@ -26,12 +29,16 @@ const (
// Currently, only "internal" is supported.
ServiceAnnotationLoadBalancerType = "cloud.google.com/load-balancer-type"
LBTypeInternal LoadBalancerType = "internal"
LBTypeInternal LoadBalancerType = "Internal"
// Deprecating the lowercase spelling of Internal.
deprecatedTypeInternalLowerCase LoadBalancerType = "internal"
// ServiceAnnotationInternalBackendShare is annotated on a service with "true" when users
// want to share GCP Backend Services for a set of internal load balancers.
// ALPHA feature - this may be removed in a future release.
ServiceAnnotationILBBackendShare = "cloud.google.com/load-balancer-backend-share"
ServiceAnnotationILBBackendShare = "alpha.cloud.google.com/load-balancer-backend-share"
// This annotation did not correctly specify "alpha", so both annotations will be checked.
deprecatedServiceAnnotationILBBackendShare = "cloud.google.com/load-balancer-backend-share"
)
// GetLoadBalancerAnnotationType returns the type of GCP load balancer which should be assembled.
@ -48,8 +55,8 @@ func GetLoadBalancerAnnotationType(service *v1.Service) (LoadBalancerType, bool)
}
switch v {
case LBTypeInternal:
return v, true
case LBTypeInternal, deprecatedTypeInternalLowerCase:
return LBTypeInternal, true
default:
return v, false
}
@ -58,8 +65,13 @@ func GetLoadBalancerAnnotationType(service *v1.Service) (LoadBalancerType, bool)
// GetLoadBalancerAnnotationBackendShare returns whether this service's backend service should be
// shared with other load balancers. Health checks and the healthcheck firewall will be shared regardless.
func GetLoadBalancerAnnotationBackendShare(service *v1.Service) bool {
l, exists := service.Annotations[ServiceAnnotationILBBackendShare]
if exists && l == "true" {
if l, exists := service.Annotations[ServiceAnnotationILBBackendShare]; exists && l == "true" {
return true
}
// Check for deprecated annotation key
if l, exists := service.Annotations[deprecatedServiceAnnotationILBBackendShare]; exists && l == "true" {
glog.Warningf("Annotation %q is deprecated and replaced with an alpha-specific key: %q", deprecatedServiceAnnotationILBBackendShare, ServiceAnnotationILBBackendShare)
return true
}

View file

@ -18,16 +18,17 @@ package gce
import (
"net/http"
"time"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
)
func newBackendServiceMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"backendservice_" + request, region, unusedMetricLabel},
}
return newBackendServiceMetricContextWithVersion(request, region, computeV1Version)
}
func newBackendServiceMetricContextWithVersion(request, region, version string) *metricContext {
return newGenericMetricContext("backendservice", request, region, unusedMetricLabel, version)
}
// GetGlobalBackendService retrieves a backend by name.
@ -37,6 +38,13 @@ func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendServi
return v, mc.Observe(err)
}
// GetAlphaGlobalBackendService retrieves alpha backend by name.
func (gce *GCECloud) GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) {
mc := newBackendServiceMetricContextWithVersion("get", "", computeAlphaVersion)
v, err := gce.serviceAlpha.BackendServices.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateGlobalBackendService applies the given BackendService as an update to an existing service.
func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("update", "")
@ -48,6 +56,17 @@ func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) erro
return gce.waitForGlobalOp(op, mc)
}
// UpdateAlphaGlobalBackendService applies the given alpha BackendService as an update to an existing service.
func (gce *GCECloud) UpdateAlphaGlobalBackendService(bg *computealpha.BackendService) error {
mc := newBackendServiceMetricContextWithVersion("alpha_update", "", computeAlphaVersion)
op, err := gce.serviceAlpha.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// DeleteGlobalBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
mc := newBackendServiceMetricContext("delete", "")
@ -73,6 +92,17 @@ func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) erro
return gce.waitForGlobalOp(op, mc)
}
// CreateAlphaGlobalBackendService creates the given alpha BackendService.
func (gce *GCECloud) CreateAlphaGlobalBackendService(bg *computealpha.BackendService) error {
mc := newBackendServiceMetricContextWithVersion("alpha_create", "", computeAlphaVersion)
op, err := gce.serviceAlpha.BackendServices.Insert(gce.projectID, bg).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// ListGlobalBackendServices lists all backend services in the project.
func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) {
mc := newBackendServiceMetricContext("list", "")

View file

@ -18,16 +18,12 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
func newCertMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"cert_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newGenericMetricContext("cert", request, unusedMetricLabel, unusedMetricLabel, computeV1Version)
}
// GetSslCertificate returns the SslCertificate by name.

View file

@ -16,13 +16,8 @@ limitations under the License.
package gce
import "time"
func newClustersMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"clusters_" + request, unusedMetricLabel, zone},
}
return newGenericMetricContext("clusters", request, unusedMetricLabel, zone, computeV1Version)
}
func (gce *GCECloud) ListClusters() ([]string, error) {

View file

@ -21,7 +21,6 @@ import (
"fmt"
"net/http"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -85,10 +84,7 @@ type GCEDisk struct {
}
func newDiskMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"disk_" + request, unusedMetricLabel, zone},
}
return newGenericMetricContext("disk", request, unusedMetricLabel, zone, computeV1Version)
}
func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error {

View file

@ -17,16 +17,11 @@ limitations under the License.
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
func newFirewallMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"firewall_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newGenericMetricContext("firewall", request, unusedMetricLabel, unusedMetricLabel, computeV1Version)
}
// GetFirewall returns the Firewall by name.

View file

@ -17,16 +17,15 @@ limitations under the License.
package gce
import (
"time"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
)
func newForwardingRuleMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"forwardingrule_" + request, region, unusedMetricLabel},
}
return newForwardingRuleMetricContextWithVersion(request, region, computeV1Version)
}
func newForwardingRuleMetricContextWithVersion(request, region, version string) *metricContext {
return newGenericMetricContext("forwardingrule", request, region, unusedMetricLabel, version)
}
// CreateGlobalForwardingRule creates the passed GlobalForwardingRule
@ -85,6 +84,13 @@ func (gce *GCECloud) GetRegionForwardingRule(name, region string) (*compute.Forw
return v, mc.Observe(err)
}
// GetAlphaRegionForwardingRule returns the Alpha forwarding rule by name & region.
func (gce *GCECloud) GetAlphaRegionForwardingRule(name, region string) (*computealpha.ForwardingRule, error) {
mc := newForwardingRuleMetricContextWithVersion("get", region, computeAlphaVersion)
v, err := gce.serviceAlpha.ForwardingRules.Get(gce.projectID, region, name).Do()
return v, mc.Observe(err)
}
// ListRegionForwardingRules lists all RegionalForwardingRules in the project & region.
func (gce *GCECloud) ListRegionForwardingRules(region string) (*compute.ForwardingRuleList, error) {
mc := newForwardingRuleMetricContext("list", region)
@ -105,6 +111,18 @@ func (gce *GCECloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, re
return gce.waitForRegionOp(op, region, mc)
}
// CreateAlphaRegionForwardingRule creates and returns an Alpha
// forwarding fule in the given region.
func (gce *GCECloud) CreateAlphaRegionForwardingRule(rule *computealpha.ForwardingRule, region string) error {
mc := newForwardingRuleMetricContextWithVersion("create", region, computeAlphaVersion)
op, err := gce.serviceAlpha.ForwardingRules.Insert(gce.projectID, region, rule).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForRegionOp(op, region, mc)
}
// DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region.
func (gce *GCECloud) DeleteRegionForwardingRule(name, region string) error {
mc := newForwardingRuleMetricContext("delete", region)

View file

@ -17,13 +17,12 @@ limitations under the License.
package gce
import (
"time"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/master/ports"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
)
@ -45,10 +44,11 @@ func init() {
}
func newHealthcheckMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"healthcheck_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newHealthcheckMetricContextWithVersion(request, computeV1Version)
}
func newHealthcheckMetricContextWithVersion(request, version string) *metricContext {
return newGenericMetricContext("healthcheck", request, unusedMetricLabel, unusedMetricLabel, version)
}
// GetHttpHealthCheck returns the given HttpHealthCheck by name.
@ -160,6 +160,13 @@ func (gce *GCECloud) GetHealthCheck(name string) (*compute.HealthCheck, error) {
return v, mc.Observe(err)
}
// GetAlphaHealthCheck returns the given alpha HealthCheck by name.
func (gce *GCECloud) GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) {
mc := newHealthcheckMetricContextWithVersion("alpha_get", computeAlphaVersion)
v, err := gce.serviceAlpha.HealthChecks.Get(gce.projectID, name).Do()
return v, mc.Observe(err)
}
// UpdateHealthCheck applies the given HealthCheck as an update.
func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error {
mc := newHealthcheckMetricContext("update")
@ -171,6 +178,17 @@ func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error {
return gce.waitForGlobalOp(op, mc)
}
// UpdateAlphaHealthCheck applies the given alpha HealthCheck as an update.
func (gce *GCECloud) UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
mc := newHealthcheckMetricContextWithVersion("alpha_update", computeAlphaVersion)
op, err := gce.serviceAlpha.HealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// DeleteHealthCheck deletes the given HealthCheck by name.
func (gce *GCECloud) DeleteHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete")
@ -193,6 +211,17 @@ func (gce *GCECloud) CreateHealthCheck(hc *compute.HealthCheck) error {
return gce.waitForGlobalOp(op, mc)
}
// CreateAlphaHealthCheck creates the given alpha HealthCheck.
func (gce *GCECloud) CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
mc := newHealthcheckMetricContextWithVersion("alpha_create", computeAlphaVersion)
op, err := gce.serviceAlpha.HealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForGlobalOp(op, mc)
}
// ListHealthChecks lists all HealthCheck in the project.
func (gce *GCECloud) ListHealthChecks() (*compute.HealthCheckList, error) {
mc := newHealthcheckMetricContext("list")

View file

@ -16,17 +16,10 @@ limitations under the License.
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
import compute "google.golang.org/api/compute/v1"
func newInstanceGroupMetricContext(request string, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"instancegroup_" + request, unusedMetricLabel, zone},
}
return newGenericMetricContext("instancegroup", request, unusedMetricLabel, zone, computeV1Version)
}
// CreateInstanceGroup creates an instance group with the given

View file

@ -41,10 +41,7 @@ const (
)
func newInstancesMetricContext(request, zone string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"instances_" + request, unusedMetricLabel, zone},
}
return newGenericMetricContext("instances", request, unusedMetricLabel, zone, computeV1Version)
}
func splitNodesByZone(nodes []*v1.Node) map[string][]*v1.Node {

View file

@ -16,12 +16,20 @@ limitations under the License.
package gce
import compute "google.golang.org/api/compute/v1"
import (
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
)
// CloudAddressService is an interface for managing addresses
type CloudAddressService interface {
ReserveRegionAddress(*compute.Address, string) error
GetRegionAddress(string, string) (*compute.Address, error)
GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error)
// TODO: Mock `DeleteRegionAddress(name, region string) endpoint
// TODO: Mock Global endpoints
// Alpha API.
GetAlphaRegionAddress(name, region string) (*computealpha.Address, error)
ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error
}

View file

@ -21,7 +21,6 @@ import (
"fmt"
"net"
"strings"
"time"
"github.com/golang/glog"
@ -40,10 +39,7 @@ var (
)
func newLoadBalancerMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"loadbalancer_" + request, region, unusedMetricLabel},
}
return newGenericMetricContext("loadbalancer", request, region, unusedMetricLabel, computeV1Version)
}
type lbScheme string

View file

@ -55,7 +55,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancerIP := apiService.Spec.LoadBalancerIP
requestedIP := apiService.Spec.LoadBalancerIP
ports := apiService.Spec.Ports
portStr := []string{}
for _, p := range apiService.Spec.Ports {
@ -66,10 +66,10 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
loadBalancerName, gce.region, requestedIP, portStr, hostNames, serviceName, apiService.Annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, requestedIP, ports)
if err != nil {
return nil, err
}
@ -93,7 +93,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
// forwarding rule creation as the last thing that needs to be done in this
// function in order to maintain the invariant that "if the forwarding rule
// exists, the LB has been fully created".
ipAddress := ""
ipAddressToUse := ""
// Through this process we try to keep track of whether it is safe to
// release the IP that was allocated. If the user specifically asked for
@ -110,75 +110,42 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
}
if isSafeToReleaseIP {
if err := gce.DeleteRegionAddress(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
glog.Errorf("Failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddressToUse, loadBalancerName, serviceName, gce.region, err)
} else if isNotFound(err) {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): address %s is not reserved.", loadBalancerName, serviceName, ipAddress)
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): address %s is not reserved.", loadBalancerName, serviceName, ipAddressToUse)
} else {
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddressToUse)
}
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddressToUse, loadBalancerName, serviceName, gce.region, err)
}
}()
if loadBalancerIP != "" {
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
} else if isStatic {
// The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true
isSafeToReleaseIP = false
ipAddress = loadBalancerIP
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
} else if loadBalancerIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it.
isUserOwnedIP = false
isSafeToReleaseIP = true
ipAddress, _, err = ensureStaticIP(gce, loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err)
}
} else {
// The user did not request a specific IP.
isUserOwnedIP = false
// This will either allocate a new static IP if the forwarding rule didn't
// already have an IP, or it will promote the forwarding rule's current
// IP from ephemeral to static, or it will just get the IP if it is
// already static.
existed := false
ipAddress, existed, err = ensureStaticIP(gce, loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
lbRefStr := fmt.Sprintf("%v(%v)", loadBalancerName, serviceName)
if requestedIP != "" {
// If user requests a specific IP address, verify first. No mutation to
// the GCE resources will be performed in the verification process.
isUserOwnedIP, err = verifyUserRequestedIP(gce, gce.region, requestedIP, fwdRuleIP, lbRefStr)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
return nil, err
}
if existed {
// If the IP was not specifically requested by the user, but it
// already existed, it seems to be a failed update cycle. We can
// use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case
// of failure or success.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress)
ipAddressToUse = requestedIP
}
if !isUserOwnedIP {
// If we are not using the user-owned IP, either promote the
// emphemeral IP used by the fwd rule, or create a new static IP.
ipAddr, existed, err := ensureStaticIP(gce, loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure a static IP for the LB: %v", err)
}
glog.V(4).Infof("EnsureLoadBalancer(%s): ensured IP address %s", lbRefStr, ipAddr)
// If the IP was not owned by the user, but it already existed, it
// could indicate that the previous update cycle failed. We can use
// this IP and try to run through the process again, but we should
// not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = !existed
ipAddressToUse = ipAddr
}
// Deal with the firewall next. The reason we do this here rather than last
@ -190,13 +157,13 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
return nil, err
}
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges)
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddressToUse, ports, sourceRanges)
if err != nil {
return nil, err
}
if firewallNeedsUpdate {
desc := makeFirewallDescription(serviceName.String(), ipAddress)
desc := makeFirewallDescription(serviceName.String(), ipAddressToUse)
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
@ -293,7 +260,7 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, clusterID, createInstances, affinityType, hcToCreate); err != nil {
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddressToUse, gce.region, clusterID, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if hcToCreate != nil {
@ -315,8 +282,8 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
}
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating forwarding rule, IP %s", loadBalancerName, serviceName, ipAddressToUse)
if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddressToUse, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
}
// End critical section. It is safe to release the static IP (which
@ -324,11 +291,11 @@ func (gce *GCECloud) ensureExternalLoadBalancer(clusterName, clusterID string, a
// of a user-requested IP, the "is user-owned" flag will be set,
// preventing it from actually being released.
isSafeToReleaseIP = true
glog.Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
glog.Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddressToUse)
}
status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddress}}
status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}}
return status, nil
}
@ -456,6 +423,42 @@ func (gce *GCECloud) DeleteExternalTargetPoolAndChecks(name, region, clusterID s
return nil
}
// verifyUserRequestedIP checks the user-provided IP to see whether it can be
// used for the LB. It also returns whether the IP is considered owned by the
// user.
func verifyUserRequestedIP(s CloudAddressService, region, requestedIP, fwdRuleIP, lbRef string) (isUserOwnedIP bool, err error) {
if requestedIP == "" {
return false, nil
}
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
existingAddress, err := s.GetRegionAddressByIP(region, requestedIP)
if err != nil && !isNotFound(err) {
glog.Errorf("verifyUserRequestedIP: failed to check whether the requested IP %q for LB %s exists: %v", requestedIP, lbRef, err)
return false, err
}
if err == nil {
// The requested IP is a static IP, owned and managed by the user.
glog.V(4).Infof("verifyUserRequestedIP: the requested static IP %q (name: %s) for LB %s exists.", requestedIP, existingAddress.Name, lbRef)
return true, nil
}
if requestedIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can just use it.
glog.V(4).Infof("verifyUserRequestedIP: the requested IP %q is not static, but is currently in use by for LB %s", requestedIP, lbRef)
return false, nil
}
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
glog.Errorf("verifyUserRequestedIP: requested IP %q for LB %s is neither static nor assigned to the LB", requestedIP, lbRef)
return false, fmt.Errorf("requested ip %q is neither static nor assigned to the LB", requestedIP)
}
func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
// health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete
@ -789,7 +792,7 @@ func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, regio
if fw.Description != desc ||
len(fw.Allowed) != 1 ||
fw.Allowed[0].IPProtocol != string(ports[0].Protocol) ||
!equalStringSets(fw.Allowed[0].Ports, []string{string(ports[0].Port)}) ||
!equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
@ -880,32 +883,6 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets
return firewall, nil
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Addresses.List(gce.projectID, region)
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
addresses, err := listCall.Do()
if err != nil {
return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
pageToken = addresses.NextPageToken
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return true, nil
}
}
}
if page >= maxPages {
glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages)
}
return false, nil
}
func ensureStaticIP(s CloudAddressService, name, serviceName, region, existingIP string) (ipAddress string, existing bool, err error) {
// If the address doesn't exist, this will create it.
// If the existingIP exists but is ephemeral, this will promote it to static.

View file

@ -0,0 +1,203 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.alpha"
"strings"
)
const (
NEGLoadBalancerType = "LOAD_BALANCING"
NEGIPPortNetworkEndpointType = "GCE_VM_IP_PORT"
)
func newNetworkEndpointGroupMetricContext(request string, zone string) *metricContext {
return newGenericMetricContext("networkendpointgroup_", request, unusedMetricLabel, zone, computeAlphaVersion)
}
func (gce *GCECloud) GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, err
}
mc := newNetworkEndpointGroupMetricContext("get", zone)
v, err := gce.serviceAlpha.NetworkEndpointGroups.Get(gce.GetProjectID(), zone, name).Do()
return v, mc.Observe(err)
}
func (gce *GCECloud) ListNetworkEndpointGroup(zone string) ([]*computealpha.NetworkEndpointGroup, error) {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, err
}
mc := newNetworkEndpointGroupMetricContext("list", zone)
networkEndpointGroups := []*computealpha.NetworkEndpointGroup{}
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.serviceAlpha.NetworkEndpointGroups.List(gce.GetProjectID(), zone)
if pageToken != "" {
listCall.PageToken(pageToken)
}
res, err := listCall.Do()
mc.Observe(err)
if err != nil {
glog.Errorf("Error listing network endpoint group from GCE: %v", err)
return nil, err
}
pageToken = res.NextPageToken
networkEndpointGroups = append(networkEndpointGroups, res.Items...)
if page >= maxPages {
glog.Errorf("ListNetworkEndpointGroup exceeded maxPages=%d: truncating.", maxPages)
}
}
return networkEndpointGroups, nil
}
func (gce *GCECloud) AggregatedListNetworkEndpointGroup() (map[string][]*computealpha.NetworkEndpointGroup, error) {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, err
}
mc := newNetworkEndpointGroupMetricContext("aggregated_list", "")
zoneNetworkEndpointGroupMap := map[string][]*computealpha.NetworkEndpointGroup{}
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.serviceAlpha.NetworkEndpointGroups.AggregatedList(gce.GetProjectID())
if pageToken != "" {
listCall.PageToken(pageToken)
}
res, err := listCall.Do()
mc.Observe(err)
if err != nil {
glog.Errorf("Error listing network endpoint group from GCE: %v", err)
return nil, err
}
pageToken = res.NextPageToken
for key, negs := range res.Items {
if len(negs.NetworkEndpointGroups) == 0 {
continue
}
// key has the format of "zones/${zone_name}"
zone := strings.Split(key, "/")[1]
if _, ok := zoneNetworkEndpointGroupMap[zone]; !ok {
zoneNetworkEndpointGroupMap[zone] = []*computealpha.NetworkEndpointGroup{}
}
zoneNetworkEndpointGroupMap[zone] = append(zoneNetworkEndpointGroupMap[zone], negs.NetworkEndpointGroups...)
}
if page >= maxPages {
glog.Errorf("ListNetworkEndpointGroup exceeded maxPages=%d: truncating.", maxPages)
}
}
return zoneNetworkEndpointGroupMap, nil
}
func (gce *GCECloud) CreateNetworkEndpointGroup(neg *computealpha.NetworkEndpointGroup, zone string) error {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("create", zone)
op, err := gce.serviceAlpha.NetworkEndpointGroups.Insert(gce.GetProjectID(), zone, neg).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForZoneOp(op, zone, mc)
}
func (gce *GCECloud) DeleteNetworkEndpointGroup(name string, zone string) error {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("delete", zone)
op, err := gce.serviceAlpha.NetworkEndpointGroups.Delete(gce.GetProjectID(), zone, name).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForZoneOp(op, zone, mc)
}
func (gce *GCECloud) AttachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("attach", zone)
op, err := gce.serviceAlpha.NetworkEndpointGroups.AttachNetworkEndpoints(gce.GetProjectID(), zone, name, &computealpha.NetworkEndpointGroupsAttachEndpointsRequest{
NetworkEndpoints: endpoints,
}).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForZoneOp(op, zone, mc)
}
func (gce *GCECloud) DetachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("detach", zone)
op, err := gce.serviceAlpha.NetworkEndpointGroups.DetachNetworkEndpoints(gce.GetProjectID(), zone, name, &computealpha.NetworkEndpointGroupsDetachEndpointsRequest{
NetworkEndpoints: endpoints,
}).Do()
if err != nil {
return mc.Observe(err)
}
return gce.waitForZoneOp(op, zone, mc)
}
func (gce *GCECloud) ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*computealpha.NetworkEndpointWithHealthStatus, error) {
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, err
}
healthStatus := "SKIP"
if showHealthStatus {
healthStatus = "SHOW"
}
mc := newNetworkEndpointGroupMetricContext("list_networkendpoints", zone)
networkEndpoints := []*computealpha.NetworkEndpointWithHealthStatus{}
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.serviceAlpha.NetworkEndpointGroups.ListNetworkEndpoints(gce.GetProjectID(), zone, name, &computealpha.NetworkEndpointGroupsListEndpointsRequest{
HealthStatus: healthStatus,
})
if pageToken != "" {
listCall.PageToken(pageToken)
}
res, err := listCall.Do()
mc.Observe(err)
if err != nil {
return nil, err
}
pageToken = res.NextPageToken
networkEndpoints = append(networkEndpoints, res.Items...)
if page >= maxPages {
glog.Errorf("ListNetworkEndpoints exceeded maxPages=%d: truncating.", maxPages)
}
}
return networkEndpoints, nil
}

View file

@ -17,17 +17,20 @@ limitations under the License.
package gce
import (
"encoding/json"
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
computealpha "google.golang.org/api/compute/v0.alpha"
computebeta "google.golang.org/api/compute/v0.beta"
computev1 "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
)
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error), mc *metricContext) error {
func (gce *GCECloud) waitForOp(op *computev1.Operation, getOperation func(operationName string) (*computev1.Operation, error), mc *metricContext) error {
if op == nil {
return mc.Observe(fmt.Errorf("operation must not be nil"))
}
@ -72,11 +75,11 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
})
}
func opIsDone(op *compute.Operation) bool {
func opIsDone(op *computev1.Operation) bool {
return op != nil && op.Status == "DONE"
}
func getErrorFromOp(op *compute.Operation) error {
func getErrorFromOp(op *computev1.Operation) error {
if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
err := &googleapi.Error{
Code: int(op.HttpErrorStatusCode),
@ -89,20 +92,77 @@ func getErrorFromOp(op *compute.Operation) error {
return nil
}
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
}, mc)
func (gce *GCECloud) waitForGlobalOp(op gceObject, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.GlobalOperations.Get(gce.projectID, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.GlobalOperations.Get(gce.projectID, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
}, mc)
func (gce *GCECloud) waitForRegionOp(op gceObject, region string, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.RegionOperations.Get(gce.projectID, region, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.RegionOperations.Get(gce.projectID, region, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
}, mc)
func (gce *GCECloud) waitForZoneOp(op gceObject, zone string, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func convertToV1Operation(object gceObject) *computev1.Operation {
enc, err := object.MarshalJSON()
if err != nil {
panic(fmt.Sprintf("Failed to encode to json: %v", err))
}
var op computev1.Operation
if err := json.Unmarshal(enc, &op); err != nil {
panic(fmt.Sprintf("Failed to convert GCE apiObject %v to v1 operation: %v", object, err))
}
return &op
}

View file

@ -20,8 +20,6 @@ import (
"fmt"
"net/http"
"path"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -31,10 +29,7 @@ import (
)
func newRoutesMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"routes_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newGenericMetricContext("routes", request, unusedMetricLabel, unusedMetricLabel, computeV1Version)
}
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
@ -46,7 +41,12 @@ func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, err
listCall := gce.service.Routes.List(gce.projectID)
prefix := truncateClusterName(clusterName)
listCall = listCall.Filter("name eq " + prefix + "-.*")
// Filter for routes starting with clustername AND belonging to the
// relevant gcp network AND having description = "k8s-node-route".
filter := "(name eq " + prefix + "-.*) "
filter = filter + "(network eq " + gce.networkURL + ") "
filter = filter + "(description eq " + k8sNodeRouteTag + ")"
listCall = listCall.Filter(filter)
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
@ -58,18 +58,6 @@ func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, err
}
pageToken = res.NextPageToken
for _, r := range res.Items {
if r.Network != gce.networkURL {
continue
}
// Not managed if route description != "k8s-node-route"
if r.Description != k8sNodeRouteTag {
continue
}
// Not managed if route name doesn't start with <clusterName>
if !strings.HasPrefix(r.Name, prefix) {
continue
}
target := path.Base(r.NextHopInstance)
// TODO: Should we lastComponent(target) this?
targetNodeName := types.NodeName(target) // NodeName == Instance Name on GCE

View file

@ -16,17 +16,10 @@ limitations under the License.
package gce
import (
"time"
compute "google.golang.org/api/compute/v1"
)
import compute "google.golang.org/api/compute/v1"
func newTargetPoolMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetpool_" + request, region, unusedMetricLabel},
}
return newGenericMetricContext("targetpool", request, region, unusedMetricLabel, computeV1Version)
}
// GetTargetPool returns the TargetPool by name.

View file

@ -18,16 +18,12 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
func newTargetProxyMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"targetproxy_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newGenericMetricContext("targetproxy", request, unusedMetricLabel, unusedMetricLabel, computeV1Version)
}
// GetTargetHttpProxy returns the UrlMap by name.

View file

@ -18,16 +18,12 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
func newUrlMapMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"urlmap_" + request, unusedMetricLabel, unusedMetricLabel},
}
return newGenericMetricContext("urlmap", request, unusedMetricLabel, unusedMetricLabel, computeV1Version)
}
// GetUrlMap returns the UrlMap by name.

View file

@ -149,3 +149,7 @@ func ignoreNotFound(err error) error {
func isNotFoundOrInUse(err error) bool {
return isNotFound(err) || isInUsedByError(err)
}
func makeGoogleAPINotFoundError(message string) error {
return &googleapi.Error{Code: http.StatusNotFound, Message: message}
}

View file

@ -18,7 +18,6 @@ package gce
import (
"fmt"
"time"
compute "google.golang.org/api/compute/v1"
@ -27,10 +26,7 @@ import (
)
func newZonesMetricContext(request, region string) *metricContext {
return &metricContext{
start: time.Now(),
attributes: []string{"zones_" + request, region, unusedMetricLabel},
}
return newGenericMetricContext("zones", request, region, unusedMetricLabel, computeV1Version)
}
// GetZone creates a cloudprovider.Zone of the current zone and region

View file

@ -17,26 +17,39 @@ limitations under the License.
package gce
import (
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
// Version strings for recording metrics.
computeV1Version = "v1"
computeAlphaVersion = "alpha"
computeBetaVersion = "beta"
)
type apiCallMetrics struct {
latency *prometheus.HistogramVec
errors *prometheus.CounterVec
}
var (
apiMetrics = registerAPIMetrics(
metricLabels = []string{
"request", // API function that is begin invoked.
"region", // region (optional).
"zone", // zone (optional).
)
"version", // API version.
}
apiMetrics = registerAPIMetrics(metricLabels...)
)
type metricContext struct {
start time.Time
start time.Time
// The cardinalities of attributes and metricLabels (defined above) must
// match, or prometheus will panic.
attributes []string
}
@ -54,6 +67,19 @@ func (mc *metricContext) Observe(err error) error {
return err
}
func newGenericMetricContext(prefix, request, region, zone, version string) *metricContext {
if len(strings.TrimSpace(zone)) == 0 {
zone = unusedMetricLabel
}
if len(strings.TrimSpace(region)) == 0 {
region = unusedMetricLabel
}
return &metricContext{
start: time.Now(),
attributes: []string{prefix + "_" + request, region, zone, version},
}
}
// registerApiMetrics adds metrics definitions for a category of API calls.
func registerAPIMetrics(attributes ...string) *apiCallMetrics {
metrics := &apiCallMetrics{