This commit is contained in:
纪卓志 2025-02-17 09:50:35 -08:00 committed by GitHub
commit 19a1fd74d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 114 additions and 14 deletions

View file

@ -80,6 +80,8 @@ var (
"balancer_ewma": 10240,
"balancer_ewma_last_touched_at": 10240,
"balancer_ewma_locks": 1024,
"balancer_ewma_total": 10240,
"balancer_ewma_failed": 10240,
"certificate_servers": 5120,
"ocsp_response_cache": 5120, // keep this same as certificate_servers
}

View file

@ -25,6 +25,7 @@ local INFO = ngx.INFO
local DECAY_TIME = 10 -- this value is in seconds
local LOCK_KEY = ":ewma_key"
local PICK_SET_SIZE = 2
local MIN_SUCCESS_RATE = 0.1
local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1})
if not ewma_lock then
@ -62,7 +63,7 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma
end
local function store_stats(upstream, ewma, now)
local function store_stats(upstream, ewma, total_ewma, failed_ewma, now)
local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now)
if not success then
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err)
@ -78,31 +79,64 @@ local function store_stats(upstream, ewma, now)
if forcible then
ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten")
end
success, err, forcible = ngx.shared.balancer_ewma_total:set(upstream, total_ewma)
if not success then
ngx.log(ngx.WARN, "balancer_ewma_total:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma_total:set valid items forcibly overwritten")
end
success, err, forcible = ngx.shared.balancer_ewma_failed:set(upstream, failed_ewma)
if not success then
ngx.log(ngx.WARN, "balancer_ewma_failed:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "balancer_ewma_failed:set valid items forcibly overwritten")
end
end
local function get_or_update_ewma(upstream, rtt, update)
local function get_or_update_ewma(upstream, rtt, failed, update)
local lock_err = nil
if update then
lock_err = lock(upstream)
end
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0
local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0
if lock_err ~= nil then
return ewma, lock_err
end
local now = ngx.now()
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
if not update then
return ewma, nil
local failed_delta = 0
if failed then
failed_delta = 1
end
store_stats(upstream, ewma, now)
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
-- make sure failed rate is always decayed
total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now)
failed_ewma = decay_ewma(failed_ewma, last_touched_at, failed_delta, now)
if update then
store_stats(upstream, ewma, total_ewma, failed_ewma, now)
end
unlock()
return ewma, nil
local success_rate = 1 - failed_ewma / total_ewma
if success_rate < MIN_SUCCESS_RATE then
success_rate = MIN_SUCCESS_RATE
end
return ewma / success_rate, nil
end
@ -115,7 +149,7 @@ local function score(upstream)
-- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = get_upstream_name(upstream)
return get_or_update_ewma(upstream_name, 0, false)
return get_or_update_ewma(upstream_name, 0, false, false)
end
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
@ -222,14 +256,17 @@ end
function _M.after_balance(_)
local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0
local status = tonumber(split.get_last_value(ngx.var.status)) or 0
local rtt = connect_time + response_time
local upstream = split.get_last_value(ngx.var.upstream_addr)
local failed = status >= 400
if util.is_blank(upstream) then
return
end
get_or_update_ewma(upstream, rtt, true)
return get_or_update_ewma(upstream, rtt, failed, true)
end
function _M.sync(self, backend)
@ -250,6 +287,8 @@ function _M.sync(self, backend)
for _, endpoint_string in ipairs(normalized_endpoints_removed) do
ngx.shared.balancer_ewma:delete(endpoint_string)
ngx.shared.balancer_ewma_total:delete(endpoint_string)
ngx.shared.balancer_ewma_failed:delete(endpoint_string)
ngx.shared.balancer_ewma_last_touched_at:delete(endpoint_string)
end
@ -257,11 +296,15 @@ function _M.sync(self, backend)
if slow_start_ewma ~= nil then
local now = ngx.now()
for _, endpoint_string in ipairs(normalized_endpoints_added) do
store_stats(endpoint_string, slow_start_ewma, now)
store_stats(endpoint_string, slow_start_ewma, 0, 0, now)
end
end
end
function _M.score(upstream)
return score(upstream)
end
function _M.new(self, backend)
local o = {
peers = backend.endpoints,

View file

@ -16,8 +16,10 @@ local function flush_all_ewma_stats()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
end
local function store_ewma_stats(endpoint_string, ewma, touched_at)
local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at)
ngx.shared.balancer_ewma:set(endpoint_string, ewma)
ngx.shared.balancer_ewma_total:set(endpoint_string, total_ewma)
ngx.shared.balancer_ewma_failed:set(endpoint_string, failed_ewma)
ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at)
end
@ -45,9 +47,9 @@ describe("Balancer ewma", function()
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
store_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
store_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
store_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1)
store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5)
store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20)
instance = balancer_ewma:new(backend)
end)
@ -81,6 +83,39 @@ describe("Balancer ewma", function()
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080"))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080"))
end)
it("updates EWMA stats with failed", function()
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }
local score = instance:after_balance()
local weight = math.exp(-5 / 10)
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
assert.are.equals(score, expected_ewma / 0.1)
end)
it("updates EWMA stats with multi failed requests", function()
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }
store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5)
local score = instance:after_balance()
local weight = math.exp(-5 / 10)
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr)
local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr)
local success_rate = 1 - failed_ewma/total_ewma
assert.are.not_equals(0.1, success_rate)
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
assert.are.equals(score, expected_ewma / success_rate)
end)
end)
describe("balance()", function()
@ -135,6 +170,20 @@ describe("Balancer ewma", function()
local peer = two_endpoints_instance:balance()
assert.equal("10.10.10.3:8080", peer)
end)
it("the success rate should keep increasing", function()
local new_backend = util.deepcopy(backend)
table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 })
local offsets = {-15, -10, -5}
local pre = 0;
for _, offset in ipairs(offsets) do
store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset)
local score = balancer_ewma.score({ address = "10.10.10.4", port = "8080" })
assert.is.True(score > pre)
pre = score
end
end)
end)
describe("sync()", function()

View file

@ -21,6 +21,8 @@ http {
lua_package_path "/etc/nginx/lua/?.lua;;";
lua_shared_dict balancer_ewma 10M;
lua_shared_dict balancer_ewma_total 10M;
lua_shared_dict balancer_ewma_failed 10M;
lua_shared_dict balancer_ewma_last_touched_at 10M;
lua_shared_dict balancer_ewma_locks 1M;
lua_shared_dict certificate_data 20M;

View file

@ -38,6 +38,8 @@ http {
lua_package_path "/etc/nginx/lua/?.lua;;";
lua_shared_dict balancer_ewma 10M;
lua_shared_dict balancer_ewma_total 10M;
lua_shared_dict balancer_ewma_failed 10M;
lua_shared_dict balancer_ewma_last_touched_at 10M;
lua_shared_dict balancer_ewma_locks 1M;
lua_shared_dict certificate_data 20M;

View file

@ -32,6 +32,8 @@ SHDICT_ARGS=(
"--shdict" "certificate_servers 1M"
"--shdict" "ocsp_response_cache 1M"
"--shdict" "balancer_ewma 1M"
"--shdict" "balancer_ewma_total 1M"
"--shdict" "balancer_ewma_failed 1M"
"--shdict" "quota_tracker 1M"
"--shdict" "high_throughput_tracker 1M"
"--shdict" "balancer_ewma_last_touched_at 1M"