Fix EWMA: scale the score by the success rate so that fail-fast endpoints are penalized
This commit is contained in:
parent
ec5b4b4fd0
commit
7b64e477ee
6 changed files with 99 additions and 11 deletions
|
@ -78,6 +78,8 @@ var (
|
|||
"balancer_ewma": 10240,
|
||||
"balancer_ewma_last_touched_at": 10240,
|
||||
"balancer_ewma_locks": 1024,
|
||||
"balancer_ewma_total": 10240,
|
||||
"balancer_ewma_failed": 10240,
|
||||
"certificate_servers": 5120,
|
||||
"ocsp_response_cache": 5120, // keep this same as certificate_servers
|
||||
"global_throttle_cache": 10240,
|
||||
|
|
|
@ -25,6 +25,7 @@ local INFO = ngx.INFO
|
|||
local DECAY_TIME = 10 -- this value is in seconds
|
||||
local LOCK_KEY = ":ewma_key"
|
||||
local PICK_SET_SIZE = 2
|
||||
local MIN_SUCCESS_RATE = 0.1
|
||||
|
||||
local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
|
||||
if not ewma_lock then
|
||||
|
@ -62,7 +63,7 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
|
|||
return ewma
|
||||
end
|
||||
|
||||
local function store_stats(upstream, ewma, now)
|
||||
local function store_stats(upstream, ewma, total_ewma, failed_ewma, now)
|
||||
local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now)
|
||||
if not success then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err)
|
||||
|
@ -78,31 +79,70 @@ local function store_stats(upstream, ewma, now)
|
|||
if forcible then
|
||||
ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten")
|
||||
end
|
||||
|
||||
success, err, forcible = ngx.shared.balancer_ewma_total:set(upstream, total_ewma)
|
||||
if not success then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_total:set failed " .. err)
|
||||
end
|
||||
if forcible then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_total:set valid items forcibly overwritten")
|
||||
end
|
||||
|
||||
success, err, forcible = ngx.shared.balancer_ewma_failed:set(upstream, failed_ewma)
|
||||
if not success then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_failed:set failed " .. err)
|
||||
end
|
||||
if forcible then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_failed:set valid items forcibly overwritten")
|
||||
end
|
||||
end
|
||||
|
||||
--- Read the EWMA statistics kept for one upstream and, when `update` is
-- truthy, fold the latest sample in and persist the refreshed values.
--
-- @param upstream key used for the lookups in the balancer_ewma,
--                 balancer_ewma_total, balancer_ewma_failed and
--                 balancer_ewma_last_touched_at shared dicts
-- @param rtt      latest latency sample; 0 means "no sample" for the
--                 total counter and for the success-rate adjustment
-- @param failed   truthy when the latest request counts as a failure
-- @param update   truthy to take the lock and store the refreshed stats
-- @return score, err
--   * lock(upstream) reported an error: the stored latency EWMA as read
--     (not decayed) together with that error
--   * update falsy: the decayed latency EWMA and nil
--   * update truthy: the latency EWMA divided by the success rate
--     (floored at MIN_SUCCESS_RATE) and nil; the plain latency EWMA is
--     returned instead when rtt is 0 or no total has been recorded
--
-- NOTE(review): the success-rate adjustment is only reached on the update
-- path; read-only callers return before it. Confirm that is intended.
-- NOTE(review): failed_ewma is only touched when `failed` is truthy, so a
-- successful sample leaves it unchanged. Confirm against decay_ewma.
local function get_or_update_ewma(upstream, rtt, failed, update)
  local lock_err = nil
  if update then
    lock_err = lock(upstream)
  end

  local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
  local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0
  local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0

  if lock_err ~= nil then
    return ewma, lock_err
  end

  local now = ngx.now()
  local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0

  ewma = decay_ewma(ewma, last_touched_at, rtt, now)

  -- every request that produced a latency sample counts as 1 towards the total
  if rtt > 0 then
    total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now)
  end

  if failed then
    failed_ewma = decay_ewma(failed_ewma, last_touched_at, 1, now)
  end

  if not update then
    return ewma, nil
  end

  store_stats(upstream, ewma, total_ewma, failed_ewma, now)

  unlock()

  -- Without a latency sample, or without any recorded total, there is no
  -- success rate to apply. Written as `not (x > 0)` so that a NaN total is
  -- rejected as well; dividing by a zero total would yield a NaN score that
  -- the MIN_SUCCESS_RATE comparison below cannot catch.
  if rtt == 0 or not (total_ewma > 0) then
    return ewma, nil
  end

  local success_rate = 1 - failed_ewma / total_ewma

  if success_rate < MIN_SUCCESS_RATE then
    success_rate = MIN_SUCCESS_RATE
  end

  return ewma / success_rate, nil
end
|
||||
|
||||
|
||||
|
@ -115,7 +155,7 @@ local function score(upstream)
|
|||
-- Original implementation used names
|
||||
-- Endpoints don't have names, so passing in IP:Port as key instead
|
||||
local upstream_name = get_upstream_name(upstream)
|
||||
return get_or_update_ewma(upstream_name, 0, false)
|
||||
return get_or_update_ewma(upstream_name, 0, false, false)
|
||||
end
|
||||
|
||||
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
|
||||
|
@ -222,14 +262,17 @@ end
|
|||
--- Record the outcome of the request that just finished against the
-- upstream that served it.
--
-- Reads upstream_response_time, upstream_connect_time, status and
-- upstream_addr from ngx.var through split.get_last_value (presumably the
-- last entry when several attempts are listed; verify against the helper).
-- Unparseable numbers fall back to 0. A status of 400 or above is reported
-- as a failure.
--
-- @param _ ignored (the balancer instance when invoked as a method)
-- @return nothing when the upstream address is blank, otherwise whatever
--         get_or_update_ewma returns for the update
function _M.after_balance(_)
  local last_value = split.get_last_value

  local response_secs = tonumber(last_value(ngx.var.upstream_response_time)) or 0
  local connect_secs = tonumber(last_value(ngx.var.upstream_connect_time)) or 0
  local status_code = tonumber(last_value(ngx.var.status)) or 0
  local endpoint = last_value(ngx.var.upstream_addr)

  if not util.is_blank(endpoint) then
    -- latency sample = connect time + response time
    return get_or_update_ewma(endpoint, connect_secs + response_secs, status_code >= 400, true)
  end
end
|
||||
|
||||
function _M.sync(self, backend)
|
||||
|
@ -250,6 +293,8 @@ function _M.sync(self, backend)
|
|||
|
||||
for _, endpoint_string in ipairs(normalized_endpoints_removed) do
|
||||
ngx.shared.balancer_ewma:delete(endpoint_string)
|
||||
ngx.shared.balancer_ewma_total:delete(endpoint_string)
|
||||
ngx.shared.balancer_ewma_failed:delete(endpoint_string)
|
||||
ngx.shared.balancer_ewma_last_touched_at:delete(endpoint_string)
|
||||
end
|
||||
|
||||
|
@ -257,7 +302,7 @@ function _M.sync(self, backend)
|
|||
if slow_start_ewma ~= nil then
|
||||
local now = ngx.now()
|
||||
for _, endpoint_string in ipairs(normalized_endpoints_added) do
|
||||
store_stats(endpoint_string, slow_start_ewma, now)
|
||||
store_stats(endpoint_string, slow_start_ewma, 0, 0, now)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -16,8 +16,10 @@ local function flush_all_ewma_stats()
|
|||
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
||||
end
|
||||
|
||||
-- Spec helper: seed all four EWMA shared dicts for one endpoint.
-- The dicts are written in the order listed below.
local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at)
  local seeds = {
    { "balancer_ewma", ewma },
    { "balancer_ewma_total", total_ewma },
    { "balancer_ewma_failed", failed_ewma },
    { "balancer_ewma_last_touched_at", touched_at },
  }

  for _, seed in ipairs(seeds) do
    ngx.shared[seed[1]]:set(endpoint_string, seed[2])
  end
end
|
||||
|
||||
|
@ -45,9 +47,9 @@ describe("Balancer ewma", function()
|
|||
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
|
||||
}
|
||||
}
|
||||
store_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
|
||||
store_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
|
||||
store_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
|
||||
store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1)
|
||||
store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5)
|
||||
store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20)
|
||||
|
||||
instance = balancer_ewma:new(backend)
|
||||
end)
|
||||
|
@ -81,6 +83,39 @@ describe("Balancer ewma", function()
|
|||
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080"))
|
||||
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080"))
|
||||
end)
|
||||
|
||||
-- One failed response (status 400): the stored latency EWMA is updated as
-- usual, and the score returned by after_balance is that EWMA divided by the
-- minimum success rate.
it("updates EWMA stats with failed", function()
  ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }

  local score = instance:after_balance()

  -- presumably 5 is the age in seconds of the seeded sample and 10 the
  -- balancer's decay time; verify against the spec fixture
  local weight = math.exp(-5 / 10)
  -- 0.3 is the expected previous EWMA, 0.12 = connect time + response time
  local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
  -- floor applied to the success rate by the balancer
  local min_success_rate = 0.1

  assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
  assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
  -- expected value first, like the other assertions in this spec
  assert.are.equals(expected_ewma / min_success_rate, score)
end)
|
||||
|
||||
-- A failed response on an endpoint that already has a recorded total: the
-- success rate must not sit at the 0.1 floor, and the returned score is the
-- latency EWMA divided by the rate derived from the stored counters.
it("updates EWMA stats with multi failed requests", function()
  ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }

  -- seed: latency EWMA 0.3, total 1, failed 0, last touched 5 seconds ago
  store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5)

  local score = instance:after_balance()

  local weight = math.exp(-5 / 10)
  -- 0.12 = connect time + response time
  local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)

  -- recompute the success rate from what the balancer stored
  local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr)
  local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr)
  local success_rate = 1 - failed_ewma/total_ewma

  assert.are.not_equals(0.1, success_rate)
  assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
  assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
  -- expected value first, like the other assertions in this spec
  assert.are.equals(expected_ewma / success_rate, score)
end)
|
||||
end)
|
||||
|
||||
describe("balance()", function()
|
||||
|
|
|
@ -21,6 +21,8 @@ http {
|
|||
lua_package_path "/etc/nginx/lua/?.lua;;";
|
||||
|
||||
lua_shared_dict balancer_ewma 10M;
|
||||
lua_shared_dict balancer_ewma_total 10M;
|
||||
lua_shared_dict balancer_ewma_failed 10M;
|
||||
lua_shared_dict balancer_ewma_last_touched_at 10M;
|
||||
lua_shared_dict balancer_ewma_locks 1M;
|
||||
lua_shared_dict certificate_data 20M;
|
||||
|
|
|
@ -38,6 +38,8 @@ http {
|
|||
lua_package_path "/etc/nginx/lua/?.lua;;";
|
||||
|
||||
lua_shared_dict balancer_ewma 10M;
|
||||
lua_shared_dict balancer_ewma_total 10M;
|
||||
lua_shared_dict balancer_ewma_failed 10M;
|
||||
lua_shared_dict balancer_ewma_last_touched_at 10M;
|
||||
lua_shared_dict balancer_ewma_locks 1M;
|
||||
lua_shared_dict certificate_data 20M;
|
||||
|
|
|
@ -32,6 +32,8 @@ SHDICT_ARGS=(
|
|||
"--shdict" "certificate_servers 1M"
|
||||
"--shdict" "ocsp_response_cache 1M"
|
||||
"--shdict" "balancer_ewma 1M"
|
||||
"--shdict" "balancer_ewma_total 1M"
|
||||
"--shdict" "balancer_ewma_failed 1M"
|
||||
"--shdict" "quota_tracker 1M"
|
||||
"--shdict" "high_throughput_tracker 1M"
|
||||
"--shdict" "balancer_ewma_last_touched_at 1M"
|
||||
|
|
Loading…
Reference in a new issue