Wait for operations instead of sleep
This commit is contained in:
parent
591f20f799
commit
d83a15e91f
1 changed files with 118 additions and 22 deletions
|
@ -9,10 +9,14 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
"golang.org/x/oauth2/google"
|
"golang.org/x/oauth2/google"
|
||||||
|
|
||||||
compute "google.golang.org/api/compute/v1"
|
compute "google.golang.org/api/compute/v1"
|
||||||
|
"google.golang.org/api/googleapi"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -33,6 +37,9 @@ const (
|
||||||
balancingModeRATE = "RATE"
|
balancingModeRATE = "RATE"
|
||||||
balancingModeUTIL = "UTILIZATION"
|
balancingModeUTIL = "UTILIZATION"
|
||||||
|
|
||||||
|
operationPollInterval = 3 * time.Second
|
||||||
|
// Creating Route in very large clusters, may take more than half an hour.
|
||||||
|
operationPollTimeoutDuration = time.Hour
|
||||||
version = 0.1
|
version = 0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -159,15 +166,17 @@ func updateMultipleBackends() {
|
||||||
NamedPorts: ig.NamedPorts,
|
NamedPorts: ig.NamedPorts,
|
||||||
}
|
}
|
||||||
fmt.Println("Creating", instanceGroupTemp, "zone:", zone)
|
fmt.Println("Creating", instanceGroupTemp, "zone:", zone)
|
||||||
_, err = s.InstanceGroups.Insert(projectID, zone, newIg).Do()
|
op, err := s.InstanceGroups.Insert(projectID, zone, newIg).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = waitForZoneOp(op, zone); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(10 * time.Second)
|
|
||||||
|
|
||||||
// Straddle both groups
|
// Straddle both groups
|
||||||
fmt.Println("Straddle both groups in backend services")
|
fmt.Println("Straddle both groups in backend services")
|
||||||
setBackendsTo(true, balancingModeInverse(targetBalancingMode), true, balancingModeInverse(targetBalancingMode))
|
setBackendsTo(true, balancingModeInverse(targetBalancingMode), true, balancingModeInverse(targetBalancingMode))
|
||||||
|
@ -175,36 +184,30 @@ func updateMultipleBackends() {
|
||||||
fmt.Println("Migrate instances to temporary group")
|
fmt.Println("Migrate instances to temporary group")
|
||||||
migrateInstances(instanceGroupName, instanceGroupTemp)
|
migrateInstances(instanceGroupName, instanceGroupTemp)
|
||||||
|
|
||||||
time.Sleep(20 * time.Second)
|
// Remove original backends to get rid of old balancing mode
|
||||||
|
|
||||||
// Remove original backends
|
|
||||||
fmt.Println("Remove original backends")
|
fmt.Println("Remove original backends")
|
||||||
setBackendsTo(false, "", true, balancingModeInverse(targetBalancingMode))
|
setBackendsTo(false, "", true, balancingModeInverse(targetBalancingMode))
|
||||||
|
|
||||||
sleep(1 * time.Minute)
|
// Straddle both groups (creates backend services to original groups with target mode)
|
||||||
|
|
||||||
// Straddle both groups (new balancing mode)
|
|
||||||
fmt.Println("Create backends pointing to original instance groups")
|
fmt.Println("Create backends pointing to original instance groups")
|
||||||
setBackendsTo(true, targetBalancingMode, true, balancingModeInverse(targetBalancingMode))
|
setBackendsTo(true, targetBalancingMode, true, balancingModeInverse(targetBalancingMode))
|
||||||
|
|
||||||
sleep(20 * time.Second)
|
|
||||||
|
|
||||||
fmt.Println("Migrate instances back to original groups")
|
fmt.Println("Migrate instances back to original groups")
|
||||||
migrateInstances(instanceGroupTemp, instanceGroupName)
|
migrateInstances(instanceGroupTemp, instanceGroupName)
|
||||||
|
|
||||||
sleep(20 * time.Second)
|
|
||||||
|
|
||||||
fmt.Println("Remove temporary backends")
|
fmt.Println("Remove temporary backends")
|
||||||
setBackendsTo(true, targetBalancingMode, false, "")
|
setBackendsTo(true, targetBalancingMode, false, "")
|
||||||
|
|
||||||
sleep(20 * time.Second)
|
|
||||||
|
|
||||||
fmt.Println("Delete temporary instance groups")
|
fmt.Println("Delete temporary instance groups")
|
||||||
for z := range igs {
|
for z := range igs {
|
||||||
_, err := s.InstanceGroups.Delete(projectID, z, instanceGroupTemp).Do()
|
op, err := s.InstanceGroups.Delete(projectID, z, instanceGroupTemp).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Couldn't delete temporary instance group", instanceGroupTemp)
|
fmt.Println("Couldn't delete temporary instance group", instanceGroupTemp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = waitForZoneOp(op, z); err != nil {
|
||||||
|
fmt.Println("Couldn't wait for operation: deleting temporary instance group", instanceGroupName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,10 +241,14 @@ func setBackendsTo(orig bool, origMode string, temp bool, tempMode string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bsi.Backends = union
|
bsi.Backends = union
|
||||||
_, err := s.BackendServices.Update(projectID, bsi.Name, bsi).Do()
|
op, err := s.BackendServices.Update(projectID, bsi.Name, bsi).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = waitForGlobalOp(op); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,19 +293,26 @@ func migrateInstances(fromIG, toIG string) error {
|
||||||
z := getResourceName(i.Zone, "zones")
|
z := getResourceName(i.Zone, "zones")
|
||||||
fmt.Printf(" - %s (%s)\n", i.Name, z)
|
fmt.Printf(" - %s (%s)\n", i.Name, z)
|
||||||
rr := &compute.InstanceGroupsRemoveInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}}
|
rr := &compute.InstanceGroupsRemoveInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}}
|
||||||
_, err := s.InstanceGroups.RemoveInstances(projectID, z, fromIG, rr).Do()
|
op, err := s.InstanceGroups.RemoveInstances(projectID, z, fromIG, rr).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Skipping error when removing instance from group", err)
|
fmt.Println("Skipping error when removing instance from group", err)
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
|
if err = waitForZoneOp(op, z); err != nil {
|
||||||
|
fmt.Println("Failed to wait for operation: removing instance from group", err)
|
||||||
|
}
|
||||||
|
|
||||||
ra := &compute.InstanceGroupsAddInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}}
|
ra := &compute.InstanceGroupsAddInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}}
|
||||||
_, err = s.InstanceGroups.AddInstances(projectID, z, toIG, ra).Do()
|
op, err = s.InstanceGroups.AddInstances(projectID, z, toIG, ra).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "memberAlreadyExists") { // GLBC already added the instance back to the IG
|
if !strings.Contains(err.Error(), "memberAlreadyExists") { // GLBC already added the instance back to the IG
|
||||||
fmt.Println("failed to add instance to new IG", i.Name, err)
|
fmt.Println("failed to add instance to new IG", i.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = waitForZoneOp(op, z); err != nil {
|
||||||
|
fmt.Println("Failed to wait for operation: adding instance to group", err)
|
||||||
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(i)
|
}(i)
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
@ -342,9 +356,15 @@ func updateSingleBackend(bs *compute.BackendService) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s.BackendServices.Update(projectID, bs.Name, bs).Do(); err != nil {
|
op, err := s.BackendServices.Update(projectID, bs.Name, bs).Do()
|
||||||
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = waitForGlobalOp(op); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Println("Updated single backend service to target balancing mode.")
|
fmt.Println("Updated single backend service to target balancing mode.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,3 +383,79 @@ func getIGClusterIds() []string {
|
||||||
}
|
}
|
||||||
return ids
|
return ids
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
|
||||||
|
if op == nil {
|
||||||
|
return fmt.Errorf("operation must not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if opIsDone(op) {
|
||||||
|
return getErrorFromOp(op)
|
||||||
|
}
|
||||||
|
|
||||||
|
opStart := time.Now()
|
||||||
|
opName := op.Name
|
||||||
|
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
|
||||||
|
start := time.Now()
|
||||||
|
duration := time.Now().Sub(start)
|
||||||
|
if duration > 5*time.Second {
|
||||||
|
glog.Infof("pollOperation: throttled %v for %v", duration, opName)
|
||||||
|
}
|
||||||
|
pollOp, err := getOperation(opName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]",
|
||||||
|
opName, pollOp, err, getErrorFromOp(pollOp))
|
||||||
|
}
|
||||||
|
done := opIsDone(pollOp)
|
||||||
|
if done {
|
||||||
|
duration := time.Now().Sub(opStart)
|
||||||
|
if duration > 1*time.Minute {
|
||||||
|
// Log the JSON. It's cleaner than the %v structure.
|
||||||
|
enc, err := pollOp.MarshalJSON()
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("waitForOperation: long operation (%v): %v (failed to encode to JSON: %v)",
|
||||||
|
duration, pollOp, err)
|
||||||
|
} else {
|
||||||
|
glog.Infof("waitForOperation: long operation (%v): %v",
|
||||||
|
duration, string(enc))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return done, getErrorFromOp(pollOp)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func opIsDone(op *compute.Operation) bool {
|
||||||
|
return op != nil && op.Status == "DONE"
|
||||||
|
}
|
||||||
|
|
||||||
|
func getErrorFromOp(op *compute.Operation) error {
|
||||||
|
if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
|
||||||
|
err := &googleapi.Error{
|
||||||
|
Code: int(op.HttpErrorStatusCode),
|
||||||
|
Message: op.Error.Errors[0].Message,
|
||||||
|
}
|
||||||
|
glog.Errorf("GCE operation failed: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForGlobalOp(op *compute.Operation) error {
|
||||||
|
return waitForOp(op, func(operationName string) (*compute.Operation, error) {
|
||||||
|
return s.GlobalOperations.Get(projectID, operationName).Do()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForRegionOp(op *compute.Operation, region string) error {
|
||||||
|
return waitForOp(op, func(operationName string) (*compute.Operation, error) {
|
||||||
|
return s.RegionOperations.Get(projectID, region, operationName).Do()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForZoneOp(op *compute.Operation, zone string) error {
|
||||||
|
return waitForOp(op, func(operationName string) (*compute.Operation, error) {
|
||||||
|
return s.ZoneOperations.Get(projectID, zone, operationName).Do()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue