Refactor health checks and wait until NGINX process ends

This commit is contained in:
Manuel Alejandro de Brito Fontes 2019-09-01 14:21:24 -04:00
parent c85450c1e7
commit c2935ca35c
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
15 changed files with 126 additions and 69 deletions

View file

@ -108,6 +108,7 @@ container: clean-container .container-$(ARCH)
mkdir -p $(TEMP_DIR)/rootfs
cp bin/$(ARCH)/nginx-ingress-controller $(TEMP_DIR)/rootfs/nginx-ingress-controller
cp bin/$(ARCH)/dbg $(TEMP_DIR)/rootfs/dbg
cp bin/$(ARCH)/wait-shutdown $(TEMP_DIR)/rootfs/wait-shutdown
cp -RP ./* $(TEMP_DIR)
$(SED_I) "s|BASEIMAGE|$(BASEIMAGE)|g" $(DOCKERFILE)

View file

@ -60,3 +60,12 @@ go build \
-X ${PKG}/version.COMMIT=${GIT_COMMIT} \
-X ${PKG}/version.REPO=${REPO_INFO}" \
-o "bin/${ARCH}/dbg" "${PKG}/cmd/dbg"
go build \
"${GOBUILD_FLAGS}" \
-ldflags "-s -w \
-X ${PKG}/version.RELEASE=${TAG} \
-X ${PKG}/version.COMMIT=${GIT_COMMIT} \
-X ${PKG}/version.REPO=${REPO_INFO}" \
-o "bin/${ARCH}/wait-shutdown" "${PKG}/cmd/waitshutdown"

View file

@ -131,7 +131,7 @@ func main() {
mux := http.NewServeMux()
if conf.EnableProfiling {
registerProfiler(mux)
go registerProfiler()
}
registerHealthz(ngx, mux)
@ -265,7 +265,9 @@ func registerMetrics(reg *prometheus.Registry, mux *http.ServeMux) {
}
func registerProfiler(mux *http.ServeMux) {
func registerProfiler() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/heap", pprof.Index)
mux.HandleFunc("/debug/pprof/mutex", pprof.Index)
@ -276,6 +278,12 @@ func registerProfiler(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
server := &http.Server{
Addr: fmt.Sprintf(":10255"),
Handler: mux,
}
klog.Fatal(server.ListenAndServe())
}
func startHTTPServer(port int, mux *http.ServeMux) {

43
cmd/waitshutdown/main.go Normal file
View file

@ -0,0 +1,43 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os"
"os/exec"
"time"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/klog"
)
func main() {
err := exec.Command("bash", "-c", "pkill -SIGTERM -f nginx-ingress-controller").Run()
if err != nil {
klog.Errorf("unexpected error terminating ingress controller: %v", err)
os.Exit(1)
}
// wait for the NGINX process to terminate
timer := time.NewTicker(time.Second * 1)
for range timer.C {
if !nginx.IsRunning() {
timer.Stop()
break
}
}
}

View file

@ -25,7 +25,6 @@ import (
"github.com/ncabatoff/process-exporter/proc"
"github.com/pkg/errors"
"k8s.io/klog"
"k8s.io/ingress-nginx/internal/nginx"
)
@ -37,41 +36,48 @@ func (n NGINXController) Name() string {
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (n *NGINXController) Check(_ *http.Request) error {
statusCode, _, err := nginx.NewGetStatusRequest(nginx.HealthPath)
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("ingress controller is not healthy")
}
statusCode, _, err = nginx.NewGetStatusRequest("/is-dynamic-lb-initialized")
if err != nil {
klog.Errorf("healthcheck error: %v", err)
return err
}
if statusCode != 200 {
klog.Errorf("healthcheck error: %v", statusCode)
return fmt.Errorf("dynamic load balancer not started")
if n.isShuttingDown {
return fmt.Errorf("the ingress controller is shutting down")
}
// check the nginx master process is running
fs, err := proc.NewFS("/proc", false)
if err != nil {
return errors.Wrap(err, "unexpected error reading /proc directory")
return errors.Wrap(err, "reading /proc directory")
}
f, err := ioutil.ReadFile(nginx.PID)
if err != nil {
return errors.Wrapf(err, "unexpected error reading %v", nginx.PID)
return errors.Wrapf(err, "reading %v", nginx.PID)
}
pid, err := strconv.Atoi(strings.TrimRight(string(f), "\r\n"))
if err != nil {
return errors.Wrapf(err, "unexpected error reading the nginx PID from %v", nginx.PID)
return errors.Wrapf(err, "reading NGINX PID from file %v", nginx.PID)
}
_, err = fs.NewProc(pid)
return err
if err != nil {
return errors.Wrapf(err, "checking for NGINX process with PID %v", pid)
}
statusCode, _, err := nginx.NewGetStatusRequest(nginx.HealthPath)
if err != nil {
return errors.Wrapf(err, "checking if NGINX is running")
}
if statusCode != 200 {
return fmt.Errorf("ingress controller is not healthy (%v)", statusCode)
}
statusCode, _, err = nginx.NewGetStatusRequest("/is-dynamic-lb-initialized")
if err != nil {
return errors.Wrapf(err, "checking if the dynamic load balancer started")
}
if statusCode != 200 {
return fmt.Errorf("dynamic load balancer not started")
}
return nil
}

View file

@ -35,12 +35,11 @@ import (
func TestNginxCheck(t *testing.T) {
mux := http.NewServeMux()
listener, err := net.Listen("unix", nginx.StatusSocket)
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", nginx.StatusPort))
if err != nil {
t.Fatalf("crating unix listener: %s", err)
t.Fatalf("crating tcp listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
server := &httptest.Server{
Listener: listener,

View file

@ -796,8 +796,8 @@ type TemplateConfig struct {
EnableMetrics bool
PID string
StatusSocket string
StatusPath string
StatusPort int
StreamSocket string
}

View file

@ -37,6 +37,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/proxy"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/nginx"
"k8s.io/klog"
)
@ -268,6 +269,8 @@ func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Pr
n.cfg.ListenPorts.SSLProxy,
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
10255, // profiling port
nginx.StatusPort,
}
reserverdPorts := sets.NewInt(rp...)
// svcRef format: <(str)namespace>/<(str)service>:<(intstr)port>[:<("PROXY")decode>:<("PROXY")encode>]

View file

@ -605,8 +605,8 @@ func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressC
HealthzURI: nginx.HealthPath,
PID: nginx.PID,
StatusSocket: nginx.StatusSocket,
StatusPath: nginx.StatusPath,
StatusPort: nginx.StatusPort,
StreamSocket: nginx.StreamSocket,
}

View file

@ -17,6 +17,7 @@ limitations under the License.
package controller
import (
"fmt"
"io"
"io/ioutil"
"net"
@ -148,16 +149,15 @@ func TestIsDynamicConfigurationEnough(t *testing.T) {
}
func TestConfigureDynamically(t *testing.T) {
listener, err := net.Listen("unix", nginx.StatusSocket)
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", nginx.StatusPort))
if err != nil {
t.Errorf("crating unix listener: %s", err)
t.Fatalf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {
t.Errorf("crating unix listener: %s", err)
t.Fatalf("crating unix listener: %s", err)
}
defer streamListener.Close()
defer os.Remove(nginx.StreamSocket)
@ -319,12 +319,11 @@ func TestConfigureDynamically(t *testing.T) {
}
func TestConfigureCertificates(t *testing.T) {
listener, err := net.Listen("unix", nginx.StatusSocket)
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", nginx.StatusPort))
if err != nil {
t.Fatalf("crating unix listener: %s", err)
}
defer listener.Close()
defer os.Remove(nginx.StatusSocket)
streamListener, err := net.Listen("unix", nginx.StreamSocket)
if err != nil {

View file

@ -21,7 +21,6 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
@ -97,7 +96,7 @@ func TestStatusCollector(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
listener, err := net.Listen("unix", nginx.StatusSocket)
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", nginx.StatusPort))
if err != nil {
t.Fatalf("crating unix listener: %s", err)
}
@ -145,7 +144,6 @@ func TestStatusCollector(t *testing.T) {
cm.Stop()
listener.Close()
os.Remove(nginx.StatusSocket)
})
}
}

View file

@ -28,7 +28,6 @@ import (
"time"
ps "github.com/mitchellh/go-ps"
"github.com/tv42/httpunix"
"k8s.io/klog"
)
@ -38,8 +37,8 @@ var TemplatePath = "/etc/nginx/template/nginx.tmpl"
// PID defines the location of the pid file used by NGINX
var PID = "/tmp/nginx.pid"
// StatusSocket defines the location of the unix socket used by NGINX for the status server
var StatusSocket = "/tmp/nginx-status-server.sock"
// StatusPort port used by NGINX for the status server
var StatusPort = 10256
// HealthPath defines the path used to define the health check location in NGINX
var HealthPath = "/healthz"
@ -56,17 +55,12 @@ var StreamSocket = "/tmp/ingress-stream.sock"
var statusLocation = "nginx-status"
var httpClient *http.Client
func init() {
httpClient = buildUnixSocketClient(HealthCheckTimeout)
}
// NewGetStatusRequest creates a new GET request to the internal NGINX status server
func NewGetStatusRequest(path string) (int, []byte, error) {
url := fmt.Sprintf("%v://%v%v", httpunix.Scheme, statusLocation, path)
url := fmt.Sprintf("http://127.0.0.1:%v%v", StatusPort, path)
res, err := httpClient.Get(url)
client := http.Client{}
res, err := client.Get(url)
if err != nil {
return 0, nil, err
}
@ -82,14 +76,15 @@ func NewGetStatusRequest(path string) (int, []byte, error) {
// NewPostStatusRequest creates a new POST request to the internal NGINX status server
func NewPostStatusRequest(path, contentType string, data interface{}) (int, []byte, error) {
url := fmt.Sprintf("%v://%v%v", httpunix.Scheme, statusLocation, path)
url := fmt.Sprintf("http://127.0.0.1:%v%v", StatusPort, path)
buf, err := json.Marshal(data)
if err != nil {
return 0, nil, err
}
res, err := httpClient.Post(url, contentType, bytes.NewReader(buf))
client := http.Client{}
res, err := client.Post(url, contentType, bytes.NewReader(buf))
if err != nil {
return 0, nil, err
}
@ -142,19 +137,6 @@ func readFileToString(path string) (string, error) {
return string(contents), nil
}
func buildUnixSocketClient(timeout time.Duration) *http.Client {
u := &httpunix.Transport{
DialTimeout: 1 * time.Second,
RequestTimeout: timeout,
ResponseHeaderTimeout: timeout,
}
u.RegisterLocation(statusLocation, StatusSocket)
return &http.Client{
Transport: u,
}
}
// Version return details about NGINX
func Version() string {
flag := "-v"

View file

@ -558,7 +558,7 @@ http {
# default server, used for NGINX healthcheck and access to nginx stats
server {
listen unix:{{ .StatusSocket }};
listen 127.0.0.1:{{ .StatusPort }};
set $proxy_upstream_name "internal";
keepalive_timeout 0;

View file

@ -22,5 +22,14 @@ spec:
- name: nginx-ingress-controller
livenessProbe:
timeoutSeconds: 1
initialDelaySeconds: 1
periodSeconds: 2
readinessProbe:
timeoutSeconds: 1
initialDelaySeconds: 1
periodSeconds: 2
lifecycle:
preStop:
exec:
command:
- /wait-shutdown

View file

@ -199,7 +199,7 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
It("sets controllerPodsCount in Lua general configuration", func() {
// https://github.com/curl/curl/issues/936
curlCmd := fmt.Sprintf("curl --fail --silent --unix-socket %v http://localhost/configuration/general", nginx.StatusSocket)
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/general", nginx.StatusPort)
output, err := f.ExecIngressPod(curlCmd)
Expect(err).ToNot(HaveOccurred())