diff --git a/controllers/nginx/pkg/cmd/controller/metrics.go b/controllers/nginx/pkg/cmd/controller/metrics.go new file mode 100644 index 000000000..8ab86f148 --- /dev/null +++ b/controllers/nginx/pkg/cmd/controller/metrics.go @@ -0,0 +1,233 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "path/filepath" + + "github.com/golang/glog" + + common "github.com/ncabatoff/process-exporter" + "github.com/ncabatoff/process-exporter/proc" + "github.com/prometheus/client_golang/prometheus" +) + +type exeMatcher struct { + name string + args []string +} + +func (em exeMatcher) MatchAndName(nacl common.NameAndCmdline) (bool, string) { + if len(nacl.Cmdline) == 0 { + return false, "" + } + cmd := filepath.Base(nacl.Cmdline[0]) + return em.name == cmd, "" +} + +func (n *NGINXController) setupMonitor(args []string) { + pc, err := newProcessCollector(true, exeMatcher{"nginx", args}) + if err != nil { + glog.Fatalf("unexpedted error registering nginx collector: %v", err) + } + err = prometheus.Register(pc) + if err != nil { + glog.Warningf("unexpected error registering nginx collector: %v", err) + } +} + +var ( + numprocsDesc = prometheus.NewDesc( + "nginx_num_procs", + "number of processes", + nil, nil) + + cpuSecsDesc = prometheus.NewDesc( + "nginx_cpu_seconds_total", + "Cpu usage in seconds", + nil, nil) + + readBytesDesc = prometheus.NewDesc( + "nginx_read_bytes_total", + "number of bytes read", + nil, nil) + + writeBytesDesc = prometheus.NewDesc( + "nginx_write_bytes_total", + "number of bytes written", + nil, nil) + + memResidentbytesDesc = prometheus.NewDesc( + "nginx_resident_memory_bytes", + "number of bytes of memory in use", + nil, nil) + + memVirtualbytesDesc = prometheus.NewDesc( + "nginx_virtual_memory_bytes", + "number of bytes of memory in use", + nil, nil) + + startTimeDesc = prometheus.NewDesc( + "nginx_oldest_start_time_seconds", + "start time in seconds since 1970/01/01", + nil, nil) + + activeDesc = prometheus.NewDesc( + "nginx_active_connections", + "total number of active connections", + nil, nil) + + acceptedDesc = prometheus.NewDesc( + "nginx_accepted_connections", + "total number of accepted client connections", + nil, nil) + + handledDesc = prometheus.NewDesc( + "nginx_handled_connections", + "total number of handled connections", + nil, nil) + + requestsDesc = prometheus.NewDesc( + "nginx_total_requests", + "total number of client requests", + nil, nil) + + readingDesc = prometheus.NewDesc( + "nginx_current_reading_connections", + "current number of connections where nginx is reading the request header", + nil, nil) + + writingDesc = prometheus.NewDesc( + "nginx_current_writing_connections", + "current number of connections where nginx is writing the response back to the client", + nil, nil) + + waitingDesc = prometheus.NewDesc( + "nginx_current_waiting_connections", + "current number of idle client connections waiting for a request", + nil, nil) +) + +type ( + scrapeRequest struct { + results chan<- prometheus.Metric + done chan struct{} + } + + namedProcessCollector struct { + scrapeChan chan scrapeRequest + *proc.Grouper + fs *proc.FS + } +) + +func newProcessCollector( + children bool, + n common.MatchNamer) (*namedProcessCollector, error) { + + fs, err := proc.NewFS("/proc") + if err != nil { + return nil, err + } + p := &namedProcessCollector{ + scrapeChan: make(chan scrapeRequest), + Grouper: proc.NewGrouper(children, n), + fs: fs, + } + _, err = p.Update(p.fs.AllProcs()) + if err != nil { + return nil, err + } + + go p.start() + + return p, nil +} + +// Describe implements prometheus.Collector. +func (p *namedProcessCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- cpuSecsDesc + ch <- numprocsDesc + ch <- readBytesDesc + ch <- writeBytesDesc + ch <- memResidentbytesDesc + ch <- memVirtualbytesDesc + ch <- startTimeDesc +} + +// Collect implements prometheus.Collector. +func (p *namedProcessCollector) Collect(ch chan<- prometheus.Metric) { + req := scrapeRequest{results: ch, done: make(chan struct{})} + p.scrapeChan <- req + <-req.done +} + +func (p *namedProcessCollector) start() { + for req := range p.scrapeChan { + ch := req.results + p.scrape(ch) + req.done <- struct{}{} + } +} + +func (p *namedProcessCollector) scrape(ch chan<- prometheus.Metric) { + s, err := getNginxStatus() + if err != nil { + glog.Warningf("unexpected error obtaining nginx status info: %v", err) + return + } + + ch <- prometheus.MustNewConstMetric(activeDesc, + prometheus.GaugeValue, float64(s.Active)) + ch <- prometheus.MustNewConstMetric(acceptedDesc, + prometheus.GaugeValue, float64(s.Accepted)) + ch <- prometheus.MustNewConstMetric(handledDesc, + prometheus.GaugeValue, float64(s.Handled)) + ch <- prometheus.MustNewConstMetric(requestsDesc, + prometheus.GaugeValue, float64(s.Requests)) + ch <- prometheus.MustNewConstMetric(readingDesc, + prometheus.GaugeValue, float64(s.Reading)) + ch <- prometheus.MustNewConstMetric(writingDesc, + prometheus.GaugeValue, float64(s.Writing)) + ch <- prometheus.MustNewConstMetric(waitingDesc, + prometheus.GaugeValue, float64(s.Waiting)) + + _, err = p.Update(p.fs.AllProcs()) + if err != nil { + glog.Warningf("unexpected error obtaining nginx process info: %v", err) + return + } + + for gname, gcounts := range p.Groups() { + glog.Infof("%v", gname) + glog.Infof("%v", gcounts) + ch <- prometheus.MustNewConstMetric(numprocsDesc, + prometheus.GaugeValue, float64(gcounts.Procs)) + ch <- prometheus.MustNewConstMetric(memResidentbytesDesc, + prometheus.GaugeValue, float64(gcounts.Memresident)) + ch <- prometheus.MustNewConstMetric(memVirtualbytesDesc, + prometheus.GaugeValue, float64(gcounts.Memvirtual)) + ch <- prometheus.MustNewConstMetric(startTimeDesc, + prometheus.GaugeValue, float64(gcounts.OldestStartTime.Unix())) + ch <- prometheus.MustNewConstMetric(cpuSecsDesc, + prometheus.CounterValue, gcounts.Cpu) + ch <- prometheus.MustNewConstMetric(readBytesDesc, + prometheus.CounterValue, float64(gcounts.ReadBytes)) + ch <- prometheus.MustNewConstMetric(writeBytesDesc, + prometheus.CounterValue, float64(gcounts.WriteBytes)) + } +} diff --git a/controllers/nginx/pkg/cmd/controller/nginx.go b/controllers/nginx/pkg/cmd/controller/nginx.go index b2a4cec94..9e7bd2202 100644 --- a/controllers/nginx/pkg/cmd/controller/nginx.go +++ b/controllers/nginx/pkg/cmd/controller/nginx.go @@ -18,24 +18,30 @@ package main import ( "bytes" + "errors" "fmt" "io/ioutil" + "net" "net/http" "os" "os/exec" + "syscall" + "time" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - "k8s.io/ingress/core/pkg/ingress" - "k8s.io/ingress/core/pkg/ingress/defaults" - - "errors" - "k8s.io/ingress/controllers/nginx/pkg/config" ngx_template "k8s.io/ingress/controllers/nginx/pkg/template" "k8s.io/ingress/controllers/nginx/pkg/version" + "k8s.io/ingress/core/pkg/ingress" + "k8s.io/ingress/core/pkg/ingress/defaults" +) + +const ( + ngxHealthPort = 18080 + ngxHealthPath = "/healthz" + ngxStatusPath = "/internal_nginx_status" ) var ( @@ -78,6 +84,7 @@ Error loading new template : %v } n.t = ngxTpl + go n.Start() return n @@ -93,15 +100,56 @@ type NGINXController struct { // Start start a new NGINX master process running in foreground. func (n NGINXController) Start() { glog.Info("starting NGINX process...") + + done := make(chan error, 1) cmd := exec.Command(n.binary, "-c", cfgPath) + n.start(cmd, done) + + // if the nginx master process dies the workers continue to process requests, + // passing checks but in case of updates in ingress no updates will be + // reflected in the nginx configuration which can lead to confusion and report + // issues because of this behavior. + // To avoid this issue we restart nginx in case of errors. + for { + err := <-done + if exitError, ok := err.(*exec.ExitError); ok { + waitStatus := exitError.Sys().(syscall.WaitStatus) + glog.Warningf(` +------------------------------------------------------------------------------- +NGINX master process died (%v): %v +------------------------------------------------------------------------------- +`, waitStatus.ExitStatus(), err) + } + cmd.Process.Release() + cmd = exec.Command(n.binary, "-c", cfgPath) + // we wait until the workers are killed + for { + conn, err := net.DialTimeout("tcp", "127.0.0.1:80", 1*time.Second) + if err == nil { + conn.Close() + break + } + time.Sleep(1 * time.Second) + } + // start a new nginx master process + n.start(cmd, done) + } +} + +func (n *NGINXController) start(cmd *exec.Cmd, done chan error) { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { glog.Fatalf("nginx error: %v", err) + done <- err + return } - if err := cmd.Wait(); err != nil { - glog.Errorf("nginx error: %v", err) - } + + n.setupMonitor(cmd.Args) + + go func() { + done <- cmd.Wait() + }() } // Reload checks if the running configuration file is different @@ -260,7 +308,7 @@ func (n NGINXController) Name() string { // Check returns if the nginx healthz endpoint is returning ok (status code 200) func (n NGINXController) Check(_ *http.Request) error { - res, err := http.Get("http://127.0.0.1:18080/healthz") + res, err := http.Get(fmt.Sprintf("http://localhost:%v%v", ngxHealthPort, ngxHealthPath)) if err != nil { return err } diff --git a/controllers/nginx/pkg/cmd/controller/status.go b/controllers/nginx/pkg/cmd/controller/status.go new file mode 100644 index 000000000..bfa1c383b --- /dev/null +++ b/controllers/nginx/pkg/cmd/controller/status.go @@ -0,0 +1,99 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "io/ioutil" + "net/http" + "regexp" + "strconv" +) + +var ( + ac = regexp.MustCompile(`Active connections: (\d+)`) + sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`) + reading = regexp.MustCompile(`Reading: (\d+)`) + writing = regexp.MustCompile(`Writing: (\d+)`) + waiting = regexp.MustCompile(`Waiting: (\d+)`) +) + +type nginxStatus struct { + // Active total number of active connections + Active int + // Accepted total number of accepted client connections + Accepted int + // Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit). + Handled int + // Requests total number of client requests. + Requests int + // Reading current number of connections where nginx is reading the request header. + Reading int + // Writing current number of connections where nginx is writing the response back to the client. + Writing int + // Waiting current number of idle client connections waiting for a request. + Waiting int +} + +func getNginxStatus() (*nginxStatus, error) { + resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%v%v", ngxHealthPort, ngxStatusPath)) + if err != nil { + return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err) + } + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected error scraping nginx status page (%v)", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + return nil, fmt.Errorf("unexpected error scraping nginx status page (status %v)", resp.StatusCode) + } + + return parse(string(data)), nil +} + +func parse(data string) *nginxStatus { + acr := ac.FindStringSubmatch(data) + sahrr := sahr.FindStringSubmatch(data) + readingr := reading.FindStringSubmatch(data) + writingr := writing.FindStringSubmatch(data) + waitingr := waiting.FindStringSubmatch(data) + + return &nginxStatus{ + toInt(acr, 1), + toInt(sahrr, 1), + toInt(sahrr, 2), + toInt(sahrr, 3), + toInt(readingr, 1), + toInt(writingr, 1), + toInt(waitingr, 1), + } +} + +func toInt(data []string, pos int) int { + if len(data) == 0 { + return 0 + } + if pos > len(data) { + return 0 + } + if v, err := strconv.Atoi(data[pos]); err == nil { + return v + } + return 0 +} diff --git a/controllers/nginx/pkg/cmd/controller/status_test.go b/controllers/nginx/pkg/cmd/controller/status_test.go new file mode 100644 index 000000000..1dda3a01e --- /dev/null +++ b/controllers/nginx/pkg/cmd/controller/status_test.go @@ -0,0 +1,70 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "reflect" + "testing" +) + +func TestParseStatus(t *testing.T) { + tests := []struct { + in string + out *nginxStatus + }{ + {`Active connections: 43 +server accepts handled requests + 7368 7368 10993 +Reading: 0 Writing: 5 Waiting: 38`, + &nginxStatus{43, 7368, 7368, 10993, 0, 5, 38}, + }, + {`Active connections: 0 +server accepts handled requests + 1 7 0 +Reading: A Writing: B Waiting: 38`, + &nginxStatus{0, 1, 7, 0, 0, 0, 38}, + }, + } + + for _, test := range tests { + r := parse(test.in) + if !reflect.DeepEqual(r, test.out) { + t.Fatalf("expected %v but returned %v", test.out, r) + } + } +} + +func TestToint(t *testing.T) { + tests := []struct { + in []string + pos int + exp int + }{ + {[]string{}, 0, 0}, + {[]string{}, 1, 0}, + {[]string{"A"}, 0, 0}, + {[]string{"1"}, 0, 1}, + {[]string{"a", "2"}, 1, 2}, + } + + for _, test := range tests { + v := toInt(test.in, test.pos) + if v != test.exp { + t.Fatalf("expected %v but returned %v", test.exp, v) + } + } +} diff --git a/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl b/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl index 906afd370..4c4845af3 100644 --- a/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl +++ b/controllers/nginx/rootfs/etc/nginx/template/nginx.tmpl @@ -349,6 +349,17 @@ http { {{ end }} } + # this location is used to extract nginx metrics + # using prometheus. + # TODO: enable extraction for vts module. + location /internal_nginx_status { + allow 127.0.0.1; + deny all; + + access_log off; + stub_status on; + } + location / { set $proxy_upstream_name "upstream-default-backend"; proxy_pass http://upstream-default-backend; diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index 73ed53e18..9afc60c6f 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -140,7 +140,7 @@ func newIngressController(config *Configuration) *GenericController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(unversionedcore.EventSinkImpl{ + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ Interface: config.Client.Core().Events(config.Namespace), }) diff --git a/core/pkg/ingress/controller/launch.go b/core/pkg/ingress/controller/launch.go index b21c648bf..95e50c877 100644 --- a/core/pkg/ingress/controller/launch.go +++ b/core/pkg/ingress/controller/launch.go @@ -11,9 +11,9 @@ import ( "time" "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" - "github.com/prometheus/client_golang/prometheus" "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" @@ -161,7 +161,7 @@ func registerHandlers(enableProfiling bool, port int, ic *GenericController) { ic.cfg.Backend, ) - mux.Handle("/metrics", prometheus.Handler()) + mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/core/pkg/ingress/controller/metrics.go b/core/pkg/ingress/controller/metrics.go index 1b22b5bed..de3acbd20 100644 --- a/core/pkg/ingress/controller/metrics.go +++ b/core/pkg/ingress/controller/metrics.go @@ -16,7 +16,9 @@ limitations under the License. package controller -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) const ( ns = "ingress_controller" @@ -27,9 +29,6 @@ const ( func init() { prometheus.MustRegister(reloadOperation) prometheus.MustRegister(reloadOperationErrors) - - reloadOperationErrors.WithLabelValues(reloadLabel).Set(0) - reloadOperation.WithLabelValues(reloadLabel).Set(0) } var (