Merge pull request #2611 from fmejia97/add-metric-emitter-module
Add metric emitter lua module
This commit is contained in:
commit
43cf56c442
6 changed files with 266 additions and 0 deletions
57
rootfs/etc/nginx/lua/defer.lua
Normal file
57
rootfs/etc/nginx/lua/defer.lua
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
local util = require("util")
|
||||||
|
|
||||||
|
local timer_started = false
|
||||||
|
local queue = {}
|
||||||
|
local MAX_QUEUE_SIZE = 10000
|
||||||
|
|
||||||
|
local _M = {}
|
||||||
|
|
||||||
|
local function flush_queue(premature)
|
||||||
|
-- TODO Investigate if we should actually still flush the queue when we're
|
||||||
|
-- shutting down.
|
||||||
|
if premature then return end
|
||||||
|
|
||||||
|
local current_queue = queue
|
||||||
|
queue = {}
|
||||||
|
timer_started = false
|
||||||
|
|
||||||
|
for _,v in ipairs(current_queue) do
|
||||||
|
v.func(unpack(v.args))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
-- `to_timer_phase` will enqueue a function that will be executed in a timer
|
||||||
|
-- context, at a later point in time. The purpose is that some APIs (such as
|
||||||
|
-- sockets) are not available during some nginx request phases (such as the
|
||||||
|
-- logging phase), but are available for use in timers. There are no ordering
|
||||||
|
-- guarantees for when a function will be executed.
|
||||||
|
function _M.to_timer_phase(func, ...)
|
||||||
|
if ngx.get_phase() == "timer" then
|
||||||
|
func(...)
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
|
||||||
|
if #queue >= MAX_QUEUE_SIZE then
|
||||||
|
ngx.log(ngx.ERR, "deferred timer queue full")
|
||||||
|
return nil, "deferred timer queue full"
|
||||||
|
end
|
||||||
|
|
||||||
|
table.insert(queue, { func = func, args = {...} })
|
||||||
|
if not timer_started then
|
||||||
|
local ok, err = ngx.timer.at(0, flush_queue)
|
||||||
|
if ok then
|
||||||
|
-- unfortunately this is to deal with tests - when running unit tests, we
|
||||||
|
-- dont actually run the timer, we call the function inline
|
||||||
|
if util.tablelength(queue) > 0 then
|
||||||
|
timer_started = true
|
||||||
|
end
|
||||||
|
else
|
||||||
|
local msg = "failed to create timer: " .. tostring(err)
|
||||||
|
ngx.log(ngx.ERR, msg)
|
||||||
|
return nil, msg
|
||||||
|
end
|
||||||
|
end
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
|
||||||
|
return _M
|
46
rootfs/etc/nginx/lua/monitor.lua
Normal file
46
rootfs/etc/nginx/lua/monitor.lua
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
local socket = ngx.socket.udp
|
||||||
|
local cjson = require('cjson')
|
||||||
|
local defer = require('defer')
|
||||||
|
local assert = assert
|
||||||
|
|
||||||
|
local _M = {}
|
||||||
|
|
||||||
|
local function send_data(jsonData)
|
||||||
|
local s = assert(socket())
|
||||||
|
assert(s:setpeername("127.0.0.1", 8000))
|
||||||
|
assert(s:send(jsonData))
|
||||||
|
assert(s:close())
|
||||||
|
end
|
||||||
|
|
||||||
|
function _M.encode_nginx_stats()
|
||||||
|
return cjson.encode({
|
||||||
|
host = ngx.var.host or "-",
|
||||||
|
status = ngx.var.status or "-",
|
||||||
|
remoteAddr = ngx.var.remote_addr or "-",
|
||||||
|
realIpAddr = ngx.var.realip_remote_addr or "-",
|
||||||
|
remoteUser = ngx.var.remote_user or "-",
|
||||||
|
bytesSent = tonumber(ngx.var.bytes_sent) or -1,
|
||||||
|
protocol = ngx.var.server_protocol or "-",
|
||||||
|
method = ngx.var.request_method or "-",
|
||||||
|
uri = ngx.var.uri or "-",
|
||||||
|
requestLength = tonumber(ngx.var.request_length) or -1,
|
||||||
|
requestTime = tonumber(ngx.var.request_time) or -1,
|
||||||
|
upstreamName = ngx.var.proxy_upstream_name or "-",
|
||||||
|
upstreamIP = ngx.var.upstream_addr or "-",
|
||||||
|
upstreamResponseTime = tonumber(ngx.var.upstream_response_time) or -1,
|
||||||
|
upstreamStatus = ngx.var.upstream_status or "-",
|
||||||
|
namespace = ngx.var.namespace or "-",
|
||||||
|
ingress = ngx.var.ingress_name or "-",
|
||||||
|
service = ngx.var.service_name or "-",
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
function _M.call()
|
||||||
|
local ok, err = defer.to_timer_phase(send_data, _M.encode_nginx_stats())
|
||||||
|
if not ok then
|
||||||
|
ngx.log(ngx.ERR, "failed to defer send_data to timer phase: ", err)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
return _M
|
20
rootfs/etc/nginx/lua/test/defer/defer_test.lua
Normal file
20
rootfs/etc/nginx/lua/test/defer/defer_test.lua
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package.path = "./rootfs/etc/nginx/lua/?.lua;./rootfs/etc/nginx/lua/test/mocks/?.lua;" .. package.path
|
||||||
|
_G._TEST = true
|
||||||
|
local defer = require('defer')
|
||||||
|
|
||||||
|
local _ngx = {
|
||||||
|
shared = {},
|
||||||
|
log = function(...) end,
|
||||||
|
get_phase = function() return "timer" end,
|
||||||
|
}
|
||||||
|
_G.ngx = _ngx
|
||||||
|
|
||||||
|
describe("Defer", function()
|
||||||
|
describe("to_timer_phase", function()
|
||||||
|
it("executes passed callback immediately if called on timer phase", function()
|
||||||
|
defer.counter = 0
|
||||||
|
defer.to_timer_phase(function() defer.counter = defer.counter + 1 end)
|
||||||
|
assert.equal(defer.counter, 1)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end)
|
122
rootfs/etc/nginx/lua/test/monitor/monitor_test.lua
Normal file
122
rootfs/etc/nginx/lua/test/monitor/monitor_test.lua
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
package.path = "./rootfs/etc/nginx/lua/?.lua;./rootfs/etc/nginx/lua/test/mocks/?.lua;" .. package.path
|
||||||
|
_G._TEST = true
|
||||||
|
local cjson = require('cjson')
|
||||||
|
|
||||||
|
local function udp_mock()
|
||||||
|
return {
|
||||||
|
setpeername = function(...) return true end,
|
||||||
|
send = function(payload) return payload end,
|
||||||
|
close = function(...) return true end
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
local _ngx = {
|
||||||
|
shared = {},
|
||||||
|
log = function(...) end,
|
||||||
|
socket = {
|
||||||
|
udp = udp_mock
|
||||||
|
},
|
||||||
|
get_phase = function() return "timer" end,
|
||||||
|
var = {}
|
||||||
|
}
|
||||||
|
_G.ngx = _ngx
|
||||||
|
|
||||||
|
describe("Monitor", function()
|
||||||
|
local monitor = require("monitor")
|
||||||
|
describe("encode_nginx_stats()", function()
|
||||||
|
it("successfuly encodes the current stats of nginx to JSON", function()
|
||||||
|
local nginx_environment = {
|
||||||
|
host = "testshop.com",
|
||||||
|
status = "200",
|
||||||
|
remote_addr = "10.10.10.10",
|
||||||
|
realip_remote_addr = "5.5.5.5",
|
||||||
|
remote_user = "admin",
|
||||||
|
bytes_sent = "150",
|
||||||
|
server_protocol = "HTTP",
|
||||||
|
request_method = "GET",
|
||||||
|
uri = "/admin",
|
||||||
|
request_length = "300",
|
||||||
|
request_time = "60",
|
||||||
|
proxy_upstream_name = "test-upstream",
|
||||||
|
upstream_addr = "2.2.2.2",
|
||||||
|
upstream_response_time = "200",
|
||||||
|
upstream_status = "220",
|
||||||
|
namespace = "test-app-production",
|
||||||
|
ingress_name = "web-yml",
|
||||||
|
service_name = "test-app",
|
||||||
|
}
|
||||||
|
ngx.var = nginx_environment
|
||||||
|
|
||||||
|
local encode_nginx_stats = monitor.encode_nginx_stats
|
||||||
|
local encoded_json_stats = encode_nginx_stats()
|
||||||
|
local decoded_json_stats = cjson.decode(encoded_json_stats)
|
||||||
|
|
||||||
|
local expected_json_stats = {
|
||||||
|
host = "testshop.com",
|
||||||
|
status = "200",
|
||||||
|
remoteAddr = "10.10.10.10",
|
||||||
|
realIpAddr = "5.5.5.5",
|
||||||
|
remoteUser = "admin",
|
||||||
|
bytesSent = 150.0,
|
||||||
|
protocol = "HTTP",
|
||||||
|
method = "GET",
|
||||||
|
uri = "/admin",
|
||||||
|
requestLength = 300.0,
|
||||||
|
requestTime = 60.0,
|
||||||
|
upstreamName = "test-upstream",
|
||||||
|
upstreamIP = "2.2.2.2",
|
||||||
|
upstreamResponseTime = 200,
|
||||||
|
upstreamStatus = "220",
|
||||||
|
namespace = "test-app-production",
|
||||||
|
ingress = "web-yml",
|
||||||
|
service = "test-app",
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.are.same(decoded_json_stats,expected_json_stats)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("replaces empty numeric keys with -1 and missing string keys with -", function()
|
||||||
|
local nginx_environment = {
|
||||||
|
remote_addr = "10.10.10.10",
|
||||||
|
realip_remote_addr = "5.5.5.5",
|
||||||
|
remote_user = "francisco",
|
||||||
|
server_protocol = "HTTP",
|
||||||
|
request_method = "GET",
|
||||||
|
uri = "/admin",
|
||||||
|
request_time = "60",
|
||||||
|
proxy_upstream_name = "test-upstream",
|
||||||
|
upstream_addr = "2.2.2.2",
|
||||||
|
upstream_response_time = "200",
|
||||||
|
upstream_status = "220",
|
||||||
|
ingress_name = "web-yml",
|
||||||
|
}
|
||||||
|
ngx.var = nginx_environment
|
||||||
|
|
||||||
|
local encode_nginx_stats = monitor.encode_nginx_stats
|
||||||
|
local encoded_json_stats = encode_nginx_stats()
|
||||||
|
local decoded_json_stats = cjson.decode(encoded_json_stats)
|
||||||
|
|
||||||
|
local expected_json_stats = {
|
||||||
|
host = "-",
|
||||||
|
status = "-",
|
||||||
|
remoteAddr = "10.10.10.10",
|
||||||
|
realIpAddr = "5.5.5.5",
|
||||||
|
remoteUser = "francisco",
|
||||||
|
bytesSent = -1,
|
||||||
|
protocol = "HTTP",
|
||||||
|
method = "GET",
|
||||||
|
uri = "/admin",
|
||||||
|
requestLength = -1,
|
||||||
|
requestTime = 60.0,
|
||||||
|
upstreamName = "test-upstream",
|
||||||
|
upstreamIP = "2.2.2.2",
|
||||||
|
upstreamResponseTime = 200,
|
||||||
|
upstreamStatus = "220",
|
||||||
|
namespace = "-",
|
||||||
|
ingress = "web-yml",
|
||||||
|
service = "-",
|
||||||
|
}
|
||||||
|
assert.are.same(decoded_json_stats,expected_json_stats)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end)
|
|
@ -130,4 +130,13 @@ local function deepcopy(orig)
|
||||||
end
|
end
|
||||||
_M.deepcopy = deepcopy
|
_M.deepcopy = deepcopy
|
||||||
|
|
||||||
|
local function tablelength(T)
|
||||||
|
local count = 0
|
||||||
|
for _ in pairs(T) do
|
||||||
|
count = count + 1
|
||||||
|
end
|
||||||
|
return count
|
||||||
|
end
|
||||||
|
_M.tablelength = tablelength
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -69,6 +69,13 @@ http {
|
||||||
balancer = res
|
balancer = res
|
||||||
end
|
end
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
|
ok, res = pcall(require, "monitor")
|
||||||
|
if not ok then
|
||||||
|
error("require failed: " .. tostring(res))
|
||||||
|
else
|
||||||
|
monitor = res
|
||||||
|
end
|
||||||
}
|
}
|
||||||
|
|
||||||
{{ if $all.DynamicConfigurationEnabled }}
|
{{ if $all.DynamicConfigurationEnabled }}
|
||||||
|
@ -903,6 +910,11 @@ stream {
|
||||||
{{ if $all.DynamicConfigurationEnabled}}
|
{{ if $all.DynamicConfigurationEnabled}}
|
||||||
balancer.log()
|
balancer.log()
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
|
ok, res = pcall(monitor.call)
|
||||||
|
if not ok then
|
||||||
|
ngx.log(ngx.ERR, "request_id failed: " .. tostring(res))
|
||||||
|
end
|
||||||
}
|
}
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue