Implement HealthzChecker interface. Add flag to allow profiling

This commit is contained in:
Manuel de Brito Fontes 2016-03-22 13:51:50 -03:00
parent d9934ec4db
commit f5892e06fe
10 changed files with 233 additions and 58 deletions

5
Godeps/Godeps.json generated
View file

@ -618,6 +618,11 @@
"Comment": "v1.2.1-beta.0", "Comment": "v1.2.1-beta.0",
"Rev": "fd557a2c9f47c655f9c07d9cf9dee2539e935703" "Rev": "fd557a2c9f47c655f9c07d9cf9dee2539e935703"
}, },
{
"ImportPath": "k8s.io/kubernetes/pkg/healthz",
"Comment": "v1.2.1-beta.0",
"Rev": "fd557a2c9f47c655f9c07d9cf9dee2539e935703"
},
{ {
"ImportPath": "k8s.io/kubernetes/pkg/kubectl", "ImportPath": "k8s.io/kubernetes/pkg/kubectl",
"Comment": "v1.2.1-beta.0", "Comment": "v1.2.1-beta.0",

View file

@ -0,0 +1,20 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthz implements basic http server health checking.
// Usage:
// import _ "healthz" registers a handler on the path '/healthz', that serves 200s
package healthz

View file

@ -0,0 +1,133 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthz
import (
"bytes"
"fmt"
"net/http"
"sync"
)
// HealthzChecker is a named healthz check.
type HealthzChecker interface {
Name() string
Check(req *http.Request) error
}
var defaultHealthz = sync.Once{}
// DefaultHealthz installs the default healthz check to the http.DefaultServeMux.
func DefaultHealthz(checks ...HealthzChecker) {
defaultHealthz.Do(func() {
InstallHandler(http.DefaultServeMux, checks...)
})
}
// PingHealthz returns true automatically when checked
var PingHealthz HealthzChecker = ping{}
// ping implements the simplest possible health checker.
type ping struct{}
func (ping) Name() string {
return "ping"
}
// PingHealthz is a health check that returns true.
func (ping) Check(_ *http.Request) error {
return nil
}
// NamedCheck returns a health checker for the given name and function.
func NamedCheck(name string, check func(r *http.Request) error) HealthzChecker {
return &healthzCheck{name, check}
}
// InstallHandler registers a handler for health checking on the path "/healthz" to mux.
func InstallHandler(mux mux, checks ...HealthzChecker) {
if len(checks) == 0 {
checks = []HealthzChecker{PingHealthz}
}
mux.Handle("/healthz", handleRootHealthz(checks...))
for _, check := range checks {
mux.Handle(fmt.Sprintf("/healthz/%v", check.Name()), adaptCheckToHandler(check.Check))
}
}
// mux is an interface describing the methods InstallHandler requires.
type mux interface {
Handle(pattern string, handler http.Handler)
}
// healthzCheck implements HealthzChecker on an arbitrary name and check function.
type healthzCheck struct {
name string
check func(r *http.Request) error
}
var _ HealthzChecker = &healthzCheck{}
func (c *healthzCheck) Name() string {
return c.name
}
func (c *healthzCheck) Check(r *http.Request) error {
return c.check(r)
}
// handleRootHealthz returns an http.HandlerFunc that serves the provided checks.
func handleRootHealthz(checks ...HealthzChecker) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
failed := false
var verboseOut bytes.Buffer
for _, check := range checks {
err := check.Check(r)
if err != nil {
fmt.Fprintf(&verboseOut, "[-]%v failed: %v\n", check.Name(), err)
failed = true
} else {
fmt.Fprintf(&verboseOut, "[+]%v ok\n", check.Name())
}
}
// always be verbose on failure
if failed {
http.Error(w, fmt.Sprintf("%vhealthz check failed", verboseOut.String()), http.StatusInternalServerError)
return
}
if _, found := r.URL.Query()["verbose"]; !found {
fmt.Fprint(w, "ok")
return
}
verboseOut.WriteTo(w)
fmt.Fprint(w, "healthz check passed\n")
})
}
// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := c(r)
if err != nil {
http.Error(w, fmt.Sprintf("Internal server error: %v", err), http.StatusInternalServerError)
} else {
fmt.Fprint(w, "ok")
}
})
}

View file

@ -18,7 +18,6 @@ package main
import ( import (
"fmt" "fmt"
"net/http"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -54,7 +53,7 @@ type loadBalancerController struct {
ingLister StoreToIngressLister ingLister StoreToIngressLister
svcLister cache.StoreToServiceLister svcLister cache.StoreToServiceLister
endpLister cache.StoreToEndpointsLister endpLister cache.StoreToEndpointsLister
nginx *nginx.NginxManager nginx *nginx.Manager
lbInfo *lbInfo lbInfo *lbInfo
defaultSvc string defaultSvc string
nxgConfigMap string nxgConfigMap string
@ -149,25 +148,6 @@ func (lbc *loadBalancerController) getTCPConfigMap(ns, name string) (*api.Config
return lbc.client.ConfigMaps(ns).Get(name) return lbc.client.ConfigMaps(ns).Get(name)
} }
func (lbc *loadBalancerController) registerHandlers() {
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
if err := lbc.nginx.IsHealthy(); err != nil {
w.WriteHeader(500)
w.Write([]byte("nginx error"))
return
}
w.WriteHeader(200)
w.Write([]byte("ok"))
})
http.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
lbc.Stop()
})
glog.Fatalf(fmt.Sprintf("%v", http.ListenAndServe(fmt.Sprintf(":%v", *healthzPort), nil)))
}
func (lbc *loadBalancerController) sync() { func (lbc *loadBalancerController) sync() {
ings := lbc.ingLister.Store.List() ings := lbc.ingLister.Store.List()
upstreams, servers := lbc.getUpstreamServers(ings) upstreams, servers := lbc.getUpstreamServers(ings)
@ -540,7 +520,6 @@ func (lbc *loadBalancerController) Stop() {
func (lbc *loadBalancerController) Run() { func (lbc *loadBalancerController) Run() {
glog.Infof("starting NGINX loadbalancer controller") glog.Infof("starting NGINX loadbalancer controller")
go lbc.nginx.Start() go lbc.nginx.Start()
go lbc.registerHandlers()
go lbc.ingController.Run(lbc.stopCh) go lbc.ingController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh) go lbc.endpController.Run(lbc.stopCh)

View file

@ -19,6 +19,8 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"net/http"
"net/http/pprof"
"os" "os"
"time" "time"
@ -29,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
) )
@ -64,6 +67,8 @@ var (
buildCfg = flags.Bool("dump-nginx—configuration", false, `Returns a ConfigMap with the default nginx conguration. buildCfg = flags.Bool("dump-nginx—configuration", false, `Returns a ConfigMap with the default nginx conguration.
This can be used as a guide to create a custom configuration.`) This can be used as a guide to create a custom configuration.`)
profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`)
) )
func main() { func main() {
@ -99,6 +104,8 @@ func main() {
glog.Fatalf("%v", err) glog.Fatalf("%v", err)
} }
go registerHandlers(lbc)
lbc.Run() lbc.Run()
for { for {
@ -115,3 +122,24 @@ type lbInfo struct {
PodIP string PodIP string
PodNamespace string PodNamespace string
} }
func registerHandlers(lbc *loadBalancerController) {
mux := http.NewServeMux()
healthz.InstallHandler(mux, lbc.nginx)
http.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
lbc.Stop()
})
if *profiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
server := &http.Server{
Addr: fmt.Sprintf(":%v", *healthzPort),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}

View file

@ -17,15 +17,19 @@ limitations under the License.
package nginx package nginx
import ( import (
"fmt"
"net/http"
"os" "os"
"os/exec" "os/exec"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/healthz"
) )
// Start starts a nginx (master process) and waits. If the process ends // Start starts a nginx (master process) and waits. If the process ends
// we need to kill the controller process and return the reason. // we need to kill the controller process and return the reason.
func (ngx *NginxManager) Start() { func (ngx *Manager) Start() {
glog.Info("Starting NGINX process...") glog.Info("Starting NGINX process...")
cmd := exec.Command("nginx") cmd := exec.Command("nginx")
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
@ -50,7 +54,7 @@ func (ngx *NginxManager) Start() {
// shut down, stop accepting new connections and continue to service current requests // shut down, stop accepting new connections and continue to service current requests
// until all such requests are serviced. After that, the old worker processes exit. // until all such requests are serviced. After that, the old worker processes exit.
// http://nginx.org/en/docs/beginners_guide.html#control // http://nginx.org/en/docs/beginners_guide.html#control
func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, ingressCfg IngressConfig) { func (ngx *Manager) CheckAndReload(cfg *nginxConfiguration, ingressCfg IngressConfig) {
ngx.reloadLock.Lock() ngx.reloadLock.Lock()
defer ngx.reloadLock.Unlock() defer ngx.reloadLock.Unlock()
@ -70,7 +74,7 @@ func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, ingressCfg Ingr
// shellOut executes a command and returns its combined standard output and standard // shellOut executes a command and returns its combined standard output and standard
// error in case of an error in the execution // error in case of an error in the execution
func (ngx *NginxManager) shellOut(cmd string) error { func (ngx *Manager) shellOut(cmd string) error {
out, err := exec.Command("sh", "-c", cmd).CombinedOutput() out, err := exec.Command("sh", "-c", cmd).CombinedOutput()
if err != nil { if err != nil {
glog.Errorf("failed to execute %v: %v", cmd, string(out)) glog.Errorf("failed to execute %v: %v", cmd, string(out))
@ -79,3 +83,26 @@ func (ngx *NginxManager) shellOut(cmd string) error {
return nil return nil
} }
// check to verify Manager implements HealthzChecker interface
var _ healthz.HealthzChecker = Manager{}
// Name returns the healthcheck name
func (ngx Manager) Name() string {
return "NGINX"
}
// Check returns if the nginx healthz endpoint is returning ok (status code 200)
func (ngx Manager) Check(_ *http.Request) error {
res, err := http.Get("http://127.0.0.1:8080/healthz")
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("NGINX is unhealthy")
}
return nil
}

View file

@ -203,8 +203,8 @@ type nginxConfiguration struct {
WorkerProcesses string `json:"workerProcesses,omitempty" structs:"workerProcesses,omitempty"` WorkerProcesses string `json:"workerProcesses,omitempty" structs:"workerProcesses,omitempty"`
} }
// NginxManager ... // Manager ...
type NginxManager struct { type Manager struct {
ConfigFile string ConfigFile string
defCfg *nginxConfiguration defCfg *nginxConfiguration
@ -257,11 +257,11 @@ func newDefaultNginxCfg() *nginxConfiguration {
} }
// NewManager ... // NewManager ...
func NewManager(kubeClient *client.Client) *NginxManager { func NewManager(kubeClient *client.Client) *Manager {
ngx := &NginxManager{ ngx := &Manager{
ConfigFile: "/etc/nginx/nginx.conf", ConfigFile: "/etc/nginx/nginx.conf",
defCfg: newDefaultNginxCfg(), defCfg: newDefaultNginxCfg(),
defResolver: strings.Join(getDnsServers(), " "), defResolver: strings.Join(getDNSServers(), " "),
reloadLock: &sync.Mutex{}, reloadLock: &sync.Mutex{},
} }
@ -274,7 +274,7 @@ func NewManager(kubeClient *client.Client) *NginxManager {
return ngx return ngx
} }
func (nginx *NginxManager) createCertsDir(base string) { func (nginx *Manager) createCertsDir(base string) {
if err := os.Mkdir(base, os.ModeDir); err != nil { if err := os.Mkdir(base, os.ModeDir); err != nil {
glog.Fatalf("Couldn't create directory %v: %v", base, err) glog.Fatalf("Couldn't create directory %v: %v", base, err)
} }

View file

@ -27,7 +27,7 @@ import (
) )
// AddOrUpdateCertAndKey creates a .pem file wth the cert and the key with the specified name // AddOrUpdateCertAndKey creates a .pem file wth the cert and the key with the specified name
func (nginx *NginxManager) AddOrUpdateCertAndKey(name string, cert string, key string) string { func (nginx *Manager) AddOrUpdateCertAndKey(name string, cert string, key string) string {
pemFileName := sslDirectory + "/" + name + ".pem" pemFileName := sslDirectory + "/" + name + ".pem"
pem, err := os.Create(pemFileName) pem, err := os.Create(pemFileName)
@ -47,7 +47,7 @@ func (nginx *NginxManager) AddOrUpdateCertAndKey(name string, cert string, key s
// CheckSSLCertificate checks if the certificate and key file are valid // CheckSSLCertificate checks if the certificate and key file are valid
// returning the result of the validation and the list of hostnames // returning the result of the validation and the list of hostnames
// contained in the common name/s // contained in the common name/s
func (nginx *NginxManager) CheckSSLCertificate(secretName string) ([]string, error) { func (nginx *Manager) CheckSSLCertificate(secretName string) ([]string, error) {
pemFileName := sslDirectory + "/" + secretName + ".pem" pemFileName := sslDirectory + "/" + secretName + ".pem"
pemCerts, err := ioutil.ReadFile(pemFileName) pemCerts, err := ioutil.ReadFile(pemFileName)
if err != nil { if err != nil {
@ -55,7 +55,7 @@ func (nginx *NginxManager) CheckSSLCertificate(secretName string) ([]string, err
} }
var block *pem.Block var block *pem.Block
block, pemCerts = pem.Decode(pemCerts) block, _ = pem.Decode(pemCerts)
cert, err := x509.ParseCertificate(block.Bytes) cert, err := x509.ParseCertificate(block.Bytes)
if err != nil { if err != nil {
@ -74,7 +74,7 @@ func (nginx *NginxManager) CheckSSLCertificate(secretName string) ([]string, err
// SearchDHParamFile iterates all the secrets mounted inside the /etc/nginx-ssl directory // SearchDHParamFile iterates all the secrets mounted inside the /etc/nginx-ssl directory
// in order to find a file with the name dhparam.pem. If such file exists it will // in order to find a file with the name dhparam.pem. If such file exists it will
// returns the path. If not it just returns an empty string // returns the path. If not it just returns an empty string
func (nginx *NginxManager) SearchDHParamFile(baseDir string) string { func (nginx *Manager) SearchDHParamFile(baseDir string) string {
files, _ := ioutil.ReadDir(baseDir) files, _ := ioutil.ReadDir(baseDir)
for _, file := range files { for _, file := range files {
if !file.IsDir() { if !file.IsDir() {

View file

@ -37,12 +37,12 @@ var funcMap = template.FuncMap{
}, },
} }
func (ngx *NginxManager) loadTemplate() { func (ngx *Manager) loadTemplate() {
tmpl, _ := template.New("nginx.tmpl").Funcs(funcMap).ParseFiles("./nginx.tmpl") tmpl, _ := template.New("nginx.tmpl").Funcs(funcMap).ParseFiles("./nginx.tmpl")
ngx.template = tmpl ngx.template = tmpl
} }
func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, ingressCfg IngressConfig) (bool, error) { func (ngx *Manager) writeCfg(cfg *nginxConfiguration, ingressCfg IngressConfig) (bool, error) {
fromMap := structs.Map(cfg) fromMap := structs.Map(cfg)
toMap := structs.Map(ngx.defCfg) toMap := structs.Map(ngx.defCfg)
curNginxCfg := merge(toMap, fromMap) curNginxCfg := merge(toMap, fromMap)

View file

@ -19,9 +19,7 @@ package nginx
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"os/exec" "os/exec"
"reflect" "reflect"
@ -33,23 +31,8 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
// IsHealthy checks if nginx is running // getDNSServers returns the list of nameservers located in the file /etc/resolv.conf
func (ngx *NginxManager) IsHealthy() error { func getDNSServers() []string {
res, err := http.Get("http://127.0.0.1:8080/healthz")
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("NGINX is unhealthy")
}
return nil
}
// getDnsServers returns the list of nameservers located in the file /etc/resolv.conf
func getDnsServers() []string {
file, err := ioutil.ReadFile("/etc/resolv.conf") file, err := ioutil.ReadFile("/etc/resolv.conf")
if err != nil { if err != nil {
return []string{} return []string{}
@ -78,7 +61,7 @@ func getDnsServers() []string {
} }
// ReadConfig obtains the configuration defined by the user merged with the defaults. // ReadConfig obtains the configuration defined by the user merged with the defaults.
func (ngx *NginxManager) ReadConfig(config *api.ConfigMap) (*nginxConfiguration, error) { func (ngx *Manager) ReadConfig(config *api.ConfigMap) (*nginxConfiguration, error) {
if len(config.Data) == 0 { if len(config.Data) == 0 {
return newDefaultNginxCfg(), nil return newDefaultNginxCfg(), nil
} }
@ -96,7 +79,7 @@ func (ngx *NginxManager) ReadConfig(config *api.ConfigMap) (*nginxConfiguration,
return cfg, nil return cfg, nil
} }
func (ngx *NginxManager) needsReload(data *bytes.Buffer) (bool, error) { func (ngx *Manager) needsReload(data *bytes.Buffer) (bool, error) {
filename := ngx.ConfigFile filename := ngx.ConfigFile
in, err := os.Open(filename) in, err := os.Open(filename)
if err != nil { if err != nil {