Improve shutdown logic: wait until no requests are made
Pods in Kubernetes endpoints are expected to shut down 'gracefully' after receiving SIGTERM: they should keep accepting new connections for a while. This is because Kubernetes updates Service endpoints and sends SIGTERM to pods *in parallel*. See https://github.com/kubernetes/kubernetes/issues/106476 for more detail.
This commit is contained in:
parent
e76ea4766c
commit
25f5d09ec0
3 changed files with 151 additions and 1 deletions
|
@ -23,6 +23,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
"k8s.io/ingress-nginx/internal/ingress/metric/collectors"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -377,6 +378,63 @@ func (n *NGINXController) Start() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stopWait waits until no more connections are made to nginx.
|
||||||
|
//
|
||||||
|
// This waits until all of following conditions are met:
|
||||||
|
// - No more requests are made to nginx for the last 5 seconds.
|
||||||
|
// - 'shutdown-grace-period' seconds have passed after calling this method.
|
||||||
|
//
|
||||||
|
// Pods in Kubernetes endpoints are expected to shut-down 'gracefully' after receiving SIGTERM -
|
||||||
|
// we should keep accepting new connections for a while. This is because Kubernetes updates Service endpoints
|
||||||
|
// and sends SIGTERM to pods *in parallel*.
|
||||||
|
// If we don't see new requests for 5 seconds, then we assume that this pod was removed from the upstream endpoints
|
||||||
|
// (AWS ALB endpoints for example), and proceed with shutdown.
|
||||||
|
//
|
||||||
|
// See https://github.com/kubernetes/kubernetes/issues/106476 for more detail on this issue.
|
||||||
|
func (n *NGINXController) stopWait() {
|
||||||
|
const checkFrequency = time.Second
|
||||||
|
const waitUntilNoConnectionsFor = int((5 * time.Second) / checkFrequency)
|
||||||
|
waitAtLeastUntil := time.Now().Add(time.Duration(n.cfg.ShutdownGracePeriod) * time.Second)
|
||||||
|
|
||||||
|
var scraper collectors.NginxStatusScraper
|
||||||
|
lastRequests := 0
|
||||||
|
noChangeTimes := 0
|
||||||
|
|
||||||
|
for ; ; time.Sleep(checkFrequency) {
|
||||||
|
st, err := scraper.Scrape()
|
||||||
|
if err != nil {
|
||||||
|
klog.Warningf("failed to scrape nginx status: %v", err)
|
||||||
|
noChangeTimes = 0
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
diff := st.Requests - lastRequests
|
||||||
|
// We assume that there were no client requests to nginx, if and only if
|
||||||
|
// there were 0 to 2 increase in handled requests from the last scrape.
|
||||||
|
// 1 is to account for our own stub_status request from this method,
|
||||||
|
// and the other 1 is to account for the readinessProbe.
|
||||||
|
// Note that readinessProbe DO happen even when the pod is terminating.
|
||||||
|
// See: https://github.com/kubernetes/kubernetes/issues/122824#issuecomment-1899224434
|
||||||
|
noChange := 0 <= diff && diff <= 2
|
||||||
|
if noChange {
|
||||||
|
noChangeTimes++
|
||||||
|
if noChangeTimes >= waitUntilNoConnectionsFor {
|
||||||
|
// Safe to proceed shutdown, we are seeing no more client request.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
noChangeTimes = 0
|
||||||
|
}
|
||||||
|
lastRequests = st.Requests
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait at least for the configured duration, if any
|
||||||
|
delay := waitAtLeastUntil.Sub(time.Now())
|
||||||
|
if delay > 0 {
|
||||||
|
time.Sleep(delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Stop gracefully stops the NGINX master process.
|
// Stop gracefully stops the NGINX master process.
|
||||||
func (n *NGINXController) Stop() error {
|
func (n *NGINXController) Stop() error {
|
||||||
n.isShuttingDown = true
|
n.isShuttingDown = true
|
||||||
|
@ -388,7 +446,8 @@ func (n *NGINXController) Stop() error {
|
||||||
return fmt.Errorf("shutdown already in progress")
|
return fmt.Errorf("shutdown already in progress")
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Duration(n.cfg.ShutdownGracePeriod) * time.Second)
|
klog.InfoS("Graceful shutdown - waiting until no more requests are made")
|
||||||
|
n.stopWait()
|
||||||
|
|
||||||
klog.InfoS("Shutting down controller queues")
|
klog.InfoS("Shutting down controller queues")
|
||||||
close(n.stopCh)
|
close(n.stopCh)
|
||||||
|
|
|
@ -624,6 +624,10 @@ func (f *Framework) ScaleDeploymentToZero(name string) {
|
||||||
assert.Nil(ginkgo.GinkgoT(), err, "getting deployment")
|
assert.Nil(ginkgo.GinkgoT(), err, "getting deployment")
|
||||||
assert.NotNil(ginkgo.GinkgoT(), d, "expected a deployment but none returned")
|
assert.NotNil(ginkgo.GinkgoT(), d, "expected a deployment but none returned")
|
||||||
|
|
||||||
|
err = waitForPodsDeleted(f.KubeClientSet, 2*time.Minute, f.Namespace, &metav1.ListOptions{
|
||||||
|
LabelSelector: labelSelectorToString(d.Spec.Selector.MatchLabels),
|
||||||
|
})
|
||||||
|
assert.Nil(ginkgo.GinkgoT(), err, "waiting for no pods")
|
||||||
err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, name, f.Namespace, 0)
|
err = WaitForEndpoints(f.KubeClientSet, DefaultTimeout, name, f.Namespace, 0)
|
||||||
assert.Nil(ginkgo.GinkgoT(), err, "waiting for no endpoints")
|
assert.Nil(ginkgo.GinkgoT(), err, "waiting for no endpoints")
|
||||||
}
|
}
|
||||||
|
|
87
test/e2e/gracefulshutdown/k8s_async_shutdown.go
Normal file
87
test/e2e/gracefulshutdown/k8s_async_shutdown.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
Copyright 2020 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package gracefulshutdown
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/onsi/ginkgo/v2"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/ingress-nginx/test/e2e/framework"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = framework.IngressNginxDescribe("[Shutdown] Asynchronous shutdown", func() {
|
||||||
|
f := framework.NewDefaultFramework("k8s-async-shutdown", func(f *framework.Framework) {
|
||||||
|
f.Namespace = "k8s-async-shutdown"
|
||||||
|
})
|
||||||
|
|
||||||
|
host := "async-shutdown"
|
||||||
|
|
||||||
|
ginkgo.BeforeEach(func() {
|
||||||
|
f.NewSlowEchoDeployment()
|
||||||
|
})
|
||||||
|
|
||||||
|
ginkgo.It("should not shut down while still receiving traffic", func() {
|
||||||
|
defer ginkgo.GinkgoRecover()
|
||||||
|
|
||||||
|
err := f.UpdateIngressControllerDeployment(func(deployment *appsv1.Deployment) error {
|
||||||
|
// Note: e2e's default terminationGracePeriodSeconds is 1 for some reason, so extend it
|
||||||
|
grace := int64(300)
|
||||||
|
deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = &grace
|
||||||
|
_, err := f.KubeClientSet.AppsV1().Deployments(f.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
assert.Nil(ginkgo.GinkgoT(), err, "updating ingress controller deployment")
|
||||||
|
|
||||||
|
f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.Namespace, framework.SlowEchoService, 80, nil))
|
||||||
|
|
||||||
|
f.WaitForNginxServer(host,
|
||||||
|
func(server string) bool {
|
||||||
|
return strings.Contains(server, "server_name "+host)
|
||||||
|
})
|
||||||
|
|
||||||
|
// We need to get pod IP first because after the pod becomes terminating,
|
||||||
|
// it is removed from Service endpoints, and becomes unable to be discovered by "f.HTTPTestClient()".
|
||||||
|
ip := f.GetNginxPodIP()
|
||||||
|
|
||||||
|
// Assume that the upstream takes 30 seconds to update its endpoints,
|
||||||
|
// therefore we are still receiving traffic while shutting down
|
||||||
|
go func() {
|
||||||
|
defer ginkgo.GinkgoRecover()
|
||||||
|
for i := 0; i < 120; i++ {
|
||||||
|
f.HTTPDumbTestClient().
|
||||||
|
GET("/").
|
||||||
|
WithURL(fmt.Sprintf("http://%s/", ip)).
|
||||||
|
WithHeader("Host", host).
|
||||||
|
Expect().
|
||||||
|
Status(http.StatusOK)
|
||||||
|
|
||||||
|
framework.Sleep(250 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
f.ScaleDeploymentToZero("nginx-ingress-controller")
|
||||||
|
assert.GreaterOrEqualf(ginkgo.GinkgoT(), int(time.Since(start).Seconds()), 35,
|
||||||
|
"should take more than 30 + 5 seconds for graceful shutdown")
|
||||||
|
})
|
||||||
|
})
|
Loading…
Reference in a new issue