From 2cd2da7c3fb406af6bc8181ec944ddc1948eb449 Mon Sep 17 00:00:00 2001 From: Francisco Mejia Date: Wed, 13 Jun 2018 20:55:07 -0400 Subject: [PATCH] Create UDP collector that listens to UDP messages from monitor.lua and exposes them on /metrics endpoint --- cmd/nginx/main.go | 14 + .../controller/metric/collector/scrape.go | 30 -- .../controller/metric/collector/status.go | 225 -------------- .../metric/collector/status_test.go | 72 ----- .../controller/metric/collector/vts.go | 273 ----------------- internal/ingress/controller/nginx.go | 50 +--- internal/ingress/controller/stat_collector.go | 97 ------ internal/ingress/controller/util.go | 27 ++ internal/ingress/metric/collector/listener.go | 51 ++++ .../ingress/metric/collector/listener_test.go | 70 +++++ .../collector/nginx_status_collector.go} | 120 +++++++- .../collector/process_collector.go} | 15 +- .../ingress/metric/collector/udp_collector.go | 277 ++++++++++++++++++ 13 files changed, 575 insertions(+), 746 deletions(-) delete mode 100644 internal/ingress/controller/metric/collector/scrape.go delete mode 100644 internal/ingress/controller/metric/collector/status.go delete mode 100644 internal/ingress/controller/metric/collector/status_test.go delete mode 100644 internal/ingress/controller/metric/collector/vts.go delete mode 100644 internal/ingress/controller/stat_collector.go create mode 100644 internal/ingress/metric/collector/listener.go create mode 100644 internal/ingress/metric/collector/listener_test.go rename internal/ingress/{controller/metric/collector/nginx.go => metric/collector/nginx_status_collector.go} (53%) rename internal/ingress/{controller/metric/collector/process.go => metric/collector/process_collector.go} (92%) create mode 100644 internal/ingress/metric/collector/udp_collector.go diff --git a/cmd/nginx/main.go b/cmd/nginx/main.go index 14b94b904..4d03526cf 100644 --- a/cmd/nginx/main.go +++ b/cmd/nginx/main.go @@ -39,7 +39,9 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/ingress-nginx/internal/file" + "k8s.io/ingress-nginx/internal/ingress/annotations/class" "k8s.io/ingress-nginx/internal/ingress/controller" + "k8s.io/ingress-nginx/internal/ingress/metric/collector" "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/internal/net/ssl" "k8s.io/ingress-nginx/version" @@ -125,6 +127,18 @@ func main() { mux := http.NewServeMux() go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux) + err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status) + + if err != nil { + glog.Fatalf("Error generating metric collector: %v", err) + } + + err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000) + + if err != nil { + glog.Fatalf("Error generating UDP collector: %v", err) + } + ngx.Start() } diff --git a/internal/ingress/controller/metric/collector/scrape.go b/internal/ingress/controller/metric/collector/scrape.go deleted file mode 100644 index a078b2859..000000000 --- a/internal/ingress/controller/metric/collector/scrape.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package collector - -import "github.com/prometheus/client_golang/prometheus" - -// Stopable defines a prometheus collector that can be stopped -type Stopable interface { - prometheus.Collector - Stop() -} - -type scrapeRequest struct { - results chan<- prometheus.Metric - done chan struct{} -} diff --git a/internal/ingress/controller/metric/collector/status.go b/internal/ingress/controller/metric/collector/status.go deleted file mode 100644 index e195b045d..000000000 --- a/internal/ingress/controller/metric/collector/status.go +++ /dev/null @@ -1,225 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package collector - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "regexp" - "strconv" - - "github.com/golang/glog" -) - -var ( - ac = regexp.MustCompile(`Active connections: (\d+)`) - sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`) - reading = regexp.MustCompile(`Reading: (\d+)`) - writing = regexp.MustCompile(`Writing: (\d+)`) - waiting = regexp.MustCompile(`Waiting: (\d+)`) -) - -type basicStatus struct { - // Active total number of active connections - Active int - // Accepted total number of accepted client connections - Accepted int - // Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit). - Handled int - // Requests total number of client requests. - Requests int - // Reading current number of connections where nginx is reading the request header. - Reading int - // Writing current number of connections where nginx is writing the response back to the client. - Writing int - // Waiting current number of idle client connections waiting for a request. - Waiting int -} - -// https://github.com/vozlt/nginx-module-vts -type vts struct { - NginxVersion string `json:"nginxVersion"` - LoadMsec int `json:"loadMsec"` - NowMsec int `json:"nowMsec"` - // Total connections and requests(same as stub_status_module in NGINX) - Connections connections `json:"connections"` - // Traffic(in/out) and request and response counts and cache hit ratio per each server zone - ServerZones map[string]serverZone `json:"serverZones"` - // Traffic(in/out) and request and response counts and cache hit ratio per each server zone filtered through - // the vhost_traffic_status_filter_by_set_key directive - FilterZones map[string]map[string]filterZone `json:"filterZones"` - // Traffic(in/out) and request and response counts per server in each upstream group - UpstreamZones map[string][]upstreamZone `json:"upstreamZones"` -} - -type serverZone struct { - RequestCounter float64 `json:"requestCounter"` - InBytes float64 `json:"inBytes"` - OutBytes float64 `json:"outBytes"` - Responses response `json:"responses"` - Cache cache `json:"cache"` -} - -type filterZone struct { - RequestCounter float64 `json:"requestCounter"` - InBytes float64 `json:"inBytes"` - OutBytes float64 `json:"outBytes"` - Cache cache `json:"cache"` - Responses response `json:"responses"` -} - -type upstreamZone struct { - Responses response `json:"responses"` - Server string `json:"server"` - RequestCounter float64 `json:"requestCounter"` - InBytes float64 `json:"inBytes"` - OutBytes float64 `json:"outBytes"` - ResponseMsec float64 `json:"responseMsec"` - Weight float64 `json:"weight"` - MaxFails float64 `json:"maxFails"` - FailTimeout float64 `json:"failTimeout"` - Backup BoolToFloat64 `json:"backup"` - Down BoolToFloat64 `json:"down"` -} - -type cache struct { - Miss float64 `json:"miss"` - Bypass float64 `json:"bypass"` - Expired float64 `json:"expired"` - Stale float64 `json:"stale"` - Updating float64 `json:"updating"` - Revalidated float64 `json:"revalidated"` - Hit float64 `json:"hit"` - Scarce float64 `json:"scarce"` -} - -type response struct { - OneXx float64 `json:"1xx"` - TwoXx float64 `json:"2xx"` - TheeXx float64 `json:"3xx"` - FourXx float64 `json:"4xx"` - FiveXx float64 `json:"5xx"` -} - -type connections struct { - Active float64 `json:"active"` - Reading float64 `json:"reading"` - Writing float64 `json:"writing"` - Waiting float64 `json:"waiting"` - Accepted float64 `json:"accepted"` - Handled float64 `json:"handled"` - Requests float64 `json:"requests"` -} - -// BoolToFloat64 ... -type BoolToFloat64 float64 - -// UnmarshalJSON ... -func (bit BoolToFloat64) UnmarshalJSON(data []byte) error { - asString := string(data) - if asString == "1" || asString == "true" { - bit = 1 - } else if asString == "0" || asString == "false" { - bit = 0 - } else { - return fmt.Errorf(fmt.Sprintf("boolean unmarshal error: invalid input %s", asString)) - } - return nil -} - -func getNginxStatus(port int, path string) (*basicStatus, error) { - url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path) - glog.V(3).Infof("start scraping url: %v", url) - - data, err := httpBody(url) - - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err) - } - - return parse(string(data)), nil -} - -func httpBody(url string) ([]byte, error) { - resp, err := http.DefaultClient.Get(url) - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx : %v", err) - } - - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err) - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 400 { - return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode) - } - - return data, nil -} - -func getNginxVtsMetrics(port int, path string) (*vts, error) { - url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path) - glog.V(3).Infof("start scraping url: %v", url) - - data, err := httpBody(url) - - if err != nil { - return nil, fmt.Errorf("unexpected error scraping nginx vts (%v)", err) - } - - var vts *vts - err = json.Unmarshal(data, &vts) - if err != nil { - return nil, fmt.Errorf("unexpected error json unmarshal (%v)", err) - } - glog.V(3).Infof("scrape returned : %v", vts) - return vts, nil -} - -func parse(data string) *basicStatus { - acr := ac.FindStringSubmatch(data) - sahrr := sahr.FindStringSubmatch(data) - readingr := reading.FindStringSubmatch(data) - writingr := writing.FindStringSubmatch(data) - waitingr := waiting.FindStringSubmatch(data) - - return &basicStatus{ - toInt(acr, 1), - toInt(sahrr, 1), - toInt(sahrr, 2), - toInt(sahrr, 3), - toInt(readingr, 1), - toInt(writingr, 1), - toInt(waitingr, 1), - } -} - -func toInt(data []string, pos int) int { - if len(data) == 0 { - return 0 - } - if pos > len(data) { - return 0 - } - if v, err := strconv.Atoi(data[pos]); err == nil { - return v - } - return 0 -} diff --git a/internal/ingress/controller/metric/collector/status_test.go b/internal/ingress/controller/metric/collector/status_test.go deleted file mode 100644 index 5d3075dae..000000000 --- a/internal/ingress/controller/metric/collector/status_test.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package collector - -import ( - "testing" - - "github.com/kylelemons/godebug/pretty" -) - -func TestParseStatus(t *testing.T) { - tests := []struct { - in string - out *basicStatus - }{ - {`Active connections: 43 -server accepts handled requests - 7368 7368 10993 -Reading: 0 Writing: 5 Waiting: 38`, - &basicStatus{43, 7368, 7368, 10993, 0, 5, 38}, - }, - {`Active connections: 0 -server accepts handled requests - 1 7 0 -Reading: A Writing: B Waiting: 38`, - &basicStatus{0, 1, 7, 0, 0, 0, 38}, - }, - } - - for _, test := range tests { - r := parse(test.in) - if diff := pretty.Compare(r, test.out); diff != "" { - t.Logf("%v", diff) - t.Fatalf("expected %v but returned %v", test.out, r) - } - } -} - -func TestToint(t *testing.T) { - tests := []struct { - in []string - pos int - exp int - }{ - {[]string{}, 0, 0}, - {[]string{}, 1, 0}, - {[]string{"A"}, 0, 0}, - {[]string{"1"}, 0, 1}, - {[]string{"a", "2"}, 1, 2}, - } - - for _, test := range tests { - v := toInt(test.in, test.pos) - if v != test.exp { - t.Fatalf("expected %v but returned %v", test.exp, v) - } - } -} diff --git a/internal/ingress/controller/metric/collector/vts.go b/internal/ingress/controller/metric/collector/vts.go deleted file mode 100644 index 33eeac492..000000000 --- a/internal/ingress/controller/metric/collector/vts.go +++ /dev/null @@ -1,273 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package collector - -import ( - "reflect" - - "github.com/golang/glog" - - "github.com/prometheus/client_golang/prometheus" -) - -const ns = "nginx" - -type ( - vtsCollector struct { - scrapeChan chan scrapeRequest - port int - path string - data *vtsData - watchNamespace string - ingressClass string - } - - vtsData struct { - bytes *prometheus.Desc - cache *prometheus.Desc - connections *prometheus.Desc - responses *prometheus.Desc - requests *prometheus.Desc - filterZoneBytes *prometheus.Desc - filterZoneResponses *prometheus.Desc - filterZoneCache *prometheus.Desc - upstreamBackup *prometheus.Desc - upstreamBytes *prometheus.Desc - upstreamDown *prometheus.Desc - upstreamFailTimeout *prometheus.Desc - upstreamMaxFails *prometheus.Desc - upstreamResponses *prometheus.Desc - upstreamRequests *prometheus.Desc - upstreamResponseMsec *prometheus.Desc - upstreamWeight *prometheus.Desc - } -) - -// NewNGINXVTSCollector returns a new prometheus collector for the VTS module -func NewNGINXVTSCollector(watchNamespace, ingressClass string, port int, path string) Stopable { - - p := vtsCollector{ - scrapeChan: make(chan scrapeRequest), - port: port, - path: path, - watchNamespace: watchNamespace, - ingressClass: ingressClass, - } - - p.data = &vtsData{ - bytes: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "bytes_total"), - "Nginx bytes count", - []string{"ingress_class", "namespace", "server_zone", "direction"}, nil), - - cache: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "cache_total"), - "Nginx cache count", - []string{"ingress_class", "namespace", "server_zone", "type"}, nil), - - connections: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "connections_total"), - "Nginx connections count", - []string{"ingress_class", "namespace", "type"}, nil), - - responses: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "responses_total"), - "The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.", - []string{"ingress_class", "namespace", "server_zone", "status_code"}, nil), - - requests: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "requests_total"), - "The total number of requested client connections.", - []string{"ingress_class", "namespace", "server_zone"}, nil), - - filterZoneBytes: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "filterzone_bytes_total"), - "Nginx bytes count", - []string{"ingress_class", "namespace", "server_zone", "key", "direction"}, nil), - - filterZoneResponses: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "filterzone_responses_total"), - "The number of responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.", - []string{"ingress_class", "namespace", "server_zone", "key", "status_code"}, nil), - - filterZoneCache: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "filterzone_cache_total"), - "Nginx cache count", - []string{"ingress_class", "namespace", "server_zone", "key", "type"}, nil), - - upstreamBackup: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_backup"), - "Current backup setting of the server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamBytes: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_bytes_total"), - "The total number of bytes sent to this server.", - []string{"ingress_class", "namespace", "upstream", "server", "direction"}, nil), - - upstreamDown: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "vts_upstream_down_total"), - "Current down setting of the server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamFailTimeout: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_fail_timeout"), - "Current fail_timeout setting of the server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamMaxFails: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_maxfails"), - "Current max_fails setting of the server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamResponses: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_responses_total"), - "The number of upstream responses with status codes 1xx, 2xx, 3xx, 4xx, and 5xx.", - []string{"ingress_class", "namespace", "upstream", "server", "status_code"}, nil), - - upstreamRequests: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_requests_total"), - "The total number of client connections forwarded to this server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamResponseMsec: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_response_msecs_avg"), - "The average of only upstream response processing times in milliseconds.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - - upstreamWeight: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "upstream_weight"), - "Current upstream weight setting of the server.", - []string{"ingress_class", "namespace", "upstream", "server"}, nil), - } - - go p.start() - - return p -} - -// Describe implements prometheus.Collector. -func (p vtsCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- p.data.bytes - ch <- p.data.cache - ch <- p.data.connections - ch <- p.data.requests - ch <- p.data.responses - ch <- p.data.upstreamBackup - ch <- p.data.upstreamBytes - ch <- p.data.upstreamDown - ch <- p.data.upstreamFailTimeout - ch <- p.data.upstreamMaxFails - ch <- p.data.upstreamRequests - ch <- p.data.upstreamResponseMsec - ch <- p.data.upstreamResponses - ch <- p.data.upstreamWeight - ch <- p.data.filterZoneBytes - ch <- p.data.filterZoneCache - ch <- p.data.filterZoneResponses -} - -// Collect implements prometheus.Collector. -func (p vtsCollector) Collect(ch chan<- prometheus.Metric) { - req := scrapeRequest{results: ch, done: make(chan struct{})} - p.scrapeChan <- req - <-req.done -} - -func (p vtsCollector) start() { - for req := range p.scrapeChan { - ch := req.results - p.scrapeVts(ch) - req.done <- struct{}{} - } -} - -func (p vtsCollector) Stop() { - close(p.scrapeChan) -} - -// scrapeVts scrape nginx vts metrics -func (p vtsCollector) scrapeVts(ch chan<- prometheus.Metric) { - nginxMetrics, err := getNginxVtsMetrics(p.port, p.path) - if err != nil { - glog.Warningf("unexpected error obtaining nginx status info: %v", err) - return - } - - reflectMetrics(&nginxMetrics.Connections, p.data.connections, ch, p.ingressClass, p.watchNamespace) - - for name, zones := range nginxMetrics.UpstreamZones { - for pos, value := range zones { - reflectMetrics(&zones[pos].Responses, p.data.upstreamResponses, ch, p.ingressClass, p.watchNamespace, name, value.Server) - - ch <- prometheus.MustNewConstMetric(p.data.upstreamRequests, - prometheus.CounterValue, zones[pos].RequestCounter, p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamDown, - prometheus.CounterValue, float64(zones[pos].Down), p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamWeight, - prometheus.CounterValue, zones[pos].Weight, p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamResponseMsec, - prometheus.CounterValue, zones[pos].ResponseMsec, p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamBackup, - prometheus.CounterValue, float64(zones[pos].Backup), p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamFailTimeout, - prometheus.CounterValue, zones[pos].FailTimeout, p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamMaxFails, - prometheus.CounterValue, zones[pos].MaxFails, p.ingressClass, p.watchNamespace, name, value.Server) - ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes, - prometheus.CounterValue, zones[pos].InBytes, p.ingressClass, p.watchNamespace, name, value.Server, "in") - ch <- prometheus.MustNewConstMetric(p.data.upstreamBytes, - prometheus.CounterValue, zones[pos].OutBytes, p.ingressClass, p.watchNamespace, name, value.Server, "out") - } - } - - for name, zone := range nginxMetrics.ServerZones { - reflectMetrics(&zone.Responses, p.data.responses, ch, p.ingressClass, p.watchNamespace, name) - reflectMetrics(&zone.Cache, p.data.cache, ch, p.ingressClass, p.watchNamespace, name) - - ch <- prometheus.MustNewConstMetric(p.data.requests, - prometheus.CounterValue, zone.RequestCounter, p.ingressClass, p.watchNamespace, name) - ch <- prometheus.MustNewConstMetric(p.data.bytes, - prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, name, "in") - ch <- prometheus.MustNewConstMetric(p.data.bytes, - prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, name, "out") - } - - for serverZone, keys := range nginxMetrics.FilterZones { - for name, zone := range keys { - reflectMetrics(&zone.Responses, p.data.filterZoneResponses, ch, p.ingressClass, p.watchNamespace, serverZone, name) - reflectMetrics(&zone.Cache, p.data.filterZoneCache, ch, p.ingressClass, p.watchNamespace, serverZone, name) - - ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes, - prometheus.CounterValue, zone.InBytes, p.ingressClass, p.watchNamespace, serverZone, name, "in") - ch <- prometheus.MustNewConstMetric(p.data.filterZoneBytes, - prometheus.CounterValue, zone.OutBytes, p.ingressClass, p.watchNamespace, serverZone, name, "out") - } - } -} - -func reflectMetrics(value interface{}, desc *prometheus.Desc, ch chan<- prometheus.Metric, labels ...string) { - val := reflect.ValueOf(value).Elem() - - for i := 0; i < val.NumField(); i++ { - tag := val.Type().Field(i).Tag - l := append(labels, tag.Get("json")) - ch <- prometheus.MustNewConstMetric(desc, - prometheus.CounterValue, val.Field(i).Interface().(float64), - l...) - } -} diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 268ae88e7..1681022d7 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -65,24 +65,14 @@ type statusModule string const ( ngxHealthPath = "/healthz" - - defaultStatusModule statusModule = "default" - vtsStatusModule statusModule = "vts" ) var ( - tmplPath = "/etc/nginx/template/nginx.tmpl" - cfgPath = "/etc/nginx/nginx.conf" - nginxBinary = "/usr/sbin/nginx" + tmplPath = "/etc/nginx/template/nginx.tmpl" ) // NewNGINXController creates a new NGINX Ingress controller. func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController { - ngx := os.Getenv("NGINX_BINARY") - if ngx == "" { - ngx = nginxBinary - } - eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ @@ -95,8 +85,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl } n := &NGINXController{ - binary: ngx, - isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, @@ -131,8 +119,6 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl fs, n.updateCh) - n.stats = newStatsCollector(config.Namespace, class.IngressClass, n.binary, n.cfg.ListenPorts.Status) - n.syncQueue = task.NewTaskQueue(n.syncIngress) n.annotations = annotations.NewAnnotationExtractor(n.store) @@ -252,12 +238,9 @@ type NGINXController struct { t *ngx_template.Template - binary string resolver []net.IP - stats *statsCollector - statusModule statusModule - + // returns true if IPV6 is enabled in the pod isIPV6Enabled bool isShuttingDown bool @@ -279,7 +262,7 @@ func (n *NGINXController) Start() { go n.syncStatus.Run() } - cmd := exec.Command(n.binary, "-c", cfgPath) + cmd := nginxExecCommand() // put NGINX in another process group to prevent it // to receive signals meant for the controller @@ -316,7 +299,7 @@ func (n *NGINXController) Start() { // release command resources cmd.Process.Release() // start a new nginx master process if the controller is not being stopped - cmd = exec.Command(n.binary, "-c", cfgPath) + cmd = nginxExecCommand() cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, Pgid: 0, @@ -361,9 +344,9 @@ func (n *NGINXController) Stop() error { n.syncStatus.Shutdown() } - // send stop signal to NGINX - glog.Info("Stopping NGINX process") - cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit") + // Send stop signal to Nginx + glog.Info("stopping NGINX process...") + cmd := nginxExecCommand("-s", "quit") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() @@ -422,7 +405,7 @@ func (n NGINXController) testTemplate(cfg []byte) error { if err != nil { return err } - out, err := exec.Command(n.binary, "-t", "-c", tmpfile.Name()).CombinedOutput() + out, err := nginxTestCommand(tmpfile.Name()).CombinedOutput() if err != nil { // this error is different from the rest because it must be clear why nginx is not working oe := fmt.Sprintf(` @@ -483,17 +466,10 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { n.Proxy.ServerList = servers } - // we need to check if the status module configuration changed - if cfg.EnableVtsStatus { - n.setupMonitor(vtsStatusModule) - } else { - n.setupMonitor(defaultStatusModule) - } - - // NGINX cannot resize the hash tables used to store server names. For - // this reason we check if the current size is correct for the host - // names defined in the Ingress rules and adjust the value if - // necessary. + // NGINX cannot resize the hash tables used to store server names. + // For this reason we check if the defined size defined is correct + // for the FQDN defined in the ingress rules adjusting the value + // if is required. // https://trac.nginx.org/nginx/ticket/352 // https://trac.nginx.org/nginx/ticket/631 var longestName int @@ -659,7 +635,7 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { return err } - o, err := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput() + o, err := nginxExecCommand("-s", "reload").CombinedOutput() if err != nil { return fmt.Errorf("%v\n%v", err, string(o)) } diff --git a/internal/ingress/controller/stat_collector.go b/internal/ingress/controller/stat_collector.go deleted file mode 100644 index ad3434d15..000000000 --- a/internal/ingress/controller/stat_collector.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" - - "k8s.io/ingress-nginx/internal/ingress/controller/metric/collector" -) - -const ( - ngxStatusPath = "/nginx_status" - ngxVtsPath = "/nginx_status/format/json" -) - -func (n *NGINXController) setupMonitor(sm statusModule) { - csm := n.statusModule - if csm != sm { - glog.Infof("changing prometheus collector from %v to %v", csm, sm) - n.stats.stop(csm) - n.stats.start(sm) - n.statusModule = sm - } -} - -type statsCollector struct { - process prometheus.Collector - basic collector.Stopable - vts collector.Stopable - - namespace string - watchClass string - - port int -} - -func (s *statsCollector) stop(sm statusModule) { - switch sm { - case defaultStatusModule: - s.basic.Stop() - prometheus.Unregister(s.basic) - case vtsStatusModule: - s.vts.Stop() - prometheus.Unregister(s.vts) - } -} - -func (s *statsCollector) start(sm statusModule) { - switch sm { - case defaultStatusModule: - s.basic = collector.NewNginxStatus(s.namespace, s.watchClass, s.port, ngxStatusPath) - prometheus.Register(s.basic) - break - case vtsStatusModule: - s.vts = collector.NewNGINXVTSCollector(s.namespace, s.watchClass, s.port, ngxVtsPath) - prometheus.Register(s.vts) - break - } -} - -func newStatsCollector(ns, class, binary string, port int) *statsCollector { - glog.Infof("starting new nginx stats collector for Ingress controller running in namespace %v (class %v)", ns, class) - glog.Infof("collector extracting information from port %v", port) - pc, err := collector.NewNamedProcess(true, collector.BinaryNameMatcher{ - Name: "nginx", - Binary: binary, - }) - if err != nil { - glog.Fatalf("unexpected error registering nginx collector: %v", err) - } - err = prometheus.Register(pc) - if err != nil { - glog.Fatalf("unexpected error registering nginx collector: %v", err) - } - - return &statsCollector{ - namespace: ns, - watchClass: class, - process: pc, - port: port, - } -} diff --git a/internal/ingress/controller/util.go b/internal/ingress/controller/util.go index 5984b2fc3..28005bd84 100644 --- a/internal/ingress/controller/util.go +++ b/internal/ingress/controller/util.go @@ -17,6 +17,8 @@ limitations under the License. package controller import ( + "os" + "os/exec" "syscall" "github.com/golang/glog" @@ -66,3 +68,28 @@ func sysctlFSFileMax() int { glog.V(2).Infof("rlimit.max=%v", rLimit.Max) return int(rLimit.Max) } + +const ( + defBinary = "/usr/sbin/nginx" + cfgPath = "/etc/nginx/nginx.conf" +) + +func nginxExecCommand(args ...string) *exec.Cmd { + ngx := os.Getenv("NGINX_BINARY") + if ngx == "" { + ngx = defBinary + } + + cmdArgs := []string{"-c", cfgPath} + cmdArgs = append(cmdArgs, args...) + return exec.Command(ngx, cmdArgs...) +} + +func nginxTestCommand(cfg string) *exec.Cmd { + ngx := os.Getenv("NGINX_BINARY") + if ngx == "" { + ngx = defBinary + } + + return exec.Command(ngx, "-c", cfg, "-t") +} diff --git a/internal/ingress/metric/collector/listener.go b/internal/ingress/metric/collector/listener.go new file mode 100644 index 000000000..ce985db2c --- /dev/null +++ b/internal/ingress/metric/collector/listener.go @@ -0,0 +1,51 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package collector + +import ( + "fmt" + "net" +) + +const packetSize = 1024 * 65 + +// newUDPListener creates a new UDP listener used to process messages +// from the NGINX log phase containing information about a request +func newUDPListener(port int) (*net.UDPConn, error) { + service := fmt.Sprintf("127.0.0.1:%v", port) + + udpAddr, err := net.ResolveUDPAddr("udp4", service) + if err != nil { + return nil, err + } + + return net.ListenUDP("udp", udpAddr) +} + +// handleMessages process packets received in an UDP connection +func handleMessages(conn *net.UDPConn, fn func([]byte)) { + msg := make([]byte, packetSize) + + for { + s, _, err := conn.ReadFrom(msg[0:]) + if err != nil { + continue + } + + fn(msg[0:s]) + } +} diff --git a/internal/ingress/metric/collector/listener_test.go b/internal/ingress/metric/collector/listener_test.go new file mode 100644 index 000000000..6c159181c --- /dev/null +++ b/internal/ingress/metric/collector/listener_test.go @@ -0,0 +1,70 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package collector + +import ( + "fmt" + "net" + "sync/atomic" + "testing" + "time" +) + +func TestNewUDPLogListener(t *testing.T) { + port := freeUDPPort() + + var count uint64 + + fn := func(message []byte) { + t.Logf("message: %v", string(message)) + atomic.AddUint64(&count, 1) + } + + t.Logf("UDP Port: %v", port) + + l, err := newUDPListener(port) + if err != nil { + t.Errorf("unexpected error creating UDP listener: %v", err) + } + if l == nil { + t.Errorf("expected a listener but none returned") + } + + go handleMessages(l, fn) + + conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port)) + conn.Write([]byte("message")) + conn.Close() + + time.Sleep(1 * time.Millisecond) + if count != 1 { + t.Errorf("expected only one message from the UDP listern but %v returned", count) + } +} + +func freeUDPPort() int { + l, err := net.ListenUDP("udp", &net.UDPAddr{}) + if err != nil { + return 0 + } + + if err := l.Close(); err != nil { + return 0 + } + + return l.LocalAddr().(*net.UDPAddr).Port +} diff --git a/internal/ingress/controller/metric/collector/nginx.go b/internal/ingress/metric/collector/nginx_status_collector.go similarity index 53% rename from internal/ingress/controller/metric/collector/nginx.go rename to internal/ingress/metric/collector/nginx_status_collector.go index 74d47a357..156dc3e70 100644 --- a/internal/ingress/controller/metric/collector/nginx.go +++ b/internal/ingress/metric/collector/nginx_status_collector.go @@ -17,16 +17,30 @@ limitations under the License. package collector import ( + "fmt" + "io/ioutil" + "net/http" + "regexp" + "strconv" + "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" ) +var ( + ac = regexp.MustCompile(`Active connections: (\d+)`) + sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`) + reading = regexp.MustCompile(`Reading: (\d+)`) + writing = regexp.MustCompile(`Writing: (\d+)`) + waiting = regexp.MustCompile(`Waiting: (\d+)`) +) + type ( nginxStatusCollector struct { scrapeChan chan scrapeRequest ngxHealthPort int - ngxVtsPath string + ngxStatusPath string data *nginxStatusData watchNamespace string ingressClass string @@ -37,15 +51,33 @@ type ( requestsTotal *prometheus.Desc connections *prometheus.Desc } + + basicStatus struct { + // Active total number of active connections + Active int + // Accepted total number of accepted client connections + Accepted int + // Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit). + Handled int + // Requests total number of client requests. + Requests int + // Reading current number of connections where nginx is reading the request header. + Reading int + // Writing current number of connections where nginx is writing the response back to the client. + Writing int + // Waiting current number of idle client connections waiting for a request. + Waiting int + } ) -// NewNginxStatus returns a new prometheus collector the default nginx status module -func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxVtsPath string) Stopable { - +// InitNGINXStatusCollector returns a new prometheus collector the default nginx status module +func InitNGINXStatusCollector(watchNamespace, ingressClass string, ngxHealthPort int) error { + const ns string = "nginx" + const ngxStatusPath = "/nginx_status" p := nginxStatusCollector{ scrapeChan: make(chan scrapeRequest), ngxHealthPort: ngxHealthPort, - ngxVtsPath: ngxVtsPath, + ngxStatusPath: ngxStatusPath, watchNamespace: watchNamespace, ingressClass: ingressClass, } @@ -62,14 +94,20 @@ func NewNginxStatus(watchNamespace, ingressClass string, ngxHealthPort int, ngxV []string{"ingress_class", "namespace"}, nil), connections: prometheus.NewDesc( - prometheus.BuildFQName(ns, "", "connnections"), + prometheus.BuildFQName(ns, "", "connections"), "current number of client connections with state {reading, writing, waiting}", []string{"ingress_class", "namespace", "state"}, nil), } - go p.start() + err := prometheus.Register(p) - return p + if err != nil { + return fmt.Errorf("error while registering nginx status collector : %v", err) + } + + go p.Run() + + return nil } // Describe implements prometheus.Collector. @@ -86,7 +124,7 @@ func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) { <-req.done } -func (p nginxStatusCollector) start() { +func (p nginxStatusCollector) Run() { for req := range p.scrapeChan { ch := req.results p.scrape(ch) @@ -98,9 +136,71 @@ func (p nginxStatusCollector) Stop() { close(p.scrapeChan) } +func httpBody(url string) ([]byte, error) { + resp, err := http.DefaultClient.Get(url) + if err != nil { + return nil, fmt.Errorf("unexpected error scraping nginx : %v", err) + } + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode) + } + + return data, nil +} + +func toInt(data []string, pos int) int { + if len(data) == 0 { + return 0 + } + if pos > len(data) { + return 0 + } + if v, err := strconv.Atoi(data[pos]); err == nil { + return v + } + return 0 +} + +func parse(data string) *basicStatus { + acr := ac.FindStringSubmatch(data) + sahrr := sahr.FindStringSubmatch(data) + readingr := reading.FindStringSubmatch(data) + writingr := writing.FindStringSubmatch(data) + waitingr := waiting.FindStringSubmatch(data) + + return &basicStatus{ + toInt(acr, 1), + toInt(sahrr, 1), + toInt(sahrr, 2), + toInt(sahrr, 3), + toInt(readingr, 1), + toInt(writingr, 1), + toInt(waitingr, 1), + } +} + +func getNginxStatus(port int, path string) (*basicStatus, error) { + url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path) + glog.V(3).Infof("start scraping url: %v", url) + + data, err := httpBody(url) + + if err != nil { + return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err) + } + + return parse(string(data)), nil +} + // nginxStatusCollector scrape the nginx status func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) { - s, err := getNginxStatus(p.ngxHealthPort, p.ngxVtsPath) + s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath) if err != nil { glog.Warningf("unexpected error obtaining nginx status info: %v", err) return diff --git a/internal/ingress/controller/metric/collector/process.go b/internal/ingress/metric/collector/process_collector.go similarity index 92% rename from internal/ingress/controller/metric/collector/process.go rename to internal/ingress/metric/collector/process_collector.go index 016798525..679b9149f 100644 --- a/internal/ingress/controller/metric/collector/process.go +++ b/internal/ingress/metric/collector/process_collector.go @@ -26,6 +26,17 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +type scrapeRequest struct { + results chan<- prometheus.Metric + done chan struct{} +} + +// Stopable defines a prometheus collector that can be stopped +type Stopable interface { + prometheus.Collector + Stop() +} + // BinaryNameMatcher ... type BinaryNameMatcher struct { Name string @@ -60,8 +71,8 @@ type namedProcess struct { data namedProcessData } -// NewNamedProcess returns a new prometheus collector for the nginx process -func NewNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) { +// newNamedProcess returns a new prometheus collector for the nginx process +func newNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) { fs, err := proc.NewFS("/proc") if err != nil { return nil, err diff --git a/internal/ingress/metric/collector/udp_collector.go b/internal/ingress/metric/collector/udp_collector.go new file mode 100644 index 000000000..69af70b33 --- /dev/null +++ b/internal/ingress/metric/collector/udp_collector.go @@ -0,0 +1,277 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package collector + +import ( + "encoding/json" + "net" + "strings" + "time" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" +) + +type udpData struct { + Host string `json:"host"` // Label + Status string `json:"status"` // Label + + RealIPAddress string `json:"realIpAddr"` // Label + RemoteAddress string `json:"remoteAddr"` // Label + RemoteUser string `json:"remoteUser"` // Label + + BytesSent float64 `json:"bytesSent"` // Metric + + Protocol string `json:"protocol"` // Label + Method string `json:"method"` // Label + URI string `json:"uri"` // Label + + RequestLength float64 `json:"requestLength"` // Metric + RequestTime float64 `json:"requestTime"` // Metric + + UpstreamName string `json:"upstreamName"` // Label + UpstreamIP string `json:"upstreamIP"` // Label + UpstreamResponseTime float64 `json:"upstreamResponseTime"` // Metric + UpstreamStatus string `json:"upstreamStatus"` // Label + + Namespace string `json:"namespace"` // Label + Ingress string `json:"ingress"` // Label + Service string `json:"service"` // Label +} + +// UDPCollector stores prometheus metrics and ingress meta-data +type UDPCollector struct { + upstreamResponseTime *prometheus.HistogramVec + requestTime *prometheus.HistogramVec + requestLength *prometheus.HistogramVec + bytesSent *prometheus.HistogramVec + collectorSuccess *prometheus.GaugeVec + collectorSuccessTime *prometheus.GaugeVec + requests *prometheus.CounterVec + listener *net.UDPConn + ns string + ingressClass string + port int +} + +// InitUDPCollector creates a new UDPCollector instance +func InitUDPCollector(ns string, class string, port int) error { + sc := UDPCollector{} + + ns = strings.Replace(ns, "-", "_", -1) + + listener, err := newUDPListener(port) + + if err != nil { + return err + } + + sc.listener = listener + sc.ns = ns + sc.ingressClass = class + sc.port = port + + requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"} + collectorTags := []string{"namespace", "ingress_class"} + + sc.upstreamResponseTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "upstream_response_time_seconds", + Help: "The time spent on receiving the response from the upstream server", + Namespace: ns, + }, + requestTags, + ) + + sc.requestTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "request_duration_seconds", + Help: "The request processing time in seconds", + Namespace: ns, + }, + requestTags, + ) + + sc.requestLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "request_length_bytes", + Help: "The request length (including request line, header, and request body)", + Namespace: ns, + Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide. + }, + requestTags, + ) + + sc.requests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "requests", + Help: "The total number of client requests.", + Namespace: ns, + }, + collectorTags, + ) + + sc.bytesSent = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "bytes_sent", + Help: "The the number of bytes sent to a client", + Namespace: ns, + Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10. + }, + requestTags, + ) + + sc.collectorSuccess = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "collector_last_run_successful", + Help: "Whether the last collector run was successful (success = 1, failure = 0).", + Namespace: ns, + }, + collectorTags, + ) + + sc.collectorSuccessTime = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "collector_last_run_successful_timestamp_seconds", + Help: "Timestamp of the last successful collector run", + Namespace: ns, + }, + collectorTags, + ) + + prometheus.MustRegister(sc.upstreamResponseTime) + prometheus.MustRegister(sc.requestTime) + prometheus.MustRegister(sc.requestLength) + prometheus.MustRegister(sc.requests) + prometheus.MustRegister(sc.bytesSent) + prometheus.MustRegister(sc.collectorSuccess) + prometheus.MustRegister(sc.collectorSuccessTime) + + go sc.Run() + + return nil +} + +func (sc *UDPCollector) handleMessage(msg []byte) { + glog.V(5).Infof("msg: %v", string(msg)) + + collectorSuccess := true + + // Unmarshall bytes + var stats udpData + err := json.Unmarshal(msg, &stats) + if err != nil { + glog.Errorf("Unexpected error deserializing JSON paylod: %v", err) + collectorSuccess = false + return + } + + // Create Request Labels Map + requestLabels := prometheus.Labels{ + "host": stats.Host, + "status": stats.Status, + "remote_address": stats.RemoteAddress, + "real_ip_address": stats.RealIPAddress, + "remote_user": stats.RemoteUser, + "protocol": stats.Protocol, + "method": stats.Method, + "uri": stats.URI, + "upstream_name": stats.UpstreamName, + "upstream_ip": stats.UpstreamIP, + "upstream_status": stats.UpstreamStatus, + "namespace": stats.Namespace, + "ingress": stats.Ingress, + "service": stats.Service, + } + + // Create Collector Labels Map + collectorLabels := prometheus.Labels{ + "namespace": sc.ns, + "ingress_class": sc.ingressClass, + } + + // Emit metrics + requestsMetric, err := sc.requests.GetMetricWith(collectorLabels) + if err != nil { + glog.Errorf("Error fetching requests metric: %v", err) + collectorSuccess = false + } else { + requestsMetric.Inc() + } + + if stats.UpstreamResponseTime != -1 { + upstreamResponseTimeMetric, err := sc.upstreamResponseTime.GetMetricWith(requestLabels) + if err != nil { + glog.Errorf("Error fetching upstream response time metric: %v", err) + collectorSuccess = false + } else { + upstreamResponseTimeMetric.Observe(stats.UpstreamResponseTime) + } + } + + if stats.RequestTime != -1 { + requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels) + if err != nil { + glog.Errorf("Error fetching request duration metric: %v", err) + collectorSuccess = false + } else { + requestTimeMetric.Observe(stats.RequestTime) + } + } + + if stats.RequestLength != -1 { + requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels) + if err != nil { + glog.Errorf("Error fetching request length metric: %v", err) + collectorSuccess = false + } else { + requestLengthMetric.Observe(stats.RequestLength) + } + } + + if stats.BytesSent != -1 { + bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels) + if err != nil { + glog.Errorf("Error fetching bytes sent metric: %v", err) + collectorSuccess = false + } else { + bytesSentMetric.Observe(stats.BytesSent) + } + } + + collectorSuccessMetric, err := sc.collectorSuccess.GetMetricWith(collectorLabels) + if err != nil { + glog.Errorf("Error fetching collector success metric: %v", err) + } else { + if collectorSuccess { + collectorSuccessMetric.Set(1) + collectorSuccessTimeMetric, err := sc.collectorSuccessTime.GetMetricWith(collectorLabels) + if err != nil { + glog.Errorf("Error fetching collector success time metric: %v", err) + } else { + collectorSuccessTimeMetric.Set(float64(time.Now().Unix())) + } + } else { + collectorSuccessMetric.Set(0) + } + } +} + +// Run adds a message handler to a UDP listener +func (sc *UDPCollector) Run() { + handleMessages(sc.listener, sc.handleMessage) +}