From d83a15e91fad0a70d9470deb3bf1704a1f45de34 Mon Sep 17 00:00:00 2001 From: Nick Sardo Date: Tue, 18 Apr 2017 14:34:37 -0700 Subject: [PATCH] Wait for operations instead of sleep --- .../gce/cmd/mode-updater/mode-updater.go | 140 +++++++++++++++--- 1 file changed, 118 insertions(+), 22 deletions(-) diff --git a/controllers/gce/cmd/mode-updater/mode-updater.go b/controllers/gce/cmd/mode-updater/mode-updater.go index 8dfeedcce..a637f7f30 100644 --- a/controllers/gce/cmd/mode-updater/mode-updater.go +++ b/controllers/gce/cmd/mode-updater/mode-updater.go @@ -9,10 +9,14 @@ import ( "sync" "time" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "golang.org/x/oauth2" "golang.org/x/oauth2/google" compute "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" ) var ( @@ -33,7 +37,10 @@ const ( balancingModeRATE = "RATE" balancingModeUTIL = "UTILIZATION" - version = 0.1 + operationPollInterval = 3 * time.Second + // Creating Route in very large clusters, may take more than half an hour. + operationPollTimeoutDuration = time.Hour + version = 0.1 ) func main() { @@ -159,15 +166,17 @@ func updateMultipleBackends() { NamedPorts: ig.NamedPorts, } fmt.Println("Creating", instanceGroupTemp, "zone:", zone) - _, err = s.InstanceGroups.Insert(projectID, zone, newIg).Do() + op, err := s.InstanceGroups.Insert(projectID, zone, newIg).Do() if err != nil { panic(err) } + + if err = waitForZoneOp(op, zone); err != nil { + panic(err) + } } } - sleep(10 * time.Second) - // Straddle both groups fmt.Println("Straddle both groups in backend services") setBackendsTo(true, balancingModeInverse(targetBalancingMode), true, balancingModeInverse(targetBalancingMode)) @@ -175,36 +184,30 @@ func updateMultipleBackends() { fmt.Println("Migrate instances to temporary group") migrateInstances(instanceGroupName, instanceGroupTemp) - time.Sleep(20 * time.Second) - - // Remove original backends + // Remove original backends to get rid of old balancing mode fmt.Println("Remove original backends") setBackendsTo(false, "", true, balancingModeInverse(targetBalancingMode)) - sleep(1 * time.Minute) - - // Straddle both groups (new balancing mode) + // Straddle both groups (creates backend services to original groups with target mode) fmt.Println("Create backends pointing to original instance groups") setBackendsTo(true, targetBalancingMode, true, balancingModeInverse(targetBalancingMode)) - sleep(20 * time.Second) - fmt.Println("Migrate instances back to original groups") migrateInstances(instanceGroupTemp, instanceGroupName) - sleep(20 * time.Second) - fmt.Println("Remove temporary backends") setBackendsTo(true, targetBalancingMode, false, "") - sleep(20 * time.Second) - fmt.Println("Delete temporary instance groups") for z := range igs { - _, err := s.InstanceGroups.Delete(projectID, z, instanceGroupTemp).Do() + op, err := s.InstanceGroups.Delete(projectID, z, instanceGroupTemp).Do() if err != nil { fmt.Println("Couldn't delete temporary instance group", instanceGroupTemp) } + + if err = waitForZoneOp(op, z); err != nil { + fmt.Println("Couldn't wait for operation: deleting temporary instance group", instanceGroupName) + } } } @@ -238,10 +241,14 @@ func setBackendsTo(orig bool, origMode string, temp bool, tempMode string) { } } bsi.Backends = union - _, err := s.BackendServices.Update(projectID, bsi.Name, bsi).Do() + op, err := s.BackendServices.Update(projectID, bsi.Name, bsi).Do() if err != nil { panic(err) } + + if err = waitForGlobalOp(op); err != nil { + panic(err) + } } } @@ -286,19 +293,26 @@ func migrateInstances(fromIG, toIG string) error { z := getResourceName(i.Zone, "zones") fmt.Printf(" - %s (%s)\n", i.Name, z) rr := &compute.InstanceGroupsRemoveInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}} - _, err := s.InstanceGroups.RemoveInstances(projectID, z, fromIG, rr).Do() + op, err := s.InstanceGroups.RemoveInstances(projectID, z, fromIG, rr).Do() if err != nil { fmt.Println("Skipping error when removing instance from group", err) } - time.Sleep(10 * time.Second) + + if err = waitForZoneOp(op, z); err != nil { + fmt.Println("Failed to wait for operation: removing instance from group", err) + } ra := &compute.InstanceGroupsAddInstancesRequest{Instances: []*compute.InstanceReference{{Instance: i.SelfLink}}} - _, err = s.InstanceGroups.AddInstances(projectID, z, toIG, ra).Do() + op, err = s.InstanceGroups.AddInstances(projectID, z, toIG, ra).Do() if err != nil { if !strings.Contains(err.Error(), "memberAlreadyExists") { // GLBC already added the instance back to the IG fmt.Println("failed to add instance to new IG", i.Name, err) } } + + if err = waitForZoneOp(op, z); err != nil { + fmt.Println("Failed to wait for operation: adding instance to group", err) + } wg.Done() }(i) time.Sleep(10 * time.Second) @@ -342,9 +356,15 @@ func updateSingleBackend(bs *compute.BackendService) { return } - if _, err := s.BackendServices.Update(projectID, bs.Name, bs).Do(); err != nil { + op, err := s.BackendServices.Update(projectID, bs.Name, bs).Do() + if err != nil { panic(err) } + + if err = waitForGlobalOp(op); err != nil { + panic(err) + } + fmt.Println("Updated single backend service to target balancing mode.") } @@ -363,3 +383,79 @@ func getIGClusterIds() []string { } return ids } + +func waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error { + if op == nil { + return fmt.Errorf("operation must not be nil") + } + + if opIsDone(op) { + return getErrorFromOp(op) + } + + opStart := time.Now() + opName := op.Name + return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) { + start := time.Now() + duration := time.Now().Sub(start) + if duration > 5*time.Second { + glog.Infof("pollOperation: throttled %v for %v", duration, opName) + } + pollOp, err := getOperation(opName) + if err != nil { + glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]", + opName, pollOp, err, getErrorFromOp(pollOp)) + } + done := opIsDone(pollOp) + if done { + duration := time.Now().Sub(opStart) + if duration > 1*time.Minute { + // Log the JSON. It's cleaner than the %v structure. + enc, err := pollOp.MarshalJSON() + if err != nil { + glog.Warningf("waitForOperation: long operation (%v): %v (failed to encode to JSON: %v)", + duration, pollOp, err) + } else { + glog.Infof("waitForOperation: long operation (%v): %v", + duration, string(enc)) + } + } + } + return done, getErrorFromOp(pollOp) + }) +} + +func opIsDone(op *compute.Operation) bool { + return op != nil && op.Status == "DONE" +} + +func getErrorFromOp(op *compute.Operation) error { + if op != nil && op.Error != nil && len(op.Error.Errors) > 0 { + err := &googleapi.Error{ + Code: int(op.HttpErrorStatusCode), + Message: op.Error.Errors[0].Message, + } + glog.Errorf("GCE operation failed: %v", err) + return err + } + + return nil +} + +func waitForGlobalOp(op *compute.Operation) error { + return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return s.GlobalOperations.Get(projectID, operationName).Do() + }) +} + +func waitForRegionOp(op *compute.Operation, region string) error { + return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return s.RegionOperations.Get(projectID, region, operationName).Do() + }) +} + +func waitForZoneOp(op *compute.Operation, zone string) error { + return waitForOp(op, func(operationName string) (*compute.Operation, error) { + return s.ZoneOperations.Get(projectID, zone, operationName).Do() + }) +}