Merge pull request #2608 from fmejia97/expose-udp-metrics-updated

Expose UDP metrics on the /metrics endpoint
k8s-ci-robot 2018-06-14 06:13:28 -07:00 committed by GitHub
commit c9a0c90295
13 changed files with 575 additions and 746 deletions

View file

@@ -39,7 +39,9 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/controller"
"k8s.io/ingress-nginx/internal/ingress/metric/collector"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/version"
@@ -125,6 +127,18 @@ func main() {
mux := http.NewServeMux()
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)
err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)
if err != nil {
glog.Fatalf("Error generating metric collector: %v", err)
}
err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000)
if err != nil {
glog.Fatalf("Error generating UDP collector: %v", err)
}
ngx.Start()
}
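
A minimal, self-contained sketch of how collectors registered this way end up being served: once they are registered with the default prometheus registry, the /metrics endpoint is just the standard promhttp handler mounted on the mux. This is not the controller's actual registerHandlers code; the counter and the port are stand-ins.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Stand-in collector; the ingress controller registers its NGINX status and
	// UDP collectors against the same default registry via prometheus.Register /
	// prometheus.MustRegister, so they are exposed on the same endpoint.
	requests := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "nginx",
		Name:      "requests",
		Help:      "The total number of client requests.",
	})
	prometheus.MustRegister(requests)

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":10254", mux)) // arbitrary port for this sketch
}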

View file

@@ -1,30 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import "github.com/prometheus/client_golang/prometheus"
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
type scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}

View file

@@ -1,225 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
// https://github.com/vozlt/nginx-module-vts
type vts struct {
NginxVersion string `json:"nginxVersion"`
LoadMsec int `json:"loadMsec"`
NowMsec int `json:"nowMsec"`
// Total connections and requests(same as stub_status_module in NGINX)
Connections connections `json:"connections"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone
ServerZones map[string]serverZone `json:"serverZones"`
// Traffic(in/out) and request and response counts and cache hit ratio per each server zone filtered through
// the vhost_traffic_status_filter_by_set_key directive
FilterZones map[string]map[string]filterZone `json:"filterZones"`
// Traffic(in/out) and request and response counts per server in each upstream group
UpstreamZones map[string][]upstreamZone `json:"upstreamZones"`
}
type serverZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Responses response `json:"responses"`
Cache cache `json:"cache"`
}
type filterZone struct {
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
Cache cache `json:"cache"`
Responses response `json:"responses"`
}
type upstreamZone struct {
Responses response `json:"responses"`
Server string `json:"server"`
RequestCounter float64 `json:"requestCounter"`
InBytes float64 `json:"inBytes"`
OutBytes float64 `json:"outBytes"`
ResponseMsec float64 `json:"responseMsec"`
Weight float64 `json:"weight"`
MaxFails float64 `json:"maxFails"`
FailTimeout float64 `json:"failTimeout"`
Backup BoolToFloat64 `json:"backup"`
Down BoolToFloat64 `json:"down"`
}
type cache struct {
Miss float64 `json:"miss"`
Bypass float64 `json:"bypass"`
Expired float64 `json:"expired"`
Stale float64 `json:"stale"`
Updating float64 `json:"updating"`
Revalidated float64 `json:"revalidated"`
Hit float64 `json:"hit"`
Scarce float64 `json:"scarce"`
}
type response struct {
OneXx float64 `json:"1xx"`
TwoXx float64 `json:"2xx"`
TheeXx float64 `json:"3xx"`
FourXx float64 `json:"4xx"`
FiveXx float64 `json:"5xx"`
}
type connections struct {
Active float64 `json:"active"`
Reading float64 `json:"reading"`
Writing float64 `json:"writing"`
Waiting float64 `json:"waiting"`
Accepted float64 `json:"accepted"`
Handled float64 `json:"handled"`
Requests float64 `json:"requests"`
}
// BoolToFloat64 ...
type BoolToFloat64 float64
// UnmarshalJSON ...
func (bit BoolToFloat64) UnmarshalJSON(data []byte) error {
asString := string(data)
if asString == "1" || asString == "true" {
bit = 1
} else if asString == "0" || asString == "false" {
bit = 0
} else {
return fmt.Errorf(fmt.Sprintf("boolean unmarshal error: invalid input %s", asString))
}
return nil
}
func getNginxStatus(port int, path string) (*basicStatus, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func getNginxVtsMetrics(port int, path string) (*vts, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx vts (%v)", err)
}
var vts *vts
err = json.Unmarshal(data, &vts)
if err != nil {
return nil, fmt.Errorf("unexpected error json unmarshal (%v)", err)
}
glog.V(3).Infof("scrape returned : %v", vts)
return vts, nil
}
func parse(data string) *basicStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &basicStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}

View file

@@ -1,72 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"testing"
"github.com/kylelemons/godebug/pretty"
)
func TestParseStatus(t *testing.T) {
tests := []struct {
in string
out *basicStatus
}{
{`Active connections: 43
server accepts handled requests
7368 7368 10993
Reading: 0 Writing: 5 Waiting: 38`,
&basicStatus{43, 7368, 7368, 10993, 0, 5, 38},
},
{`Active connections: 0
server accepts handled requests
1 7 0
Reading: A Writing: B Waiting: 38`,
&basicStatus{0, 1, 7, 0, 0, 0, 38},
},
}
for _, test := range tests {
r := parse(test.in)
if diff := pretty.Compare(r, test.out); diff != "" {
t.Logf("%v", diff)
t.Fatalf("expected %v but returned %v", test.out, r)
}
}
}
func TestToint(t *testing.T) {
tests := []struct {
in []string
pos int
exp int
}{
{[]string{}, 0, 0},
{[]string{}, 1, 0},
{[]string{"A"}, 0, 0},
{[]string{"1"}, 0, 1},
{[]string{"a", "2"}, 1, 2},
}
for _, test := range tests {
v := toInt(test.in, test.pos)
if v != test.exp {
t.Fatalf("expected %v but returned %v", test.exp, v)
}
}
}

View file

@@ -1,273 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"reflect"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
const ns = "nginx"
type (
vtsCollector struct {
scrapeChan chan scrapeRequest
port int
path string
data *vtsData
watchNamespace string
ingressClass string
}
vtsData struct {
bytes *prometheus.Desc
cache *prometheus.Desc
connections *prometheus.Desc
responses *prometheus.Desc
requests *prometheus.Desc
filterZoneBytes *prometheus.Desc
filterZoneResponses *prometheus.Desc
filterZoneCache *prometheus.Desc
upstreamBackup *prometheus.Desc
upstreamBytes *prometheus.Desc
upstreamDown *prometheus.Desc
upstreamFailTimeout *prometheus.Desc
upstreamMaxFails *prometheus.Desc
upstreamResponses *prometheus.Desc
upstreamRequests *prometheus.Desc
upstreamResponseMsec *prometheus.Desc
upstreamWeight *prometheus.Desc
}
)
// NewNGINXVTSCollector returns a new prometheus collector for the VTS module
func NewNGINXVTSCollector(watchNamespace, ingressClass string, port int, path string) Stopable {
p := vtsCollector{
scrapeChan: make(chan scrapeRequest),
port: port,
path: path,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
}
p.data = &vtsData{
bytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "bytes_total"),
"Nginx bytes count",
[]string{"ingress_class", "namespace", "server_zone", "direction"}, nil),
cache: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "cache_total"),
"Nginx cache count",
[]string{"ingress_class", "namespace", "server_zone", "type"}, nil),
connections: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connections_total"),
"Nginx connections count",
[]string{"ingress_class", "namespace", "type"}, nil),
responses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "server_zone", "status_code"}, nil),
requests: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "requests_total"),
"The total number of requested client connections.",
[]string{"ingress_class", "namespace", "server_zone"}, nil),
filterZoneBytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_bytes_total"),
"Nginx bytes count",
[]string{"ingress_class", "namespace", "server_zone", "key", "direction"}, nil),
filterZoneResponses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_responses_total"),
"The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "server_zone", "key", "status_code"}, nil),
filterZoneCache: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "filterzone_cache_total"),
"Nginx cache count",
[]string{"ingress_class", "namespace", "server_zone", "key", "type"}, nil),
upstreamBackup: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_backup"),
"Current backup setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamBytes: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_bytes_total"),
"The total number of bytes sent to this server.",
[]string{"ingress_class", "namespace", "upstream", "server", "direction"}, nil),
upstreamDown: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "vts_upstream_down_total"),
"Current down setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamFailTimeout: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_fail_timeout"),
"Current fail_timeout setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamMaxFails: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_maxfails"),
"Current max_fails setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamResponses: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_responses_total"),
"The number of upstream responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.",
[]string{"ingress_class", "namespace", "upstream", "server", "status_code"}, nil),
upstreamRequests: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_requests_total"),
"The total number of client connections forwarded to this server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamResponseMsec: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_response_msecs_avg"),
"The average of only upstream response processing times in milliseconds.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
upstreamWeight: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "upstream_weight"),
"Current upstream weight setting of the server.",
[]string{"ingress_class", "namespace", "upstream", "server"}, nil),
}
go p.start()
return p
}
// Describe implements prometheus.Collector.
func (p vtsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- p.data.bytes
ch <- p.data.cache
ch <- p.data.connections
ch <- p.data.requests
ch <- p.data.responses
ch <- p.data.upstreamBackup
ch <- p.data.upstreamBytes
ch <- p.data.upstreamDown
ch <- p.data.upstreamFailTimeout
ch <- p.data.upstreamMaxFails
ch <- p.data.upstreamRequests
ch <- p.data.upstreamResponseMsec
ch <- p.data.upstreamResponses
ch <- p.data.upstreamWeight
ch <- p.data.filterZoneBytes
ch <- p.data.filterZoneCache
ch <- p.data.filterZoneResponses
}
// Collect implements prometheus.Collector.
func (p vtsCollector) Collect(ch chan<- prometheus.Metric) {
req := scrapeRequest{results: ch, done: make(chan struct{})}
p.scrapeChan <- req
<-req.done
}
func (p vtsCollector) start() {
for req := range p.scrapeChan {
ch := req.results
p.scrapeVts(ch)
req.done <- struct{}{}
}
}
func (p vtsCollector) Stop() {
close(p.scrapeChan)
}
// scrapeVts scrape nginx vts metrics
func (p vtsCollector) scrapeVts(ch chan<- prometheus.Metric) {
nginxMetrics, err := getNginxVtsMetrics(p.port, p.path)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
}
reflectMetrics(&nginxMetrics.Connections, p.data.connections, ch, p.ingressClass, p.watchNamespace)
for name, zones := range nginxMetrics.UpstreamZones {
for pos, value := range zones {
reflectMetrics(&zones[pos].Responses, p.data.upstreamResponses, ch, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamRequests,
prometheus.CounterValue, zones[pos].RequestCounter, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamDown,
prometheus.CounterValue, float64(zones[pos].Down), p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamWeight,
prometheus.CounterValue, zones[pos].Weight, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamResponseMsec,
prometheus.CounterValue, zones[pos].ResponseMsec, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBackup,
prometheus.CounterValue, float64(zones[pos].Backup), p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamFailTimeout,
prometheus.CounterValue, zones[pos].FailTimeout, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamMaxFails,
prometheus.CounterValue, zones[pos].MaxFails, p.ingressClass, p.watchNamespace, name, value.Server)
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].InBytes, p.ingressClass, p.watchNamespace, name, value.Server, "in")
ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes,
prometheus.CounterValue, zones[pos].OutBytes, p.ingressClass, p.watchNamespace, name, value.Server, "out")
}
}
for name, zone := range nginxMetrics.ServerZones {
reflectMetrics(&zone.Responses, p.data.responses, ch, p.ingressClass, p.watchNamespace, name)
reflectMetrics(&zone.Cache, p.data.cache, ch, p.ingressClass, p.watchNamespace, name)
ch <- prometheus.MustNewConstMetric(p.data.requests,
prometheus.CounterValue, zone.RequestCounter, p.ingressClass, p.watchNamespace, name)
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, name, "in")
ch <- prometheus.MustNewConstMetric(p.data.bytes,
prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, name, "out")
}
for serverZone, keys := range nginxMetrics.FilterZones {
for name, zone := range keys {
reflectMetrics(&zone.Responses, p.data.filterZoneResponses, ch, p.ingressClass, p.watchNamespace, serverZone, name)
reflectMetrics(&zone.Cache, p.data.filterZoneCache, ch, p.ingressClass, p.watchNamespace, serverZone, name)
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, serverZone, name, "in")
ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes,
prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, serverZone, name, "out")
}
}
}
func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) {
val := reflect.ValueOf(value).Elem()
for i := 0; i < val.NumField(); i++ {
tag := val.Type().Field(i).Tag
l := append(labels, tag.Get("json"))
ch <- prometheus.MustNewConstMetric(desc,
prometheus.CounterValue, val.Field(i).Interface().(float64),
l...)
}
}
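
The removed helper above relies on a small reflection trick that is easy to miss: it walks a struct's float64 fields and emits one counter per field, appending the json tag as the final label value. A standalone sketch of that pattern with a dummy descriptor and made-up values, not the deleted file itself:

package main

import (
	"fmt"
	"reflect"

	"github.com/prometheus/client_golang/prometheus"
)

// response mirrors the shape the removed collector reflected over: every field
// is a float64 whose metric label lives in the json tag.
type response struct {
	OneXx  float64 `json:"1xx"`
	TwoXx  float64 `json:"2xx"`
	FiveXx float64 `json:"5xx"`
}

// reflectMetrics emits one counter per struct field, using the json tag as the
// trailing label value (the same idea as the helper removed above).
func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) {
	val := reflect.ValueOf(value).Elem()
	for i := 0; i < val.NumField(); i++ {
		tag := val.Type().Field(i).Tag
		l := append(labels, tag.Get("json"))
		ch <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue,
			val.Field(i).Interface().(float64), l...)
	}
}

func main() {
	desc := prometheus.NewDesc("nginx_responses_total",
		"The number of responses per status class.",
		[]string{"server_zone", "status_code"}, nil)
	ch := make(chan prometheus.Metric, 3)
	reflectMetrics(&response{OneXx: 1, TwoXx: 40, FiveXx: 2}, desc, ch, "example.com")
	close(ch)
	for m := range ch {
		fmt.Println(m.Desc()) // three metrics, labelled 1xx, 2xx and 5xx
	}
}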

View file

@@ -65,24 +65,14 @@ type statusModule string
const (
ngxHealthPath = "/healthz"
defaultStatusModule statusModule = "default"
vtsStatusModule statusModule = "vts"
)
var (
tmplPath = "/etc/nginx/template/nginx.tmpl"
cfgPath = "/etc/nginx/nginx.conf"
nginxBinary = "/usr/sbin/nginx"
tmplPath = "/etc/nginx/template/nginx.tmpl"
)
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = nginxBinary
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
@@ -95,8 +85,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
}
n := &NGINXController{
binary: ngx,
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
@@ -131,8 +119,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
fs,
n.updateCh)
n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status)
n.syncQueue = task.NewTaskQueue(n.syncIngress)
n.annotations = annotations.NewAnnotationExtractor(n.store)
@@ -252,12 +238,9 @@ type NGINXController struct {
t *ngx_template.Template
binary string
resolver []net.IP
stats *statsCollector
statusModule statusModule
// returns true if IPV6 is enabled in the pod
isIPV6Enabled bool
isShuttingDown bool
@@ -279,7 +262,7 @@ func (n *NGINXController) Start() {
go n.syncStatus.Run()
}
cmd := exec.Command(n.binary, "-c", cfgPath)
cmd := nginxExecCommand()
// put NGINX in another process group to prevent it
// to receive signals meant for the controller
@@ -316,7 +299,7 @@ func (n *NGINXController) Start() {
// release command resources
cmd.Process.Release()
// start a new nginx master process if the controller is not being stopped
cmd = exec.Command(n.binary, "-c", cfgPath)
cmd = nginxExecCommand()
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
@@ -361,9 +344,9 @@ func (n *NGINXController) Stop() error {
n.syncStatus.Shutdown()
}
// send stop signal to NGINX
glog.Info("Stopping NGINX process")
cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit")
// Send stop signal to Nginx
glog.Info("stopping NGINX process...")
cmd := nginxExecCommand("-s", "quit")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@@ -422,7 +405,7 @@ func (n NGINXController) testTemplate(cfg []byte) error {
if err != nil {
return err
}
out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput()
out, err := nginxTestCommand(tmpfile.Name()).CombinedOutput()
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
oe := fmt.Sprintf(`
@@ -483,17 +466,10 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
n.Proxy.ServerList = servers
}
// we need to check if the status module configuration changed
if cfg.EnableVtsStatus {
n.setupMonitor(vtsStatusModule)
} else {
n.setupMonitor(defaultStatusModule)
}
// NGINX cannot resize the hash tables used to store server names. For
// this reason we check if the current size is correct for the host
// names defined in the Ingress rules and adjust the value if
// necessary.
// NGINX cannot resize the hash tables used to store server names.
// For this reason we check if the defined size defined is correct
// for the FQDN defined in the ingress rules adjusting the value
// if is required.
// https://trac.nginx.org/nginx/ticket/352
// https://trac.nginx.org/nginx/ticket/631
var longestName int
@@ -659,7 +635,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
return err
}
o, err := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput()
o, err := nginxExecCommand("-s", "reload").CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", err, string(o))
}

View file

@@ -1,97 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress/controller/metric/collector"
)
const (
ngxStatusPath = "/nginx_status"
ngxVtsPath = "/nginx_status/format/json"
)
func (n *NGINXController) setupMonitor(sm statusModule) {
csm := n.statusModule
if csm != sm {
glog.Infof("changing prometheus collector from %v to %v", csm, sm)
n.stats.stop(csm)
n.stats.start(sm)
n.statusModule = sm
}
}
type statsCollector struct {
process prometheus.Collector
basic collector.Stopable
vts collector.Stopable
namespace string
watchClass string
port int
}
func (s *statsCollector) stop(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic.Stop()
prometheus.Unregister(s.basic)
case vtsStatusModule:
s.vts.Stop()
prometheus.Unregister(s.vts)
}
}
func (s *statsCollector) start(sm statusModule) {
switch sm {
case defaultStatusModule:
s.basic = collector.NewNginxStatus(s.namespace, s.watchClass, s.port, ngxStatusPath)
prometheus.Register(s.basic)
break
case vtsStatusModule:
s.vts = collector.NewNGINXVTSCollector(s.namespace, s.watchClass, s.port, ngxVtsPath)
prometheus.Register(s.vts)
break
}
}
func newStatsCollector(ns, class, binary string, port int) *statsCollector {
glog.Infof("starting new nginx stats collector for Ingress controller running in namespace %v (class %v)", ns, class)
glog.Infof("collector extracting information from port %v", port)
pc, err := collector.NewNamedProcess(true, collector.BinaryNameMatcher{
Name: "nginx",
Binary: binary,
})
if err != nil {
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
err = prometheus.Register(pc)
if err != nil {
glog.Fatalf("unexpected error registering nginx collector: %v", err)
}
return &statsCollector{
namespace: ns,
watchClass: class,
process: pc,
port: port,
}
}

View file

@@ -17,6 +17,8 @@ limitations under the License.
package controller
import (
"os"
"os/exec"
"syscall"
"github.com/golang/glog"
@@ -66,3 +68,28 @@ func sysctlFSFileMax() int {
glog.V(2).Infof("rlimit.max=%v", rLimit.Max)
return int(rLimit.Max)
}
const (
defBinary = "/usr/sbin/nginx"
cfgPath = "/etc/nginx/nginx.conf"
)
func nginxExecCommand(args ...string) *exec.Cmd {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = defBinary
}
cmdArgs := []string{"-c", cfgPath}
cmdArgs = append(cmdArgs, args...)
return exec.Command(ngx, cmdArgs...)
}
func nginxTestCommand(cfg string) *exec.Cmd {
ngx := os.Getenv("NGINX_BINARY")
if ngx == "" {
ngx = defBinary
}
return exec.Command(ngx, "-c", cfg, "-t")
}
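
A quick illustration of what the new helpers build. The NGINX_BINARY override and the constants are copied from the hunk above; the binary path set in main is an arbitrary example:

package main

import (
	"fmt"
	"os"
	"os/exec"
)

const (
	defBinary = "/usr/sbin/nginx"
	cfgPath   = "/etc/nginx/nginx.conf"
)

// nginxExecCommand mirrors the helper added above: honor NGINX_BINARY if set,
// always pass the main configuration file, then append the caller's arguments.
func nginxExecCommand(args ...string) *exec.Cmd {
	ngx := os.Getenv("NGINX_BINARY")
	if ngx == "" {
		ngx = defBinary
	}
	cmdArgs := append([]string{"-c", cfgPath}, args...)
	return exec.Command(ngx, cmdArgs...)
}

func main() {
	os.Setenv("NGINX_BINARY", "/opt/nginx/sbin/nginx") // example override
	cmd := nginxExecCommand("-s", "reload")
	fmt.Println(cmd.Args)
	// [/opt/nginx/sbin/nginx -c /etc/nginx/nginx.conf -s reload]
}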

View file

@@ -0,0 +1,51 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"net"
)
const packetSize = 1024 * 65
// newUDPListener creates a new UDP listener used to process messages
// from the NGINX log phase containing information about a request
func newUDPListener(port int) (*net.UDPConn, error) {
service := fmt.Sprintf("127.0.0.1:%v", port)
udpAddr, err := net.ResolveUDPAddr("udp4", service)
if err != nil {
return nil, err
}
return net.ListenUDP("udp", udpAddr)
}
// handleMessages processes packets received on a UDP connection
func handleMessages(conn *net.UDPConn, fn func([]byte)) {
msg := make([]byte, packetSize)
for {
s, _, err := conn.ReadFrom(msg[0:])
if err != nil {
continue
}
fn(msg[0:s])
}
}
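
The other half of this socket is the NGINX log phase, which is expected to serialize one request record as JSON and send it as a single datagram. A minimal Go stand-in for that producer; the field names follow the udpData struct added later in this PR, the values are made up, and port 8000 matches the value hard-coded in cmd/nginx/main.go:

package main

import (
	"encoding/json"
	"log"
	"net"
)

func main() {
	// One request record, shaped like the udpData struct the collector decodes.
	payload := map[string]interface{}{
		"host":                 "example.com",
		"status":               "200",
		"remoteAddr":           "10.0.0.1",
		"realIpAddr":           "10.0.0.1",
		"remoteUser":           "",
		"bytesSent":            150.0,
		"protocol":             "HTTP/1.1",
		"method":               "GET",
		"uri":                  "/",
		"requestLength":        300.0,
		"requestTime":          0.012,
		"upstreamName":         "default-backend-80",
		"upstreamIP":           "10.0.0.2:8080",
		"upstreamResponseTime": 0.010,
		"upstreamStatus":       "200",
		"namespace":            "default",
		"ingress":              "example",
		"service":              "backend",
	}
	msg, err := json.Marshal(payload)
	if err != nil {
		log.Fatal(err)
	}

	// The listener binds 127.0.0.1, so the producer must run in the same pod.
	conn, err := net.Dial("udp", "127.0.0.1:8000")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	if _, err := conn.Write(msg); err != nil {
		log.Fatal(err)
	}
}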

View file

@@ -0,0 +1,70 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
)
func TestNewUDPLogListener(t *testing.T) {
port := freeUDPPort()
var count uint64
fn := func(message []byte) {
t.Logf("message: %v", string(message))
atomic.AddUint64(&count, 1)
}
t.Logf("UDP Port: %v", port)
l, err := newUDPListener(port)
if err != nil {
t.Errorf("unexpected error creating UDP listener: %v", err)
}
if l == nil {
t.Errorf("expected a listener but none returned")
}
go handleMessages(l, fn)
conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port))
conn.Write([]byte("message"))
conn.Close()
time.Sleep(1 * time.Millisecond)
if count != 1 {
t.Errorf("expected only one message from the UDP listern but %v returned", count)
}
}
func freeUDPPort() int {
l, err := net.ListenUDP("udp", &net.UDPAddr{})
if err != nil {
return 0
}
if err := l.Close(); err != nil {
return 0
}
return l.LocalAddr().(*net.UDPAddr).Port
}

View file

@@ -17,16 +17,30 @@ limitations under the License.
package collector
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
var (
ac = regexp.MustCompile(`Active connections: (\d+)`)
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
reading = regexp.MustCompile(`Reading: (\d+)`)
writing = regexp.MustCompile(`Writing: (\d+)`)
waiting = regexp.MustCompile(`Waiting: (\d+)`)
)
type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxVtsPath string
ngxStatusPath string
data *nginxStatusData
watchNamespace string
ingressClass string
@@ -37,15 +51,33 @@ type (
requestsTotal *prometheus.Desc
connections *prometheus.Desc
}
basicStatus struct {
// Active total number of active connections
Active int
// Accepted total number of accepted client connections
Accepted int
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
Handled int
// Requests total number of client requests.
Requests int
// Reading current number of connections where nginx is reading the request header.
Reading int
// Writing current number of connections where nginx is writing the response back to the client.
Writing int
// Waiting current number of idle client connections waiting for a request.
Waiting int
}
)
// NewNginxStatus returns a new prometheus collector the default nginx status module
func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxVtsPath string) Stopable {
// InitNGINXStatusCollector registers a new prometheus collector for the default nginx status module
func InitNGINXStatusCollector(watchNamespace, ingressClass string, ngxHealthPort int) error {
const ns string = "nginx"
const ngxStatusPath = "/nginx_status"
p := nginxStatusCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxVtsPath: ngxVtsPath,
ngxStatusPath: ngxStatusPath,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
}
@@ -62,14 +94,20 @@ func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxV
[]string{"ingress_class", "namespace"}, nil),
connections: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connnections"),
prometheus.BuildFQName(ns, "", "connections"),
"current number of client connections with state {reading, writing, waiting}",
[]string{"ingress_class", "namespace", "state"}, nil),
}
go p.start()
err := prometheus.Register(p)
return p
if err != nil {
return fmt.Errorf("error while registering nginx status collector : %v", err)
}
go p.Run()
return nil
}
// Describe implements prometheus.Collector.
@@ -86,7 +124,7 @@ func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
<-req.done
}
func (p nginxStatusCollector) start() {
func (p nginxStatusCollector) Run() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
@@ -98,9 +136,71 @@ func (p nginxStatusCollector) Stop() {
close(p.scrapeChan)
}
func httpBody(url string) ([]byte, error) {
resp, err := http.DefaultClient.Get(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
}
return data, nil
}
func toInt(data []string, pos int) int {
if len(data) == 0 {
return 0
}
if pos > len(data) {
return 0
}
if v, err := strconv.Atoi(data[pos]); err == nil {
return v
}
return 0
}
func parse(data string) *basicStatus {
acr := ac.FindStringSubmatch(data)
sahrr := sahr.FindStringSubmatch(data)
readingr := reading.FindStringSubmatch(data)
writingr := writing.FindStringSubmatch(data)
waitingr := waiting.FindStringSubmatch(data)
return &basicStatus{
toInt(acr, 1),
toInt(sahrr, 1),
toInt(sahrr, 2),
toInt(sahrr, 3),
toInt(readingr, 1),
toInt(writingr, 1),
toInt(waitingr, 1),
}
}
func getNginxStatus(port int, path string) (*basicStatus, error) {
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
glog.V(3).Infof("start scraping url: %v", url)
data, err := httpBody(url)
if err != nil {
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
}
return parse(string(data)), nil
}
// nginxStatusCollector scrapes the nginx status page
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
s, err := getNginxStatus(p.ngxHealthPort, p.ngxVtsPath)
s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath)
if err != nil {
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
return
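
For reference, a worked, self-contained example of what the stub_status parsing moved into this file extracts. The regexes are the ones declared above, parse and toInt are condensed here, and the sample page is the same text used by the old unit test:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var (
	ac      = regexp.MustCompile(`Active connections: (\d+)`)
	sahr    = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
	reading = regexp.MustCompile(`Reading: (\d+)`)
	writing = regexp.MustCompile(`Writing: (\d+)`)
	waiting = regexp.MustCompile(`Waiting: (\d+)`)
)

type basicStatus struct {
	Active, Accepted, Handled, Requests, Reading, Writing, Waiting int
}

func toInt(data []string, pos int) int {
	if len(data) == 0 || pos >= len(data) {
		return 0
	}
	if v, err := strconv.Atoi(data[pos]); err == nil {
		return v
	}
	return 0
}

func parse(data string) *basicStatus {
	sahrr := sahr.FindStringSubmatch(data)
	return &basicStatus{
		toInt(ac.FindStringSubmatch(data), 1),
		toInt(sahrr, 1),
		toInt(sahrr, 2),
		toInt(sahrr, 3),
		toInt(reading.FindStringSubmatch(data), 1),
		toInt(writing.FindStringSubmatch(data), 1),
		toInt(waiting.FindStringSubmatch(data), 1),
	}
}

func main() {
	status := `Active connections: 43
server accepts handled requests
 7368 7368 10993
Reading: 0 Writing: 5 Waiting: 38`
	fmt.Printf("%+v\n", parse(status))
	// &{Active:43 Accepted:7368 Handled:7368 Requests:10993 Reading:0 Writing:5 Waiting:38}
}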

View file

@@ -26,6 +26,17 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
type scrapeRequest struct {
results chan<- prometheus.Metric
done chan struct{}
}
// Stopable defines a prometheus collector that can be stopped
type Stopable interface {
prometheus.Collector
Stop()
}
// BinaryNameMatcher ...
type BinaryNameMatcher struct {
Name string
@@ -60,8 +71,8 @@ type namedProcess struct {
data namedProcessData
}
// NewNamedProcess returns a new prometheus collector for the nginx process
func NewNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
// newNamedProcess returns a new prometheus collector for the nginx process
func newNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err
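
The scrapeRequest and Stopable types moved into this file implement the pattern the status collectors share: Collect hands its result channel to a single worker goroutine and blocks on done, so scrapes are serialized and never overlap. A minimal sketch of the pattern with a dummy gauge, not the real NGINX scrape:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

type scrapeRequest struct {
	results chan<- prometheus.Metric
	done    chan struct{}
}

type dummyCollector struct {
	scrapeChan chan scrapeRequest
	desc       *prometheus.Desc
}

func (c dummyCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

// Collect is called by the prometheus registry on every scrape of /metrics.
func (c dummyCollector) Collect(ch chan<- prometheus.Metric) {
	req := scrapeRequest{results: ch, done: make(chan struct{})}
	c.scrapeChan <- req
	<-req.done
}

func (c dummyCollector) Stop() { close(c.scrapeChan) }

// run is the single worker: it performs the (possibly expensive) scrape and
// signals done so the blocked Collect call can return.
func (c dummyCollector) run() {
	for req := range c.scrapeChan {
		req.results <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 1)
		req.done <- struct{}{}
	}
}

func main() {
	c := dummyCollector{
		scrapeChan: make(chan scrapeRequest),
		desc:       prometheus.NewDesc("dummy_up", "Always 1.", nil, nil),
	}
	go c.run()
	prometheus.MustRegister(c)
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9000", nil)) // arbitrary port for this sketch
}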

View file

@@ -0,0 +1,277 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"net"
"strings"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
type udpData struct {
Host string `json:"host"` // Label
Status string `json:"status"` // Label
RealIPAddress string `json:"realIpAddr"` // Label
RemoteAddress string `json:"remoteAddr"` // Label
RemoteUser string `json:"remoteUser"` // Label
BytesSent float64 `json:"bytesSent"` // Metric
Protocol string `json:"protocol"` // Label
Method string `json:"method"` // Label
URI string `json:"uri"` // Label
RequestLength float64 `json:"requestLength"` // Metric
RequestTime float64 `json:"requestTime"` // Metric
UpstreamName string `json:"upstreamName"` // Label
UpstreamIP string `json:"upstreamIP"` // Label
UpstreamResponseTime float64 `json:"upstreamResponseTime"` // Metric
UpstreamStatus string `json:"upstreamStatus"` // Label
Namespace string `json:"namespace"` // Label
Ingress string `json:"ingress"` // Label
Service string `json:"service"` // Label
}
// UDPCollector stores prometheus metrics and ingress meta-data
type UDPCollector struct {
upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
bytesSent *prometheus.HistogramVec
collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec
listener *net.UDPConn
ns string
ingressClass string
port int
}
// InitUDPCollector creates a new UDPCollector instance
func InitUDPCollector(ns string, class string, port int) error {
sc := UDPCollector{}
ns = strings.Replace(ns, "-", "_", -1)
listener, err := newUDPListener(port)
if err != nil {
return err
}
sc.listener = listener
sc.ns = ns
sc.ingressClass = class
sc.port = port
requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"}
sc.upstreamResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "upstream_response_time_seconds",
Help: "The time spent on receiving the response from the upstream server",
Namespace: ns,
},
requestTags,
)
sc.requestTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "The request processing time in seconds",
Namespace: ns,
},
requestTags,
)
sc.requestLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_length_bytes",
Help: "The request length (including request line, header, and request body)",
Namespace: ns,
Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide.
},
requestTags,
)
sc.requests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests",
Help: "The total number of client requests.",
Namespace: ns,
},
collectorTags,
)
sc.bytesSent = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "bytes_sent",
Help: "The the number of bytes sent to a client",
Namespace: ns,
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
},
requestTags,
)
sc.collectorSuccess = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful",
Help: "Whether the last collector run was successful (success = 1, failure = 0).",
Namespace: ns,
},
collectorTags,
)
sc.collectorSuccessTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful_timestamp_seconds",
Help: "Timestamp of the last successful collector run",
Namespace: ns,
},
collectorTags,
)
prometheus.MustRegister(sc.upstreamResponseTime)
prometheus.MustRegister(sc.requestTime)
prometheus.MustRegister(sc.requestLength)
prometheus.MustRegister(sc.requests)
prometheus.MustRegister(sc.bytesSent)
prometheus.MustRegister(sc.collectorSuccess)
prometheus.MustRegister(sc.collectorSuccessTime)
go sc.Run()
return nil
}
func (sc *UDPCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))
collectorSuccess := true
// Unmarshal bytes
var stats udpData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
collectorSuccess = false
return
}
// Create Request Labels Map
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"remote_address": stats.RemoteAddress,
"real_ip_address": stats.RealIPAddress,
"remote_user": stats.RemoteUser,
"protocol": stats.Protocol,
"method": stats.Method,
"uri": stats.URI,
"upstream_name": stats.UpstreamName,
"upstream_ip": stats.UpstreamIP,
"upstream_status": stats.UpstreamStatus,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
// Create Collector Labels Map
collectorLabels := prometheus.Labels{
"namespace": sc.ns,
"ingress_class": sc.ingressClass,
}
// Emit metrics
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
collectorSuccess = false
} else {
requestsMetric.Inc()
}
if stats.UpstreamResponseTime != -1 {
upstreamResponseTimeMetric, err := sc.upstreamResponseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
collectorSuccess = false
} else {
upstreamResponseTimeMetric.Observe(stats.UpstreamResponseTime)
}
}
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
collectorSuccess = false
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
collectorSuccess = false
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
if stats.BytesSent != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
collectorSuccess = false
} else {
bytesSentMetric.Observe(stats.BytesSent)
}
}
collectorSuccessMetric, err := sc.collectorSuccess.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success metric: %v", err)
} else {
if collectorSuccess {
collectorSuccessMetric.Set(1)
collectorSuccessTimeMetric, err := sc.collectorSuccessTime.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success time metric: %v", err)
} else {
collectorSuccessTimeMetric.Set(float64(time.Now().Unix()))
}
} else {
collectorSuccessMetric.Set(0)
}
}
}
// Run reads messages from the UDP listener and dispatches them to handleMessage
func (sc *UDPCollector) Run() {
handleMessages(sc.listener, sc.handleMessage)
}
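
One thing worth noting about the metric names produced above: the Kubernetes namespace (with dashes replaced by underscores) is used as the Prometheus namespace, and client_golang joins Namespace and Name with an underscore. A small sketch of the resulting fully qualified names, assuming a controller running in a namespace called ingress-nginx:

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same sanitization InitUDPCollector applies to its namespace argument.
	ns := strings.Replace("ingress-nginx", "-", "_", -1)
	for _, name := range []string{
		"requests",
		"request_duration_seconds",
		"request_length_bytes",
		"upstream_response_time_seconds",
		"bytes_sent",
	} {
		fmt.Println(prometheus.BuildFQName(ns, "", name))
	}
	// ingress_nginx_requests
	// ingress_nginx_request_duration_seconds
	// ingress_nginx_request_length_bytes
	// ingress_nginx_upstream_response_time_seconds
	// ingress_nginx_bytes_sent
}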