ewma improvements

This commit is contained in:
Elvin Efendi 2019-08-15 11:04:32 -04:00
parent 0b375989f3
commit 30b64df10a
6 changed files with 354 additions and 98 deletions

View file

@ -29,4 +29,5 @@ resty \
--shdict "certificate_data 16M" \
--shdict "balancer_ewma 1M" \
--shdict "balancer_ewma_last_touched_at 1M" \
--shdict "balancer_ewma_locks 512k" \
./rootfs/etc/nginx/lua/test/run.lua ${BUSTED_ARGS} ./rootfs/etc/nginx/lua/test/

View file

@ -65,8 +65,11 @@ const (
var (
validRedirectCodes = sets.NewInt([]int{301, 302, 307, 308}...)
defaultLuaSharedDicts = map[string]int{
"configuration_data": 20,
"certificate_data": 20,
"configuration_data": 20,
"certificate_data": 20,
"balancer_ewma": 10,
"balancer_ewma_last_touched_at": 10,
"balancer_ewma_locks": 1,
}
)

View file

@ -5,6 +5,7 @@
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
local resty_lock = require("resty.lock")
local util = require("util")
local split = require("util.split")
@ -13,10 +14,36 @@ local ngx_log = ngx.log
local INFO = ngx.INFO
local DECAY_TIME = 10 -- this value is in seconds
local LOCK_KEY = ":ewma_key"
local PICK_SET_SIZE = 2
local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
if not ewma_lock then
error(ewma_lock_err)
end
local _M = { name = "ewma" }
local function lock(upstream)
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
if err then
if err ~= "timeout" then
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
end
end
return err
end
local function unlock()
local ok, err = ewma_lock:unlock()
if not ok then
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
end
return err
end
local function decay_ewma(ewma, last_touched_at, rtt, now)
local td = now - last_touched_at
td = (td > 0) and td or 0
@ -26,28 +53,55 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma
end
local function get_or_update_ewma(self, upstream, rtt, update)
local ewma = self.ewma[upstream] or 0
local function store_stats(upstream, ewma, now)
local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now)
if not success then
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set valid items forcibly overwritten")
end
success, err, forcible = ngx.shared.balancer_ewma:set(upstream, ewma)
if not success then
ngx.log(ngx.WARN, "balancer_ewma:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten")
end
end
local function get_or_update_ewma(upstream, rtt, update)
local lock_err = nil
if update then
lock_err = lock(upstream)
end
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
if lock_err ~= nil then
return ewma, lock_err
end
local now = ngx.now()
local last_touched_at = self.ewma_last_touched_at[upstream] or 0
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
if not update then
return ewma, nil
end
self.ewma[upstream] = ewma
self.ewma_last_touched_at[upstream] = now
store_stats(upstream, ewma, now)
unlock()
return ewma, nil
end
local function score(self, upstream)
local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(self, upstream_name, 0, false)
return get_or_update_ewma(upstream_name, 0, false)
end
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
@ -63,12 +117,12 @@ local function shuffle_peers(peers, k)
-- peers[1 .. k] will now contain a randomly selected k from #peers
end
local function pick_and_score(self, peers, k)
local function pick_and_score(peers, k)
shuffle_peers(peers, k)
local lowest_score_index = 1
local lowest_score = score(self, peers[lowest_score_index])
local lowest_score = score(peers[lowest_score_index])
for i = 2, k do
local new_score = score(self, peers[i])
local new_score = score(peers[i])
if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score
end
@ -76,6 +130,31 @@ local function pick_and_score(self, peers, k)
return peers[lowest_score_index], lowest_score
end
-- slow_start_ewma is something we use to avoid sending too many requests
-- to the newly introduced endpoints. We currently use average ewma values
-- of existing endpoints.
local function calculate_slow_start_ewma(self)
local total_ewma = 0
local endpoints_count = 0
for _, endpoint in pairs(self.peers) do
local endpoint_string = endpoint.address .. ":" .. endpoint.port
local ewma = ngx.shared.balancer_ewma:get(endpoint_string)
if ewma then
endpoints_count = endpoints_count + 1
total_ewma = total_ewma + ewma
end
end
if endpoints_count == 0 then
ngx.log(ngx.INFO, "no ewma value exists for the endpoints")
return nil
end
return total_ewma / endpoints_count
end
function _M.balance(self)
local peers = self.peers
local endpoint, ewma_score = peers[1], -1
@ -83,7 +162,7 @@ function _M.balance(self)
if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
endpoint, ewma_score = pick_and_score(self, peer_copy, k)
endpoint, ewma_score = pick_and_score(peer_copy, k)
end
ngx.var.balancer_ewma_score = ewma_score
@ -92,7 +171,7 @@ function _M.balance(self)
return endpoint.address .. ":" .. endpoint.port
end
function _M.after_balance(self)
function _M.after_balance(_)
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
@ -101,30 +180,41 @@ function _M.after_balance(self)
if util.is_blank(upstream) then
return
end
get_or_update_ewma(self, upstream, rtt, true)
get_or_update_ewma(upstream, rtt, true)
end
function _M.sync(self, backend)
self.traffic_shaping_policy = backend.trafficShapingPolicy
self.alternative_backends = backend.alternativeBackends
local normalized_endpoints_added, normalized_endpoints_removed = util.diff_endpoints(self.peers, backend.endpoints)
local changed = not util.deep_compare(self.peers, backend.endpoints)
if not changed then
if #normalized_endpoints_added == 0 and #normalized_endpoints_removed == 0 then
ngx.log(ngx.INFO, "endpoints did not change for backend " .. tostring(backend.name))
return
end
ngx_log(INFO, string_format("[%s] peers have changed for backend %s", self.name, backend.name))
self.traffic_shaping_policy = backend.trafficShapingPolicy
self.alternative_backends = backend.alternativeBackends
self.peers = backend.endpoints
self.ewma = {}
self.ewma_last_touched_at = {}
for _, endpoint_string in ipairs(normalized_endpoints_removed) do
ngx.shared.balancer_ewma:delete(endpoint_string)
ngx.shared.balancer_ewma_last_touched_at:delete(endpoint_string)
end
local slow_start_ewma = calculate_slow_start_ewma(self)
if slow_start_ewma ~= nil then
local now = ngx.now()
for _, endpoint_string in ipairs(normalized_endpoints_added) do
store_stats(endpoint_string, slow_start_ewma, now)
end
end
end
function _M.new(self, backend)
local o = {
peers = backend.endpoints,
ewma = {},
ewma_last_touched_at = {},
traffic_shaping_policy = backend.trafficShapingPolicy,
alternative_backends = backend.alternativeBackends,
}

View file

@ -1,91 +1,151 @@
local util = require("util")
local original_ngx = ngx
local function reset_ngx()
_G.ngx = original_ngx
end
local function mock_ngx(mock)
local _ngx = mock
setmetatable(_ngx, { __index = ngx })
_G.ngx = _ngx
end
local function flush_all_ewma_stats()
ngx.shared.balancer_ewma:flush_all()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
end
local function store_ewma_stats(endpoint_string, ewma, touched_at)
ngx.shared.balancer_ewma:set(endpoint_string, ewma)
ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at)
end
local function assert_ewma_stats(endpoint_string, ewma, touched_at)
assert.are.equals(ewma, ngx.shared.balancer_ewma:get(endpoint_string))
assert.are.equals(touched_at, ngx.shared.balancer_ewma_last_touched_at:get(endpoint_string))
end
describe("Balancer ewma", function()
local balancer_ewma = require("balancer.ewma")
local ngx_now = 1543238266
local backend, instance
before_each(function()
mock_ngx({ now = function() return ngx_now end, var = { balancer_ewma_score = -1 } })
backend = {
name = "namespace-service-port", ["load-balance"] = "ewma",
endpoints = {
{ address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
store_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
store_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
store_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
instance = balancer_ewma:new(backend)
end)
after_each(function()
reset_ngx()
flush_all_ewma_stats()
end)
describe("after_balance()", function()
local ngx_now = 1543238266
_G.ngx.now = function() return ngx_now end
_G.ngx.var = { upstream_response_time = "0.25", upstream_connect_time = "0.02", upstream_addr = "10.184.7.40:8080" }
it("updates EWMA stats", function()
local backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
local instance = balancer_ewma:new(backend)
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1" }
instance:after_balance()
assert.equal(0.27, instance.ewma[ngx.var.upstream_addr])
assert.equal(ngx_now, instance.ewma_last_touched_at[ngx.var.upstream_addr])
local weight = math.exp(-5 / 10)
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
end)
end)
describe("balance()", function()
it("returns single endpoint when the given backend has only one endpoint", function()
local backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
local instance = balancer_ewma:new(backend)
local single_endpoint_backend = util.deepcopy(backend)
table.remove(single_endpoint_backend.endpoints, 3)
table.remove(single_endpoint_backend.endpoints, 2)
local single_endpoint_instance = balancer_ewma:new(single_endpoint_backend)
local peer = instance:balance()
assert.equal("10.184.7.40:8080", peer)
local peer = single_endpoint_instance:balance()
assert.are.equals("10.10.10.1:8080", peer)
assert.are.equals(-1, ngx.var.balancer_ewma_score)
end)
it("picks the endpoint with lowest score when there two of them", function()
local backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = {
{ address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.184.97.100", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
local instance = balancer_ewma:new(backend)
instance.ewma = { ["10.184.7.40:8080"] = 0.5, ["10.184.97.100:8080"] = 0.3 }
instance.ewma_last_touched_at = { ["10.184.7.40:8080"] = ngx.now(), ["10.184.97.100:8080"] = ngx.now() }
it("picks the endpoint with lowest decayed score", function()
local two_endpoints_backend = util.deepcopy(backend)
table.remove(two_endpoints_backend.endpoints, 2)
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
local peer = instance:balance()
assert.equal("10.184.97.100:8080", peer)
local peer = two_endpoints_instance:balance()
-- even though 10.10.10.1:8080 has a lower ewma score
-- algorithm picks 10.10.10.3:8080 because its decayed score is even lower
assert.equal("10.10.10.3:8080", peer)
assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score)
end)
end)
describe("sync()", function()
local backend, instance
before_each(function()
backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
instance = balancer_ewma:new(backend)
end)
it("does nothing when endpoints do not change", function()
local new_backend = {
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
instance:sync(new_backend)
end)
it("updates endpoints", function()
local new_backend = {
endpoints = {
{ address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.184.97.100", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
instance:sync(new_backend)
assert.are.same(new_backend.endpoints, instance.peers)
end)
it("resets stats", function()
it("does not reset stats when endpoints do not change", function()
local new_backend = util.deepcopy(backend)
new_backend.endpoints[1].maxFails = 3
instance:sync(new_backend)
assert.are.same(new_backend.endpoints, instance.peers)
assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
end)
it("updates peers, deletes stats for old endpoints and sets average ewma score to new ones", function()
local new_backend = util.deepcopy(backend)
-- existing endpoint 10.10.10.2 got deleted
-- and replaced with 10.10.10.4
new_backend.endpoints[2].address = "10.10.10.4"
-- and there's one new extra endpoint
table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080", maxFails = 0, failTimeout = 0 })
instance:sync(new_backend)
assert.are.same(new_backend.endpoints, instance.peers)
assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
assert_ewma_stats("10.10.10.2:8080", nil, nil)
assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
local slow_start_ewma = (0.2 + 1.2) / 2
assert_ewma_stats("10.10.10.4:8080", slow_start_ewma, ngx_now)
assert_ewma_stats("10.10.10.5:8080", slow_start_ewma, ngx_now)
end)
it("does not set slow_start_ewma when there is no existing ewma", function()
local new_backend = util.deepcopy(backend)
table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 })
-- when the LB algorithm instance is just instantiated it won't have any
-- ewma value set for the initial endpoints (because it has not processed any request yet),
-- this test is trying to simulate that by flushing existing ewma values
flush_all_ewma_stats()
instance:sync(new_backend)
assert_ewma_stats("10.10.10.1:8080", nil, nil)
assert_ewma_stats("10.10.10.2:8080", nil, nil)
assert_ewma_stats("10.10.10.3:8080", nil, nil)
assert_ewma_stats("10.10.10.4:8080", nil, nil)
end)
end)
end)

View file

@ -12,24 +12,87 @@ end
describe("lua_ngx_var", function()
local util = require("util")
before_each(function()
mock_ngx({ var = { remote_addr = "192.168.1.1", [1] = "nginx/regexp/1/group/capturing" } })
end)
after_each(function()
reset_ngx()
package.loaded["monitor"] = nil
end)
it("returns value of nginx var by key", function()
assert.equal("192.168.1.1", util.lua_ngx_var("$remote_addr"))
describe("lua_ngx_var", function()
before_each(function()
mock_ngx({ var = { remote_addr = "192.168.1.1", [1] = "nginx/regexp/1/group/capturing" } })
end)
it("returns value of nginx var by key", function()
assert.equal("192.168.1.1", util.lua_ngx_var("$remote_addr"))
end)
it("returns value of nginx var when key is number", function()
assert.equal("nginx/regexp/1/group/capturing", util.lua_ngx_var("$1"))
end)
it("returns nil when variable is not defined", function()
assert.equal(nil, util.lua_ngx_var("$foo_bar"))
end)
end)
it("returns value of nginx var when key is number", function()
assert.equal("nginx/regexp/1/group/capturing", util.lua_ngx_var("$1"))
end)
describe("diff_endpoints", function()
it("returns removed and added endpoints", function()
local old = {
{ address = "10.10.10.1", port = "8080" },
{ address = "10.10.10.2", port = "8080" },
{ address = "10.10.10.3", port = "8080" },
}
local new = {
{ address = "10.10.10.1", port = "8080" },
{ address = "10.10.10.2", port = "8081" },
{ address = "11.10.10.2", port = "8080" },
{ address = "11.10.10.3", port = "8080" },
}
local expected_added = { "10.10.10.2:8081", "11.10.10.2:8080", "11.10.10.3:8080" }
table.sort(expected_added)
local expected_removed = { "10.10.10.2:8080", "10.10.10.3:8080" }
table.sort(expected_removed)
it("returns nil when variable is not defined", function()
assert.equal(nil, util.lua_ngx_var("$foo_bar"))
local added, removed = util.diff_endpoints(old, new)
table.sort(added)
table.sort(removed)
assert.are.same(expected_added, added)
assert.are.same(expected_removed, removed)
end)
it("returns empty results for empty inputs", function()
local added, removed = util.diff_endpoints({}, {})
assert.are.same({}, added)
assert.are.same({}, removed)
end)
it("returns empty results for same inputs", function()
local old = {
{ address = "10.10.10.1", port = "8080" },
{ address = "10.10.10.2", port = "8080" },
{ address = "10.10.10.3", port = "8080" },
}
local new = util.deepcopy(old)
local added, removed = util.diff_endpoints(old, new)
assert.are.same({}, added)
assert.are.same({}, removed)
end)
it("handles endpoints with nil attribute", function()
local old = {
{ address = nil, port = "8080" },
{ address = "10.10.10.2", port = "8080" },
{ address = "10.10.10.3", port = "8080" },
}
local new = util.deepcopy(old)
new[2].port = nil
local added, removed = util.diff_endpoints(old, new)
assert.are.same({ "10.10.10.2:nil" }, added)
assert.are.same({ "10.10.10.2:8080" }, removed)
end)
end)
end)

View file

@ -1,5 +1,6 @@
local string_len = string.len
local string_sub = string.sub
local string_format = string.format
local _M = {}
@ -26,6 +27,44 @@ function _M.lua_ngx_var(ngx_var)
return ngx.var[var_name]
end
-- normalize_endpoints takes endpoints as an array of endpoint objects
-- and returns a table where keys are string that's endpoint.address .. ":" .. endpoint.port
-- and values are all true
local function normalize_endpoints(endpoints)
local normalized_endpoints = {}
for _, endpoint in pairs(endpoints) do
local endpoint_string = string_format("%s:%s", endpoint.address, endpoint.port)
normalized_endpoints[endpoint_string] = true
end
return normalized_endpoints
end
-- diff_endpoints compares old and new
-- and as a first argument returns what endpoints are in new
-- but are not in old, and as a second argument it returns
-- what endpoints are in old but are in new.
-- Both return values are normalized (ip:port).
function _M.diff_endpoints(old, new)
local endpoints_added, endpoints_removed = {}, {}
local normalized_old, normalized_new = normalize_endpoints(old), normalize_endpoints(new)
for endpoint_string, _ in pairs(normalized_old) do
if not normalized_new[endpoint_string] then
table.insert(endpoints_removed, endpoint_string)
end
end
for endpoint_string, _ in pairs(normalized_new) do
if not normalized_old[endpoint_string] then
table.insert(endpoints_added, endpoint_string)
end
end
return endpoints_added, endpoints_removed
end
-- this implementation is taken from
-- https://web.archive.org/web/20131225070434/http://snippets.luacode.org/snippets/Deep_Comparison_of_Two_Values_3
-- and modified for use in this project