Decouple shared functions between controllers (#8829)

* Decouple shared functions between controllers

* Apply suggestions from code review

Co-authored-by: Jintao Zhang <tao12345666333@163.com>

* Fix package names and fmt

Co-authored-by: Jintao Zhang <tao12345666333@163.com>
This commit is contained in:
Ricardo Katz 2022-07-20 15:53:44 -03:00 committed by GitHub
parent 8f9df544ea
commit 4c6a7ee158
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 413 additions and 134 deletions

101
cmd/dataplane/main.go Normal file
View file

@ -0,0 +1,101 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"math/rand" // #nosec
"net/http"
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"k8s.io/klog/v2"
"k8s.io/ingress-nginx/internal/ingress/controller"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/nginx"
ingressflags "k8s.io/ingress-nginx/pkg/flags"
"k8s.io/ingress-nginx/pkg/metrics"
"k8s.io/ingress-nginx/pkg/util/file"
"k8s.io/ingress-nginx/pkg/util/process"
"k8s.io/ingress-nginx/version"
)
func main() {
klog.InitFlags(nil)
rand.Seed(time.Now().UnixNano())
fmt.Println(version.String())
var err error
showVersion, conf, err := ingressflags.ParseFlags()
if showVersion {
os.Exit(0)
}
if err != nil {
klog.Fatal(err)
}
err = file.CreateRequiredDirectories()
if err != nil {
klog.Fatal(err)
}
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
PidFn: func() (int, error) { return os.Getpid(), nil },
ReportErrors: true,
}))
mc := metric.NewDummyCollector()
if conf.EnableMetrics {
// TODO: Ingress class is not a part of dataplane anymore
mc, err = metric.NewCollector(conf.MetricsPerHost, conf.ReportStatusClasses, reg, conf.IngressClassConfiguration.Controller, *conf.MetricsBuckets)
if err != nil {
klog.Fatalf("Error creating prometheus collector: %v", err)
}
}
// Pass the ValidationWebhook status to determine if we need to start the collector
// for the admissionWebhook
// TODO: Dataplane does not contain validation webhook so the MetricCollector should not receive
// this as an argument
mc.Start(conf.ValidationWebhook)
if conf.EnableProfiling {
// TODO: Turn Profiler address configurable via flags
go metrics.RegisterProfiler("127.0.0.1", nginx.ProfilerPort)
}
ngx := controller.NewNGINXController(conf, mc)
mux := http.NewServeMux()
metrics.RegisterHealthz(nginx.HealthPath, mux)
metrics.RegisterMetrics(reg, mux)
go metrics.StartHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
go ngx.Start()
process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
os.Exit(code)
})
}

View file

@ -21,34 +21,34 @@ import (
"fmt"
"math/rand" // #nosec
"net/http"
"net/http/pprof"
"os"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/collectors"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
discovery "k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/controller"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/pkg/util/file"
"k8s.io/ingress-nginx/version"
ingressflags "k8s.io/ingress-nginx/pkg/flags"
"k8s.io/ingress-nginx/pkg/metrics"
"k8s.io/ingress-nginx/pkg/util/process"
)
func main() {
@ -58,7 +58,7 @@ func main() {
fmt.Println(version.String())
showVersion, conf, err := parseFlags()
showVersion, conf, err := ingressflags.ParseFlags()
if showVersion {
os.Exit(0)
}
@ -125,8 +125,8 @@ func main() {
reg := prometheus.NewRegistry()
reg.MustRegister(prometheus.NewGoCollector())
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
PidFn: func() (int, error) { return os.Getpid(), nil },
ReportErrors: true,
}))
@ -143,14 +143,14 @@ func main() {
mc.Start(conf.ValidationWebhook)
if conf.EnableProfiling {
go registerProfiler()
go metrics.RegisterProfiler("127.0.0.1", nginx.ProfilerPort)
}
ngx := controller.NewNGINXController(conf, mc)
mux := http.NewServeMux()
registerHealthz(nginx.HealthPath, ngx, mux)
registerMetrics(reg, mux)
metrics.RegisterHealthz(nginx.HealthPath, mux, ngx)
metrics.RegisterMetrics(reg, mux)
_, errExists := os.Stat("/chroot")
if errExists == nil {
@ -159,35 +159,14 @@ func main() {
}
go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
go metrics.StartHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
go ngx.Start()
handleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
process.HandleSigterm(ngx, conf.PostShutdownGracePeriod, func(code int) {
os.Exit(code)
})
}
type exiter func(code int)
func handleSigterm(ngx *controller.NGINXController, delay int, exit exiter) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM)
<-signalChan
klog.InfoS("Received SIGTERM, shutting down")
exitCode := 0
if err := ngx.Stop(); err != nil {
klog.Warningf("Error during shutdown: %v", err)
exitCode = 1
}
klog.Infof("Handled quit, delaying controller exit for %d seconds", delay)
time.Sleep(time.Duration(delay) * time.Second)
klog.InfoS("Exiting", "code", exitCode)
exit(exitCode)
}
// createApiserverClient creates a new Kubernetes REST client. apiserverHost is
// the URL of the API server in the format protocol://address:port/pathPrefix,
// kubeConfig is the location of a kubeconfig file. If defined, the kubeconfig
@ -293,60 +272,6 @@ func handleFatalInitError(err error) {
err)
}
func registerHealthz(healthPath string, ic *controller.NGINXController, mux *http.ServeMux) {
// expose health check endpoint (/healthz)
healthz.InstallPathHandler(mux,
healthPath,
healthz.PingHealthz,
ic,
)
}
func registerMetrics(reg *prometheus.Registry, mux *http.ServeMux) {
mux.Handle(
"/metrics",
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
),
)
}
func registerProfiler() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/heap", pprof.Index)
mux.HandleFunc("/debug/pprof/mutex", pprof.Index)
mux.HandleFunc("/debug/pprof/goroutine", pprof.Index)
mux.HandleFunc("/debug/pprof/threadcreate", pprof.Index)
mux.HandleFunc("/debug/pprof/block", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
server := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%v", nginx.ProfilerPort),
//G112 (CWE-400): Potential Slowloris Attack
ReadHeaderTimeout: 10 * time.Second,
Handler: mux,
}
klog.Fatal(server.ListenAndServe())
}
func startHTTPServer(host string, port int, mux *http.ServeMux) {
server := &http.Server{
Addr: fmt.Sprintf("%s:%v", host, port),
Handler: mux,
ReadTimeout: 10 * time.Second,
ReadHeaderTimeout: 10 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 120 * time.Second,
}
klog.Fatal(server.ListenAndServe())
}
func checkService(key string, kubeClient *kubernetes.Clientset) error {
ns, name, err := k8s.ParseNameNS(key)
if err != nil {

View file

@ -33,6 +33,8 @@ import (
"k8s.io/ingress-nginx/internal/ingress/controller"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/nginx"
ingressflags "k8s.io/ingress-nginx/pkg/flags"
"k8s.io/ingress-nginx/pkg/util/process"
)
func TestCreateApiserverClient(t *testing.T) {
@ -83,7 +85,7 @@ func TestHandleSigterm(t *testing.T) {
t.Fatalf("error creating pod %v: %v", pod, err)
}
resetForTesting(func() { t.Fatal("bad parse") })
ingressflags.ResetForTesting(func() { t.Fatal("bad parse") })
os.Setenv("POD_NAME", podName)
os.Setenv("POD_NAMESPACE", namespace)
@ -97,7 +99,7 @@ func TestHandleSigterm(t *testing.T) {
}()
os.Args = []string{"cmd", "--default-backend-service", "ingress-nginx/default-backend-http", "--http-port", "0", "--https-port", "0"}
_, conf, err := parseFlags()
_, conf, err := ingressflags.ParseFlags()
if err != nil {
t.Errorf("Unexpected error creating NGINX controller: %v", err)
}
@ -105,7 +107,7 @@ func TestHandleSigterm(t *testing.T) {
ngx := controller.NewNGINXController(conf, nil)
go handleSigterm(ngx, 10, func(code int) {
go process.HandleSigterm(ngx, 10, func(code int) {
if code != 1 {
t.Errorf("Expected exit code 1 but %d received", code)
}

View file

@ -26,10 +26,10 @@ import (
networking "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
ing_errors "k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/pkg/util/file"
)
var (

View file

@ -26,9 +26,9 @@ import (
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/ingress-nginx/internal/file"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/pkg/util/file"
)
func TestNginxCheck(t *testing.T) {

View file

@ -26,7 +26,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/defaults"
"k8s.io/ingress-nginx/internal/runtime"
"k8s.io/ingress-nginx/pkg/util/runtime"
)
var (

View file

@ -39,8 +39,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/canary"
"k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist"
@ -56,6 +56,8 @@ import (
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/pkg/util/file"
)
type fakeIngressStore struct {

View file

@ -44,10 +44,8 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
adm_controller "k8s.io/ingress-nginx/internal/admission/controller"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/ingress/controller/process"
@ -61,6 +59,9 @@ import (
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/internal/task"
"k8s.io/ingress-nginx/internal/watch"
"k8s.io/ingress-nginx/pkg/util/file"
klog "k8s.io/klog/v2"
)
const (

View file

@ -20,15 +20,17 @@ import (
"fmt"
"strings"
"k8s.io/klog/v2"
apiv1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
klog "k8s.io/klog/v2"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/pkg/util/file"
)
// syncSecret synchronizes the content of a TLS Secret (certificate(s), secret

View file

@ -43,9 +43,11 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-nginx/internal/ingress/inspector"
"k8s.io/klog/v2"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/ingress-nginx/pkg/util/file"
klog "k8s.io/klog/v2"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
@ -56,7 +58,6 @@ import (
"k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/nginx"
)
// IngressFilterFunc decides if an Ingress should be omitted or not

View file

@ -34,7 +34,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/runtime"
"k8s.io/ingress-nginx/pkg/util/runtime"
)
const (

View file

@ -21,7 +21,7 @@ import (
"os"
"testing"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/pkg/util/file"
)
func TestGetDNSServers(t *testing.T) {

View file

@ -39,11 +39,14 @@ import (
"github.com/zakjan/cert-chain-resolver/certUtil"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/watch"
"k8s.io/klog/v2"
"k8s.io/ingress-nginx/pkg/util/file"
klog "k8s.io/klog/v2"
)
// FakeSSLCertificateUID defines the default UID to use for the fake SSL

View file

@ -39,7 +39,7 @@ import (
"time"
certutil "k8s.io/client-go/util/cert"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/pkg/util/file"
)
// generateRSACerts generates a self signed certificate using a self generated ca

View file

@ -23,7 +23,7 @@ import (
"testing"
"time"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/pkg/util/file"
)
func prepareTimeout() chan bool {

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package main
package flags
import (
"flag"
@ -37,7 +37,11 @@ import (
klog "k8s.io/klog/v2"
)
func parseFlags() (bool, *controller.Configuration, error) {
// TODO: We should split the flags functions between common for all programs
// and specific for each component (like webhook, controller and configurer)
// ParseFlags generates a configuration for Ingress Controller based on the flags
// provided by users
func ParseFlags() (bool, *controller.Configuration, error) {
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
@ -379,3 +383,12 @@ https://blog.maxmind.com/2019/12/18/significant-changes-to-accessing-and-using-g
return false, config, err
}
// ResetForTesting clears all flag state and sets the usage function as directed.
// After calling resetForTesting, parse errors in flag handling will not
// exit the program.
// Extracted from https://github.com/golang/go/blob/master/src/flag/export_test.go
func ResetForTesting(usage func()) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
flag.Usage = usage
}

View file

@ -14,32 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package main
package flags
import (
"flag"
"os"
"testing"
)
// resetForTesting clears all flag state and sets the usage function as directed.
// After calling resetForTesting, parse errors in flag handling will not
// exit the program.
// Extracted from https://github.com/golang/go/blob/master/src/flag/export_test.go
func resetForTesting(usage func()) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
flag.Usage = usage
}
func TestNoMandatoryFlag(t *testing.T) {
_, _, err := parseFlags()
_, _, err := ParseFlags()
if err != nil {
t.Fatalf("Expected no error but got: %s", err)
}
}
func TestDefaults(t *testing.T) {
resetForTesting(func() { t.Fatal("Parsing failed") })
ResetForTesting(func() { t.Fatal("Parsing failed") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
@ -49,7 +39,7 @@ func TestDefaults(t *testing.T) {
"--https-port", "0",
}
showVersion, conf, err := parseFlags()
showVersion, conf, err := ParseFlags()
if err != nil {
t.Fatalf("Unexpected error parsing default flags: %v", err)
}
@ -68,52 +58,52 @@ func TestSetupSSLProxy(t *testing.T) {
}
func TestFlagConflict(t *testing.T) {
resetForTesting(func() { t.Fatal("Parsing failed") })
ResetForTesting(func() { t.Fatal("Parsing failed") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"cmd", "--publish-service", "namespace/test", "--http-port", "0", "--https-port", "0", "--publish-status-address", "1.1.1.1"}
_, _, err := parseFlags()
_, _, err := ParseFlags()
if err == nil {
t.Fatalf("Expected an error parsing flags but none returned")
}
}
func TestMaxmindEdition(t *testing.T) {
resetForTesting(func() { t.Fatal("Parsing failed") })
ResetForTesting(func() { t.Fatal("Parsing failed") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"cmd", "--publish-service", "namespace/test", "--http-port", "0", "--https-port", "0", "--maxmind-license-key", "0000000", "--maxmind-edition-ids", "GeoLite2-City, TestCheck"}
_, _, err := parseFlags()
_, _, err := ParseFlags()
if err == nil {
t.Fatalf("Expected an error parsing flags but none returned")
}
}
func TestMaxmindMirror(t *testing.T) {
resetForTesting(func() { t.Fatal("Parsing failed") })
ResetForTesting(func() { t.Fatal("Parsing failed") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"cmd", "--publish-service", "namespace/test", "--http-port", "0", "--https-port", "0", "--maxmind-mirror", "http://geoip.local", "--maxmind-license-key", "0000000", "--maxmind-edition-ids", "GeoLite2-City, TestCheck"}
_, _, err := parseFlags()
_, _, err := ParseFlags()
if err == nil {
t.Fatalf("Expected an error parsing flags but none returned")
}
}
func TestMaxmindRetryDownload(t *testing.T) {
resetForTesting(func() { t.Fatal("Parsing failed") })
ResetForTesting(func() { t.Fatal("Parsing failed") })
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
os.Args = []string{"cmd", "--publish-service", "namespace/test", "--http-port", "0", "--https-port", "0", "--maxmind-mirror", "http://127.0.0.1", "--maxmind-license-key", "0000000", "--maxmind-edition-ids", "GeoLite2-City", "--maxmind-retries-timeout", "1s", "--maxmind-retries-count", "3"}
_, _, err := parseFlags()
_, _, err := ParseFlags()
if err == nil {
t.Fatalf("Expected an error parsing flags but none returned")
}

87
pkg/metrics/handler.go Normal file
View file

@ -0,0 +1,87 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"net/http"
"net/http/pprof"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apiserver/pkg/server/healthz"
klog "k8s.io/klog/v2"
)
func RegisterHealthz(healthPath string, mux *http.ServeMux, checks ...healthz.HealthChecker) {
healthCheck := []healthz.HealthChecker{healthz.PingHealthz}
if len(checks) > 0 {
healthCheck = append(healthCheck, checks...)
}
// expose health check endpoint (/healthz)
healthz.InstallPathHandler(mux,
healthPath,
healthCheck...,
)
}
func RegisterMetrics(reg *prometheus.Registry, mux *http.ServeMux) {
mux.Handle(
"/metrics",
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
),
)
}
func RegisterProfiler(host string, port int) {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/heap", pprof.Index)
mux.HandleFunc("/debug/pprof/mutex", pprof.Index)
mux.HandleFunc("/debug/pprof/goroutine", pprof.Index)
mux.HandleFunc("/debug/pprof/threadcreate", pprof.Index)
mux.HandleFunc("/debug/pprof/block", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
server := &http.Server{
Addr: fmt.Sprintf("%s:%d", host, port),
//G112 (CWE-400): Potential Slowloris Attack
ReadHeaderTimeout: 10 * time.Second,
Handler: mux,
}
klog.Fatal(server.ListenAndServe())
}
func StartHTTPServer(host string, port int, mux *http.ServeMux) {
server := &http.Server{
Addr: fmt.Sprintf("%s:%v", host, port),
Handler: mux,
ReadTimeout: 10 * time.Second,
ReadHeaderTimeout: 10 * time.Second,
WriteTimeout: 300 * time.Second,
IdleTimeout: 120 * time.Second,
}
klog.Fatal(server.ListenAndServe())
}

View file

@ -0,0 +1,24 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
// ProcessController defines a common interface for a process to be controlled,
// like the configurer, the webhook or the proper ingress controller
type ProcessController interface {
Start()
Stop() error
}

View file

@ -0,0 +1,49 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"os"
"os/signal"
"syscall"
"time"
klog "k8s.io/klog/v2"
)
type exiter func(code int)
// HandleSigterm receives a ProcessController interface and deals with
// the graceful shutdown
func HandleSigterm(ngx ProcessController, delay int, exit exiter) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM)
<-signalChan
klog.InfoS("Received SIGTERM, shutting down")
exitCode := 0
if err := ngx.Stop(); err != nil {
klog.Warningf("Error during shutdown: %v", err)
exitCode = 1
}
klog.Infof("Handled quit, delaying controller exit for %d seconds", delay)
time.Sleep(time.Duration(delay) * time.Second)
klog.InfoS("Exiting", "code", exitCode)
exit(exitCode)
}

View file

@ -0,0 +1,79 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"fmt"
"syscall"
"testing"
"time"
)
type FakeProcess struct {
shouldError bool
exitCode int
}
func (f *FakeProcess) Start() {
}
func (f *FakeProcess) Stop() error {
if f.shouldError {
return fmt.Errorf("error")
}
return nil
}
func (f *FakeProcess) exiterFunc(code int) {
f.exitCode = code
}
func sendDelayedSignal(delay time.Duration) {
time.Sleep(delay * time.Second)
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}
func TestHandleSigterm(t *testing.T) {
tests := []struct {
name string
shouldError bool
delay int
}{
{
name: "should exit without error",
shouldError: false,
},
{
name: "should exit with error",
shouldError: true,
delay: 2,
},
}
for _, tt := range tests {
process := &FakeProcess{shouldError: tt.shouldError}
t.Run(tt.name, func(t *testing.T) {
go sendDelayedSignal(2) // Send a signal after 2 seconds
HandleSigterm(process, tt.delay, process.exiterFunc)
})
if tt.shouldError && process.exitCode != 1 {
t.Errorf("wrong return, should be 1 and returned %d", process.exitCode)
}
if !tt.shouldError && process.exitCode != 0 {
t.Errorf("wrong return, should be 0 and returned %d", process.exitCode)
}
}
}