Refactor shutdown e2e tests
This commit is contained in:
parent
a4f7006dbd
commit
a383117e1e
3 changed files with 59 additions and 32 deletions
|
@ -19,7 +19,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os/exec"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -34,6 +33,7 @@ import (
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
networking "k8s.io/api/networking/v1beta1"
|
networking "k8s.io/api/networking/v1beta1"
|
||||||
apiextcs "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
apiextcs "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
|
@ -502,14 +502,12 @@ func UpdateDeployment(kubeClientSet kubernetes.Interface, namespace string, name
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
rolloutStatsCmd := fmt.Sprintf("%v --namespace %s rollout status deployment/%s -w --timeout 5m", KubectlPath, namespace, deployment.Name)
|
|
||||||
|
|
||||||
if updateFunc != nil {
|
if updateFunc != nil {
|
||||||
if err := updateFunc(deployment); err != nil {
|
if err := updateFunc(deployment); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = exec.Command("bash", "-c", rolloutStatsCmd).Run()
|
err = waitForDeploymentRollout(kubeClientSet, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -522,7 +520,7 @@ func UpdateDeployment(kubeClientSet kubernetes.Interface, namespace string, name
|
||||||
return errors.Wrapf(err, "scaling the number of replicas to %v", replicas)
|
return errors.Wrapf(err, "scaling the number of replicas to %v", replicas)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = exec.Command("/bin/bash", "-c", rolloutStatsCmd).Run()
|
err = waitForDeploymentRollout(kubeClientSet, deployment)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -538,6 +536,29 @@ func UpdateDeployment(kubeClientSet kubernetes.Interface, namespace string, name
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitForDeploymentRollout(kubeClientSet kubernetes.Interface, resource *appsv1.Deployment) error {
|
||||||
|
return wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
|
||||||
|
d, err := kubeClientSet.AppsV1().Deployments(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{})
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.DeletionTimestamp != nil {
|
||||||
|
return false, fmt.Errorf("deployment %q is being deleted", resource.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateIngress runs the given updateFunc on the ingress
|
// UpdateIngress runs the given updateFunc on the ingress
|
||||||
func UpdateIngress(kubeClientSet kubernetes.Interface, namespace string, name string, updateFunc func(d *networking.Ingress) error) error {
|
func UpdateIngress(kubeClientSet kubernetes.Interface, namespace string, name string, updateFunc func(d *networking.Ingress) error) error {
|
||||||
ingress, err := kubeClientSet.NetworkingV1beta1().Ingresses(namespace).Get(context.TODO(), name, metav1.GetOptions{})
|
ingress, err := kubeClientSet.NetworkingV1beta1().Ingresses(namespace).Get(context.TODO(), name, metav1.GetOptions{})
|
||||||
|
|
|
@ -166,7 +166,7 @@ func waitForPodsDeleted(kubeClientSet kubernetes.Interface, timeout time.Duratio
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForEndpoints waits for a given amount of time until an endpoint contains.
|
// WaitForEndpoints waits for a given amount of time until the number of endpoints = expectedEndpoints.
|
||||||
func WaitForEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration, name, ns string, expectedEndpoints int) error {
|
func WaitForEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration, name, ns string, expectedEndpoints int) error {
|
||||||
if expectedEndpoints == 0 {
|
if expectedEndpoints == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -180,16 +180,7 @@ func WaitForEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration,
|
||||||
|
|
||||||
assert.Nil(ginkgo.GinkgoT(), err, "getting endpoints")
|
assert.Nil(ginkgo.GinkgoT(), err, "getting endpoints")
|
||||||
|
|
||||||
if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Addresses) == 0 {
|
if countReadyEndpoints(endpoint) == expectedEndpoints {
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
r := 0
|
|
||||||
for _, es := range endpoint.Subsets {
|
|
||||||
r += len(es.Addresses)
|
|
||||||
}
|
|
||||||
|
|
||||||
if r == expectedEndpoints {
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,6 +188,19 @@ func WaitForEndpoints(kubeClientSet kubernetes.Interface, timeout time.Duration,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func countReadyEndpoints(e *core.Endpoints) int {
|
||||||
|
if e == nil || e.Subsets == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
num := 0
|
||||||
|
for _, sub := range e.Subsets {
|
||||||
|
num += len(sub.Addresses)
|
||||||
|
}
|
||||||
|
|
||||||
|
return num
|
||||||
|
}
|
||||||
|
|
||||||
// podRunningReady checks whether pod p's phase is running and it has a ready
|
// podRunningReady checks whether pod p's phase is running and it has a ready
|
||||||
// condition of status true.
|
// condition of status true.
|
||||||
func podRunningReady(p *core.Pod) (bool, error) {
|
func podRunningReady(p *core.Pod) (bool, error) {
|
||||||
|
|
|
@ -62,7 +62,7 @@ var _ = framework.IngressNginxDescribe("[Shutdown] ingress controller", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
ginkgo.It("should shutdown after waiting 60 seconds for pending connections to be closed", func(done ginkgo.Done) {
|
ginkgo.It("should shutdown after waiting 60 seconds for pending connections to be closed", func(done ginkgo.Done) {
|
||||||
defer ginkgo.GinkgoRecover()
|
defer close(done)
|
||||||
|
|
||||||
err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace, "nginx-ingress-controller", 1,
|
err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace, "nginx-ingress-controller", 1,
|
||||||
func(deployment *appsv1.Deployment) error {
|
func(deployment *appsv1.Deployment) error {
|
||||||
|
@ -71,7 +71,7 @@ var _ = framework.IngressNginxDescribe("[Shutdown] ingress controller", func() {
|
||||||
_, err := f.KubeClientSet.AppsV1().Deployments(f.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
|
_, err := f.KubeClientSet.AppsV1().Deployments(f.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
assert.Nil(ginkgo.GinkgoT(), err)
|
assert.Nil(ginkgo.GinkgoT(), err, "updating ingress controller deployment")
|
||||||
|
|
||||||
annotations := map[string]string{
|
annotations := map[string]string{
|
||||||
"nginx.ingress.kubernetes.io/proxy-send-timeout": "600",
|
"nginx.ingress.kubernetes.io/proxy-send-timeout": "600",
|
||||||
|
@ -87,26 +87,28 @@ var _ = framework.IngressNginxDescribe("[Shutdown] ingress controller", func() {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
result := make(chan int)
|
result := make(chan int)
|
||||||
go func(host string, c chan int) {
|
go func() {
|
||||||
defer ginkgo.GinkgoRecover()
|
|
||||||
|
|
||||||
resp := f.HTTPTestClient().
|
resp := f.HTTPTestClient().
|
||||||
GET("/sleep/70").
|
GET("/sleep/70").
|
||||||
WithHeader("Host", host).
|
WithHeader("Host", host).
|
||||||
Expect().
|
Expect().
|
||||||
Raw()
|
Raw()
|
||||||
|
|
||||||
c <- resp.StatusCode
|
result <- resp.StatusCode
|
||||||
}(host, result)
|
}()
|
||||||
|
|
||||||
|
framework.Sleep(1)
|
||||||
|
|
||||||
f.ScaleDeploymentToZero("nginx-ingress-controller")
|
f.ScaleDeploymentToZero("nginx-ingress-controller")
|
||||||
|
|
||||||
assert.Equal(ginkgo.GinkgoT(), <-result, http.StatusOK, "expecting a valid response from HTTP request")
|
statusCode := <-result
|
||||||
|
assert.Equal(ginkgo.GinkgoT(), http.StatusOK, statusCode, "expecting a valid response from HTTP request")
|
||||||
assert.GreaterOrEqual(ginkgo.GinkgoT(), int(time.Since(startTime).Seconds()), 60, "waiting shutdown")
|
assert.GreaterOrEqual(ginkgo.GinkgoT(), int(time.Since(startTime).Seconds()), 60, "waiting shutdown")
|
||||||
close(done)
|
|
||||||
}, 100)
|
}, 100)
|
||||||
|
|
||||||
ginkgo.It("should shutdown after waiting 150 seconds for pending connections to be closed", func(done ginkgo.Done) {
|
ginkgo.It("should shutdown after waiting 150 seconds for pending connections to be closed", func(done ginkgo.Done) {
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace, "nginx-ingress-controller", 1,
|
err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace, "nginx-ingress-controller", 1,
|
||||||
func(deployment *appsv1.Deployment) error {
|
func(deployment *appsv1.Deployment) error {
|
||||||
grace := int64(3600)
|
grace := int64(3600)
|
||||||
|
@ -130,22 +132,22 @@ var _ = framework.IngressNginxDescribe("[Shutdown] ingress controller", func() {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
result := make(chan int)
|
result := make(chan int)
|
||||||
go func(host string, c chan int) {
|
go func() {
|
||||||
defer ginkgo.GinkgoRecover()
|
|
||||||
|
|
||||||
resp := f.HTTPTestClient().
|
resp := f.HTTPTestClient().
|
||||||
GET("/sleep/150").
|
GET("/sleep/150").
|
||||||
WithHeader("Host", host).
|
WithHeader("Host", host).
|
||||||
Expect().
|
Expect().
|
||||||
Raw()
|
Raw()
|
||||||
|
|
||||||
c <- resp.StatusCode
|
result <- resp.StatusCode
|
||||||
}(host, result)
|
}()
|
||||||
|
|
||||||
|
framework.Sleep(1)
|
||||||
|
|
||||||
f.ScaleDeploymentToZero("nginx-ingress-controller")
|
f.ScaleDeploymentToZero("nginx-ingress-controller")
|
||||||
|
|
||||||
assert.Equal(ginkgo.GinkgoT(), <-result, http.StatusOK, "expecting a valid response from HTTP request")
|
statusCode := <-result
|
||||||
|
assert.Equal(ginkgo.GinkgoT(), http.StatusOK, statusCode, "expecting a valid response from HTTP request")
|
||||||
assert.GreaterOrEqual(ginkgo.GinkgoT(), int(time.Since(startTime).Seconds()), 150, "waiting shutdown")
|
assert.GreaterOrEqual(ginkgo.GinkgoT(), int(time.Since(startTime).Seconds()), 150, "waiting shutdown")
|
||||||
close(done)
|
|
||||||
}, 200)
|
}, 200)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue