Merge pull request #2957 from ElvinEfendi/batch-metrics
Batch metrics and flush periodically
commit a982713b3b
7 changed files with 298 additions and 265 deletions
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"os"
@@ -206,92 +207,94 @@ func (sc *SocketCollector) handleMessage(msg []byte) {
 	glog.V(5).Infof("msg: %v", string(msg))
 
 	// Unmarshall bytes
-	var stats socketData
-	err := json.Unmarshal(msg, &stats)
+	var statsBatch []socketData
+	err := json.Unmarshal(msg, &statsBatch)
 	if err != nil {
-		glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
+		glog.Errorf("Unexpected error deserializing JSON paylod: %v. Payload:\n%v", err, string(msg))
 		return
 	}
 
-	requestLabels := prometheus.Labels{
-		"host":      stats.Host,
-		"status":    stats.Status,
-		"method":    stats.Method,
-		"path":      stats.Path,
-		//"endpoint": stats.Endpoint,
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"service":   stats.Service,
-	}
-
-	collectorLabels := prometheus.Labels{
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"status":    stats.Status,
-	}
-
-	latencyLabels := prometheus.Labels{
-		"namespace": stats.Namespace,
-		"ingress":   stats.Ingress,
-		"service":   stats.Service,
-	}
-
-	requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
-	if err != nil {
-		glog.Errorf("Error fetching requests metric: %v", err)
-	} else {
-		requestsMetric.Inc()
-	}
-
-	if stats.Latency != -1 {
-		latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
-		if err != nil {
-			glog.Errorf("Error fetching latency metric: %v", err)
-		} else {
-			latencyMetric.Observe(stats.Latency)
-		}
-	}
-
-	if stats.RequestTime != -1 {
-		requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching request duration metric: %v", err)
-		} else {
-			requestTimeMetric.Observe(stats.RequestTime)
-		}
-	}
-
-	if stats.RequestLength != -1 {
-		requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching request length metric: %v", err)
-		} else {
-			requestLengthMetric.Observe(stats.RequestLength)
-		}
-	}
-
-	if stats.ResponseTime != -1 {
-		responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching upstream response time metric: %v", err)
-		} else {
-			responseTimeMetric.Observe(stats.ResponseTime)
-		}
-	}
-
-	if stats.ResponseLength != -1 {
-		bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching bytes sent metric: %v", err)
-		} else {
-			bytesSentMetric.Observe(stats.ResponseLength)
-		}
-
-		responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
-		if err != nil {
-			glog.Errorf("Error fetching bytes sent metric: %v", err)
-		} else {
-			responseSizeMetric.Observe(stats.ResponseLength)
+	for _, stats := range statsBatch {
+		requestLabels := prometheus.Labels{
+			"host":   stats.Host,
+			"status": stats.Status,
+			"method": stats.Method,
+			"path":   stats.Path,
+			//"endpoint": stats.Endpoint,
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"service":   stats.Service,
+		}
+
+		collectorLabels := prometheus.Labels{
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"status":    stats.Status,
+		}
+
+		latencyLabels := prometheus.Labels{
+			"namespace": stats.Namespace,
+			"ingress":   stats.Ingress,
+			"service":   stats.Service,
+		}
+
+		requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
+		if err != nil {
+			glog.Errorf("Error fetching requests metric: %v", err)
+		} else {
+			requestsMetric.Inc()
+		}
+
+		if stats.Latency != -1 {
+			latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
+			if err != nil {
+				glog.Errorf("Error fetching latency metric: %v", err)
+			} else {
+				latencyMetric.Observe(stats.Latency)
+			}
+		}
+
+		if stats.RequestTime != -1 {
+			requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching request duration metric: %v", err)
+			} else {
+				requestTimeMetric.Observe(stats.RequestTime)
+			}
+		}
+
+		if stats.RequestLength != -1 {
+			requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching request length metric: %v", err)
+			} else {
+				requestLengthMetric.Observe(stats.RequestLength)
+			}
+		}
+
+		if stats.ResponseTime != -1 {
+			responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching upstream response time metric: %v", err)
+			} else {
+				responseTimeMetric.Observe(stats.ResponseTime)
+			}
+		}
+
+		if stats.ResponseLength != -1 {
+			bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching bytes sent metric: %v", err)
+			} else {
+				bytesSentMetric.Observe(stats.ResponseLength)
+			}
+
+			responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
+			if err != nil {
+				glog.Errorf("Error fetching bytes sent metric: %v", err)
+			} else {
+				responseSizeMetric.Observe(stats.ResponseLength)
+			}
 		}
 	}
 }
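Note: the collector-side change above boils down to decoding a JSON array instead of a single object and ranging over the entries. A minimal, self-contained sketch of that decode-and-iterate pattern (using a hypothetical pared-down struct in place of the real socketData, with -1 as the "not reported" sentinel, as in the patch):

// Standalone sketch, not part of the patch: decode a batched payload into a
// slice and iterate, as handleMessage now does.
package main

import (
	"encoding/json"
	"fmt"
)

// entry is a hypothetical, pared-down stand-in for socketData.
type entry struct {
	Host        string  `json:"host"`
	Status      string  `json:"status"`
	RequestTime float64 `json:"requestTime"` // -1 means "not reported"
}

func main() {
	msg := []byte(`[{"host":"a.com","status":"200","requestTime":0.05},
	                {"host":"b.com","status":"404","requestTime":-1}]`)

	var batch []entry
	if err := json.Unmarshal(msg, &batch); err != nil {
		fmt.Println("deserialization error:", err)
		return
	}

	for _, stats := range batch {
		fmt.Println(stats.Host, stats.Status)
		if stats.RequestTime != -1 {
			// a real collector would Observe() the value here
			fmt.Println("observe requestTime:", stats.RequestTime)
		}
	}
}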
@@ -408,19 +411,15 @@ func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
 	sc.bytesSent.Collect(ch)
 }
 
-const packetSize = 1024 * 65
-
 // handleMessages process the content received in a network connection
 func handleMessages(conn io.ReadCloser, fn func([]byte)) {
 	defer conn.Close()
-	msg := make([]byte, packetSize)
-	s, err := conn.Read(msg[0:])
+	data, err := ioutil.ReadAll(conn)
 	if err != nil {
 		return
 	}
 
-	fn(msg[0:s])
+	fn(data)
 }
 
 func deleteConstants(labels prometheus.Labels) {
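A plausible reading of this hunk: the old code issued a single conn.Read into a fixed 65 KiB buffer, which capped and could truncate a payload, while ioutil.ReadAll drains the connection until the writer closes it — which matters once the Lua side sends a whole batch per flush. A small illustrative sketch (the names here are made up; only the ioutil.ReadAll call mirrors the patch):

// Sketch of the read-until-EOF pattern that replaces the fixed 65 KiB buffer.
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func handle(conn io.ReadCloser, fn func([]byte)) {
	defer conn.Close()

	// Blocks until EOF, i.e. until the writer closes its end of the connection,
	// so an arbitrarily large batched payload is read in full.
	data, err := ioutil.ReadAll(conn)
	if err != nil {
		return
	}
	fn(data)
}

func main() {
	// Stand-in for a unix-socket connection; the sender closes it after each flush.
	conn := ioutil.NopCloser(strings.NewReader(`[{"host":"a.com"}]`))
	handle(conn, func(b []byte) { fmt.Printf("received %d bytes\n", len(b)) })
}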
@@ -100,7 +100,7 @@ func TestCollector(t *testing.T) {
 		},
 		{
 			name: "valid metric object should update prometheus metrics",
-			data: []string{`{
+			data: []string{`[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -115,7 +115,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-production",
 				"ingress":"web-yml",
 				"service":"test-app"
-			}`},
+			}]`},
 			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
 			wantBefore: `
 				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
@@ -142,7 +142,7 @@ func TestCollector(t *testing.T) {
 
 		{
 			name: "multiple messages should increase prometheus metric by two",
-			data: []string{`{
+			data: []string{`[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -157,7 +157,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-production",
 				"ingress":"web-yml",
 				"service":"test-app"
-			}`, `{
+			}]`, `[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -172,7 +172,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-qa",
 				"ingress":"web-yml-qa",
 				"service":"test-app-qa"
-			}`, `{
+			}]`, `[{
 				"host":"testshop.com",
 				"status":"200",
 				"bytesSent":150.0,
@@ -187,7 +187,7 @@ func TestCollector(t *testing.T) {
 				"namespace":"test-app-qa",
 				"ingress":"web-yml-qa",
 				"service":"test-app-qa"
-			}`},
+			}]`},
 			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
 			wantBefore: `
 				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
@@ -222,6 +222,65 @@ func TestCollector(t *testing.T) {
 				nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 2
 			`,
 		},
 
+		{
+			name: "collector should be able to handle batched metrics correctly",
+			data: []string{`[
+			{
+				"host":"testshop.com",
+				"status":"200",
+				"bytesSent":150.0,
+				"method":"GET",
+				"path":"/admin",
+				"requestLength":300.0,
+				"requestTime":60.0,
+				"upstreamName":"test-upstream",
+				"upstreamIP":"1.1.1.1:8080",
+				"upstreamResponseTime":200,
+				"upstreamStatus":"220",
+				"namespace":"test-app-production",
+				"ingress":"web-yml",
+				"service":"test-app"
+			},
+			{
+				"host":"testshop.com",
+				"status":"200",
+				"bytesSent":150.0,
+				"method":"GET",
+				"path":"/admin",
+				"requestLength":300.0,
+				"requestTime":60.0,
+				"upstreamName":"test-upstream",
+				"upstreamIP":"1.1.1.1:8080",
+				"upstreamResponseTime":100,
+				"upstreamStatus":"220",
+				"namespace":"test-app-production",
+				"ingress":"web-yml",
+				"service":"test-app"
+			}]`},
+			metrics: []string{"nginx_ingress_controller_response_duration_seconds"},
+			wantBefore: `
+				# HELP nginx_ingress_controller_response_duration_seconds The time spent on receiving the response from the upstream server
+				# TYPE nginx_ingress_controller_response_duration_seconds histogram
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
+				nginx_ingress_controller_response_duration_seconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 2
+				nginx_ingress_controller_response_duration_seconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 300
+				nginx_ingress_controller_response_duration_seconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 2
+			`,
+			removeIngresses: []string{"test-app-production/web-yml"},
+			wantAfter: `
+			`,
+		},
 	}
 
 	for _, c := range cases {
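The test fixtures above change in only one way: every payload is wrapped in [ ... ], because handleMessage now unmarshals into a slice and a bare JSON object no longer decodes. A quick standalone illustration of that behavior:

// Why the fixtures gained the surrounding [ ... ]: unmarshaling into a slice
// accepts a JSON array but rejects a bare object.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var batch []map[string]interface{}

	err := json.Unmarshal([]byte(`{"host":"testshop.com"}`), &batch)
	fmt.Println(err) // json: cannot unmarshal object into Go value of type []map[string]interface {}

	err = json.Unmarshal([]byte(`[{"host":"testshop.com"}]`), &batch)
	fmt.Println(err, len(batch)) // <nil> 1
}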
@@ -1,50 +1,83 @@
 local socket = ngx.socket.tcp
 local cjson = require('cjson')
-local defer = require('util.defer')
 local assert = assert
 
+local metrics_batch = {}
+-- if an Nginx worker processes more than (MAX_BATCH_SIZE/FLUSH_INTERVAL) RPS then it will start dropping metrics
+local MAX_BATCH_SIZE = 10000
+local FLUSH_INTERVAL = 1 -- second
+
 local _M = {}
 
-local function send_data(jsonData)
+local function send(payload)
   local s = assert(socket())
-  assert(s:connect('unix:/tmp/prometheus-nginx.socket'))
-  assert(s:send(jsonData))
+  assert(s:connect("unix:/tmp/prometheus-nginx.socket"))
+  assert(s:send(payload))
   assert(s:close())
 end
 
-function _M.encode_nginx_stats()
-  return cjson.encode({
+local function metrics()
+  return {
     host = ngx.var.host or "-",
+    namespace = ngx.var.namespace or "-",
+    ingress = ngx.var.ingress_name or "-",
+    service = ngx.var.service_name or "-",
+    path = ngx.var.location_path or "-",
 
     method = ngx.var.request_method or "-",
-    path = ngx.var.location_path or "-",
     status = ngx.var.status or "-",
-
     requestLength = tonumber(ngx.var.request_length) or -1,
     requestTime = tonumber(ngx.var.request_time) or -1,
-
     responseLength = tonumber(ngx.var.bytes_sent) or -1,
 
     endpoint = ngx.var.upstream_addr or "-",
-
     upstreamLatency = tonumber(ngx.var.upstream_connect_time) or -1,
     upstreamResponseTime = tonumber(ngx.var.upstream_response_time) or -1,
     upstreamResponseLength = tonumber(ngx.var.upstream_response_length) or -1,
     upstreamStatus = ngx.var.upstream_status or "-",
-
-    namespace = ngx.var.namespace or "-",
-    ingress = ngx.var.ingress_name or "-",
-    service = ngx.var.service_name or "-",
-  })
+  }
 end
 
-function _M.call()
-  local ok, err = defer.to_timer_phase(send_data, _M.encode_nginx_stats())
-  if not ok then
-    ngx.log(ngx.ERR, "failed to defer send_data to timer phase: ", err)
+local function flush(premature)
+  if premature then
     return
   end
+
+  if #metrics_batch == 0 then
+    return
+  end
+
+  local current_metrics_batch = metrics_batch
+  metrics_batch = {}
+
+  local ok, payload = pcall(cjson.encode, current_metrics_batch)
+  if not ok then
+    ngx.log(ngx.ERR, "error while encoding metrics: " .. tostring(payload))
+    return
+  end
+
+  send(payload)
+end
+
+function _M.init_worker()
+  local _, err = ngx.timer.every(FLUSH_INTERVAL, flush)
+  if err then
+    ngx.log(ngx.ERR, string.format("error when setting up timer.every: %s", tostring(err)))
+  end
+end
+
+function _M.call()
+  if #metrics_batch >= MAX_BATCH_SIZE then
+    ngx.log(ngx.WARN, "omiting metrics for the request, current batch is full")
+    return
+  end
+
+  table.insert(metrics_batch, metrics())
+end
+
+if _TEST then
+  _M.flush = flush
+  _M.get_metrics_batch = function() return metrics_batch end
 end
 
 return _M
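For orientation (not part of the patch): the Lua monitor module now appends one table per request to metrics_batch, refuses new entries once MAX_BATCH_SIZE is reached, and a per-worker timer registered in init_worker swaps the batch out, JSON-encodes it, and sends it over the unix socket. A rough Go rendition of that bounded-batch / swap-and-flush pattern, for illustration only (names and sizes simply mirror the Lua constants):

// Illustrative sketch of the bounded-batch / swap-and-flush pattern.
package main

import (
	"encoding/json"
	"fmt"
)

const maxBatchSize = 10000 // mirrors MAX_BATCH_SIZE in monitor.lua

type metric map[string]interface{}

var batch []metric

// call mirrors _M.call(): drop the sample when the batch is already full.
func call(m metric) {
	if len(batch) >= maxBatchSize {
		fmt.Println("omitting metrics for the request, current batch is full")
		return
	}
	batch = append(batch, m)
}

// flush mirrors flush(): swap the batch out first, then encode and send, so
// samples recorded afterwards land in a fresh batch.
func flush(send func([]byte)) {
	if len(batch) == 0 {
		return
	}
	current := batch
	batch = nil

	payload, err := json.Marshal(current)
	if err != nil {
		fmt.Println("error while encoding metrics:", err)
		return
	}
	send(payload)
}

func main() {
	// In nginx, ngx.timer.every(FLUSH_INTERVAL, flush) drives flush once per second
	// in each single-threaded worker; here it is driven by hand.
	call(metric{"host": "example.com", "status": "200"})
	call(metric{"host": "example.com", "status": "201"})
	flush(func(p []byte) { fmt.Printf("flushing %d bytes: %s\n", len(p), p) })
}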
@@ -1,12 +0,0 @@
-_G._TEST = true
-local defer = require("util.defer")
-
-describe("Defer", function()
-  describe("to_timer_phase", function()
-    it("executes passed callback immediately if called on timer phase", function()
-      defer.counter = 0
-      defer.to_timer_phase(function() defer.counter = defer.counter + 1 end)
-      assert.equal(defer.counter, 1)
-    end)
-  end)
-end)
@@ -1,96 +1,106 @@
 _G._TEST = true
-local cjson = require("cjson")
 
+local original_ngx = ngx
+local function reset_ngx()
+  _G.ngx = original_ngx
+end
+
+local function mock_ngx(mock)
+  local _ngx = mock
+  setmetatable(_ngx, { __index = ngx })
+  _G.ngx = _ngx
+end
+
+local function mock_ngx_socket_tcp()
+  local tcp_mock = {}
+  stub(tcp_mock, "connect", true)
+  stub(tcp_mock, "send", true)
+  stub(tcp_mock, "close", true)
+
+  local socket_mock = {}
+  stub(socket_mock, "tcp", tcp_mock)
+  mock_ngx({ socket = socket_mock })
+
+  return tcp_mock
+end
+
 describe("Monitor", function()
-  local monitor = require("monitor")
-  describe("encode_nginx_stats()", function()
-    it("successfuly encodes the current stats of nginx to JSON", function()
-      local nginx_environment = {
-        host = "testshop.com",
-        status = "200",
-        bytes_sent = "150",
-        server_protocol = "HTTP",
-        request_method = "GET",
-        location_path = "/admin",
-        request_length = "300",
-        request_time = "210",
-        proxy_upstream_name = "test-upstream",
-        upstream_addr = "2.2.2.2",
-        upstream_response_time = "200",
-        upstream_response_length = "150",
-        upstream_connect_time = "1",
-        upstream_status = "220",
-        namespace = "test-app-production",
-        ingress_name = "web-yml",
-        service_name = "test-app",
-      }
-      ngx.var = nginx_environment
-
-      local encode_nginx_stats = monitor.encode_nginx_stats
-      local encoded_json_stats = encode_nginx_stats()
-      local decoded_json_stats = cjson.decode(encoded_json_stats)
-
-      local expected_json_stats = {
-        host = "testshop.com",
-        status = "200",
-        responseLength = 150.0,
-        method = "GET",
-        path = "/admin",
-        requestLength = 300.0,
-        requestTime = 210.0,
-        endpoint = "2.2.2.2",
-        upstreamResponseTime = 200,
-        upstreamStatus = "220",
-        upstreamLatency = 1.0,
-        upstreamResponseLength = 150.0,
-        namespace = "test-app-production",
-        ingress = "web-yml",
-        service = "test-app",
-      }
-
-      assert.are.same(decoded_json_stats,expected_json_stats)
-    end)
-
-    it("replaces empty numeric keys with -1 and missing string keys with -", function()
-      local nginx_environment = {
-        remote_addr = "10.10.10.10",
-        realip_remote_addr = "5.5.5.5",
-        remote_user = "francisco",
-        server_protocol = "HTTP",
-        request_method = "GET",
-        location_path = "/admin",
-        request_time = "202",
-        proxy_upstream_name = "test-upstream",
-        upstream_addr = "2.2.2.2",
-        upstream_response_time = "201",
-        upstream_status = "220",
-        ingress_name = "web-yml",
-      }
-      ngx.var = nginx_environment
-
-      local encode_nginx_stats = monitor.encode_nginx_stats
-      local encoded_json_stats = encode_nginx_stats()
-      local decoded_json_stats = cjson.decode(encoded_json_stats)
-
-      local expected_json_stats = {
-        host = "-",
-        status = "-",
-        responseLength = -1,
-        method = "GET",
-        path = "/admin",
-        requestLength = -1,
-        requestTime = 202.0,
-        endpoint = "2.2.2.2",
-        upstreamStatus = "220",
-        namespace = "-",
-        ingress = "web-yml",
-        upstreamLatency = -1,
-        upstreamResponseTime = 201,
-        upstreamResponseLength = -1,
-        responseLength = -1,
-        service = "-",
-      }
-      assert.are.same(decoded_json_stats,expected_json_stats)
-    end)
-  end)
+  after_each(function()
+    reset_ngx()
+    package.loaded["monitor"] = nil
+  end)
+
+  it("batches metrics", function()
+    local monitor = require("monitor")
+    mock_ngx({ var = {} })
+
+    for i = 1,10,1 do
+      monitor.call()
+    end
+
+    assert.equal(10, #monitor.get_metrics_batch())
+  end)
+
+  describe("flush", function()
+    it("short circuits when premmature is true (when worker is shutting down)", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+      mock_ngx({ var = {} })
+
+      for i = 1,10,1 do
+        monitor.call()
+      end
+      monitor.flush(true)
+      assert.stub(tcp_mock.connect).was_not_called()
+    end)
+
+    it("short circuits when there's no metrics batched", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+
+      monitor.flush()
+      assert.stub(tcp_mock.connect).was_not_called()
+    end)
+
+    it("JSON encodes and sends the batched metrics", function()
+      local tcp_mock = mock_ngx_socket_tcp()
+      local monitor = require("monitor")
+
+      local ngx_var_mock = {
+        host = "example.com",
+        namespace = "default",
+        ingress_name = "example",
+        service_name = "http-svc",
+        location_path = "/",
+
+        request_method = "GET",
+        status = "200",
+        request_length = "256",
+        request_time = "0.04",
+        bytes_sent = "512",
+
+        upstream_addr = "10.10.0.1",
+        upstream_connect_time = "0.01",
+        upstream_response_time = "0.02",
+        upstream_response_length = "456",
+        upstream_status = "200",
+      }
+      mock_ngx({ var = ngx_var_mock })
+      monitor.call()
+
+      local ngx_var_mock1 = ngx_var_mock
+      ngx_var_mock1.status = "201"
+      ngx_var_mock1.request_method = "POST"
+      mock_ngx({ var = ngx_var_mock })
+      monitor.call()
+
+      monitor.flush()
+
+      local expected_payload = '[{"upstreamStatus":"200","requestLength":256,"ingress":"example","status":"200","service":"http-svc","requestTime":0.04,"namespace":"default","host":"example.com","method":"GET","upstreamResponseTime":0.02,"upstreamResponseLength":456,"endpoint":"10.10.0.1","upstreamLatency":0.01,"path":"\\/","responseLength":512},{"upstreamStatus":"200","requestLength":256,"ingress":"example","status":"201","service":"http-svc","requestTime":0.04,"namespace":"default","host":"example.com","method":"POST","upstreamResponseTime":0.02,"upstreamResponseLength":456,"endpoint":"10.10.0.1","upstreamLatency":0.01,"path":"\\/","responseLength":512}]'
+
+      assert.stub(tcp_mock.connect).was_called_with(tcp_mock, "unix:/tmp/prometheus-nginx.socket")
+      assert.stub(tcp_mock.send).was_called_with(tcp_mock, expected_payload)
+      assert.stub(tcp_mock.close).was_called_with(tcp_mock)
+    end)
+  end)
 end)
@@ -1,57 +0,0 @@
-local util = require("util")
-
-local timer_started = false
-local queue = {}
-local MAX_QUEUE_SIZE = 10000
-
-local _M = {}
-
-local function flush_queue(premature)
-  -- TODO Investigate if we should actually still flush the queue when we're
-  -- shutting down.
-  if premature then return end
-
-  local current_queue = queue
-  queue = {}
-  timer_started = false
-
-  for _,v in ipairs(current_queue) do
-    v.func(unpack(v.args))
-  end
-end
-
--- `to_timer_phase` will enqueue a function that will be executed in a timer
--- context, at a later point in time. The purpose is that some APIs (such as
--- sockets) are not available during some nginx request phases (such as the
--- logging phase), but are available for use in timers. There are no ordering
--- guarantees for when a function will be executed.
-function _M.to_timer_phase(func, ...)
-  if ngx.get_phase() == "timer" then
-    func(...)
-    return true
-  end
-
-  if #queue >= MAX_QUEUE_SIZE then
-    ngx.log(ngx.ERR, "deferred timer queue full")
-    return nil, "deferred timer queue full"
-  end
-
-  table.insert(queue, { func = func, args = {...} })
-  if not timer_started then
-    local ok, err = ngx.timer.at(0, flush_queue)
-    if ok then
-      -- unfortunately this is to deal with tests - when running unit tests, we
-      -- dont actually run the timer, we call the function inline
-      if util.tablelength(queue) > 0 then
-        timer_started = true
-      end
-    else
-      local msg = "failed to create timer: " .. tostring(err)
-      ngx.log(ngx.ERR, msg)
-      return nil, msg
-    end
-  end
-  return true
-end
-
-return _M
@@ -90,6 +90,7 @@ http {
     {{ if $all.DynamicConfigurationEnabled }}
     init_worker_by_lua_block {
         balancer.init_worker()
+        monitor.init_worker()
    }
    {{ end }}
    {{ end }}