feat: allow configuring nginx worker reload behaviour, to prevent multiple concurrent worker reloads

Signed-off-by: Rafael da Fonseca <rafael.fonseca@wildlifestudios.com>
This commit is contained in:
Rafael da Fonseca 2024-01-19 21:22:12 +00:00
parent a596a18aeb
commit bc96769c91
4 changed files with 66 additions and 3 deletions

View file

@ -114,6 +114,7 @@ The following table shows a configuration option's name, type, and the default v
|[worker-processes](#worker-processes)|string|`<Number of CPUs>`|| |[worker-processes](#worker-processes)|string|`<Number of CPUs>`||
|[worker-cpu-affinity](#worker-cpu-affinity)|string|""|| |[worker-cpu-affinity](#worker-cpu-affinity)|string|""||
|[worker-shutdown-timeout](#worker-shutdown-timeout)|string|"240s"|| |[worker-shutdown-timeout](#worker-shutdown-timeout)|string|"240s"||
|[concurrently-reload-worker-processes](#concurrently-reload-worker-processes)|bool|"true"||
|[load-balance](#load-balance)|string|"round_robin"|| |[load-balance](#load-balance)|string|"round_robin"||
|[variables-hash-bucket-size](#variables-hash-bucket-size)|int|128|| |[variables-hash-bucket-size](#variables-hash-bucket-size)|int|128||
|[variables-hash-max-size](#variables-hash-max-size)|int|2048|| |[variables-hash-max-size](#variables-hash-max-size)|int|2048||

View file

@ -474,6 +474,13 @@ type Configuration struct {
// http://nginx.org/en/docs/ngx_core_module.html#worker_processes // http://nginx.org/en/docs/ngx_core_module.html#worker_processes
WorkerProcesses string `json:"worker-processes,omitempty"` WorkerProcesses string `json:"worker-processes,omitempty"`
// Defines whether multiple concurrent reloads of worker processes should occur.
// Set this to false to prevent more than n x 2 workers to exist at any time, to avoid potential OOM situations and high CPU load
// With this setting on false, configuration changes in the queue will be re-queued with an exponential backoff, until the number of worker process is the expected value.
// By default new worker processes are spawned every time there's a change that cannot be applied dynamically with no upper limit to the number of running workers
// http://nginx.org/en/docs/ngx_core_module.html#worker_processes
ConcurrentlyReloadWorkers bool `json:"concurrently-reload-worker-processes,omitempty"`
// Defines a timeout for a graceful shutdown of worker processes // Defines a timeout for a graceful shutdown of worker processes
// http://nginx.org/en/docs/ngx_core_module.html#worker_shutdown_timeout // http://nginx.org/en/docs/ngx_core_module.html#worker_shutdown_timeout
WorkerShutdownTimeout string `json:"worker-shutdown-timeout,omitempty"` WorkerShutdownTimeout string `json:"worker-shutdown-timeout,omitempty"`
@ -842,6 +849,7 @@ func NewDefault() Configuration {
UseGzip: false, UseGzip: false,
UseGeoIP2: false, UseGeoIP2: false,
WorkerProcesses: strconv.Itoa(runtime.NumCPU()), WorkerProcesses: strconv.Itoa(runtime.NumCPU()),
ConcurrentlyReloadWorkers: true,
WorkerShutdownTimeout: "240s", WorkerShutdownTimeout: "240s",
VariablesHashBucketSize: 256, VariablesHashBucketSize: 256,
VariablesHashMaxSize: 2048, VariablesHashMaxSize: 2048,

View file

@ -35,6 +35,7 @@ import (
"syscall" "syscall"
"text/template" "text/template"
"time" "time"
"unicode"
proxyproto "github.com/armon/go-proxyproto" proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels" "github.com/eapache/channels"
@ -90,6 +91,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro
resolver: h, resolver: h,
cfg: config, cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
workersReloading: false,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller", Component: "nginx-ingress-controller",
@ -227,6 +229,8 @@ type NGINXController struct {
syncRateLimiter flowcontrol.RateLimiter syncRateLimiter flowcontrol.RateLimiter
workersReloading bool
// stopLock is used to enforce that only a single call to Stop send at // stopLock is used to enforce that only a single call to Stop send at
// a given time. We allow stopping through an HTTP endpoint and // a given time. We allow stopping through an HTTP endpoint and
// allowing concurrent stoppers leads to stack traces. // allowing concurrent stoppers leads to stack traces.
@ -668,6 +672,12 @@ Error: %v
// //
//nolint:gocritic // the cfg shouldn't be changed, and shouldn't be mutated by other processes while being rendered. //nolint:gocritic // the cfg shouldn't be changed, and shouldn't be mutated by other processes while being rendered.
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
concurrentlyReloadWorkers := n.store.GetBackendConfiguration().ConcurrentlyReloadWorkers
if !concurrentlyReloadWorkers && n.workersReloading {
klog.InfoS("worker reload already in progress, requeuing reload")
return errors.New("worker reload already in progress, requeuing reload")
}
cfg := n.store.GetBackendConfiguration() cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver cfg.Resolver = n.resolver
@ -733,9 +743,41 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return fmt.Errorf("%v\n%v", err, string(o)) return fmt.Errorf("%v\n%v", err, string(o))
} }
// Reload status checking runs in a separate goroutine to avoid blocking the sync queue
if !concurrentlyReloadWorkers {
go n.awaitWorkersReload()
}
return nil return nil
} }
// awaitWorkersReload checks if the number of workers has returned to the expected count
func (n *NGINXController) awaitWorkersReload() {
n.workersReloading = true
defer func() { n.workersReloading = false }()
expectedWorkers := n.store.GetBackendConfiguration().WorkerProcesses
var numWorkers string
klog.V(3).Infof("waiting for worker count to be equal to %s", expectedWorkers)
for numWorkers != expectedWorkers {
time.Sleep(time.Second)
o, err := exec.Command("/bin/sh", "-c", "pgrep worker | wc -l").Output()
if err != nil {
klog.ErrorS(err, string(numWorkers))
return
}
// cleanup any non-printable chars from shell output
numWorkers = strings.Map(func(r rune) rune {
if unicode.IsPrint(r) {
return r
}
return -1
}, string(o))
klog.V(3).Infof("Currently running nginx worker processes: %s, expected %s", numWorkers, expectedWorkers)
}
}
// nginxHashBucketSize computes the correct NGINX hash_bucket_size for a hash // nginxHashBucketSize computes the correct NGINX hash_bucket_size for a hash
// with the given longest key. // with the given longest key.
func nginxHashBucketSize(longestString int) int { func nginxHashBucketSize(longestString int) int {

View file

@ -67,6 +67,7 @@ const (
luaSharedDictsKey = "lua-shared-dicts" luaSharedDictsKey = "lua-shared-dicts"
plugins = "plugins" plugins = "plugins"
debugConnections = "debug-connections" debugConnections = "debug-connections"
concurrentlyReloadWorkers = "concurrently-reload-worker-processes"
) )
var ( var (
@ -385,6 +386,17 @@ func ReadConfig(src map[string]string) config.Configuration {
delete(conf, workerProcesses) delete(conf, workerProcesses)
} }
if val, ok := conf[concurrentlyReloadWorkers]; ok {
boolVal, err := strconv.ParseBool(val)
if err != nil {
to.ConcurrentlyReloadWorkers = true
klog.Warningf("failed to parse concurrently-reload-worker-processes setting, valid values are true or false, found %s", val)
} else {
to.ConcurrentlyReloadWorkers = boolVal
}
delete(conf, concurrentlyReloadWorkers)
}
if val, ok := conf[plugins]; ok { if val, ok := conf[plugins]; ok {
to.Plugins = splitAndTrimSpace(val, ",") to.Plugins = splitAndTrimSpace(val, ",")
delete(conf, plugins) delete(conf, plugins)