diff --git a/internal/ingress/controller/template/configmap.go b/internal/ingress/controller/template/configmap.go index febf20be0..6ffff4280 100644 --- a/internal/ingress/controller/template/configmap.go +++ b/internal/ingress/controller/template/configmap.go @@ -80,6 +80,8 @@ var ( "balancer_ewma": 10240, "balancer_ewma_last_touched_at": 10240, "balancer_ewma_locks": 1024, + "balancer_ewma_total": 10240, + "balancer_ewma_failed": 10240, "certificate_servers": 5120, "ocsp_response_cache": 5120, // keep this same as certificate_servers } diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 681866dc1..203b76412 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -25,6 +25,7 @@ local INFO = ngx.INFO local DECAY_TIME = 10 -- this value is in seconds local LOCK_KEY = ":ewma_key" local PICK_SET_SIZE = 2 +local MIN_SUCCESS_RATE = 0.1 local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1}) if not ewma_lock then @@ -62,7 +63,7 @@ local function decay_ewma(ewma, last_touched_at, rtt, now) return ewma end -local function store_stats(upstream, ewma, now) +local function store_stats(upstream, ewma, total_ewma, failed_ewma, now) local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now) if not success then ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err) @@ -78,31 +79,64 @@ local function store_stats(upstream, ewma, now) if forcible then ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten") end + + success, err, forcible = ngx.shared.balancer_ewma_total:set(upstream, total_ewma) + if not success then + ngx.log(ngx.WARN, "balancer_ewma_total:set failed " .. err) + end + if forcible then + ngx.log(ngx.WARN, "balancer_ewma_total:set valid items forcibly overwritten") + end + + success, err, forcible = ngx.shared.balancer_ewma_failed:set(upstream, failed_ewma) + if not success then + ngx.log(ngx.WARN, "balancer_ewma_failed:set failed " .. err) + end + if forcible then + ngx.log(ngx.WARN, "balancer_ewma_failed:set valid items forcibly overwritten") + end end -local function get_or_update_ewma(upstream, rtt, update) +local function get_or_update_ewma(upstream, rtt, failed, update) local lock_err = nil if update then lock_err = lock(upstream) end + local ewma = ngx.shared.balancer_ewma:get(upstream) or 0 + local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0 + local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0 + if lock_err ~= nil then return ewma, lock_err end local now = ngx.now() local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0 - ewma = decay_ewma(ewma, last_touched_at, rtt, now) - if not update then - return ewma, nil + local failed_delta = 0 + if failed then + failed_delta = 1 end - store_stats(upstream, ewma, now) + ewma = decay_ewma(ewma, last_touched_at, rtt, now) + -- make sure failed rate is always decayed + total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now) + failed_ewma = decay_ewma(failed_ewma, last_touched_at, failed_delta, now) + + if update then + store_stats(upstream, ewma, total_ewma, failed_ewma, now) + end unlock() - return ewma, nil + local success_rate = 1 - failed_ewma / total_ewma + + if success_rate < MIN_SUCCESS_RATE then + success_rate = MIN_SUCCESS_RATE + end + + return ewma / success_rate, nil end @@ -115,7 +149,7 @@ local function score(upstream) -- Original implementation used names -- Endpoints don't have names, so passing in IP:Port as key instead local upstream_name = get_upstream_name(upstream) - return get_or_update_ewma(upstream_name, 0, false) + return get_or_update_ewma(upstream_name, 0, false, false) end -- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle @@ -222,14 +256,17 @@ end function _M.after_balance(_) local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0 local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0 + local status = tonumber(split.get_last_value(ngx.var.status)) or 0 + local rtt = connect_time + response_time local upstream = split.get_last_value(ngx.var.upstream_addr) + local failed = status >= 400 if util.is_blank(upstream) then return end - get_or_update_ewma(upstream, rtt, true) + return get_or_update_ewma(upstream, rtt, failed, true) end function _M.sync(self, backend) @@ -250,6 +287,8 @@ function _M.sync(self, backend) for _, endpoint_string in ipairs(normalized_endpoints_removed) do ngx.shared.balancer_ewma:delete(endpoint_string) + ngx.shared.balancer_ewma_total:delete(endpoint_string) + ngx.shared.balancer_ewma_failed:delete(endpoint_string) ngx.shared.balancer_ewma_last_touched_at:delete(endpoint_string) end @@ -257,11 +296,15 @@ function _M.sync(self, backend) if slow_start_ewma ~= nil then local now = ngx.now() for _, endpoint_string in ipairs(normalized_endpoints_added) do - store_stats(endpoint_string, slow_start_ewma, now) + store_stats(endpoint_string, slow_start_ewma, 0, 0, now) end end end +function _M.score(upstream) + return score(upstream) +end + function _M.new(self, backend) local o = { peers = backend.endpoints, diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua index 6af588396..c27a17dbe 100644 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua @@ -16,8 +16,10 @@ local function flush_all_ewma_stats() ngx.shared.balancer_ewma_last_touched_at:flush_all() end -local function store_ewma_stats(endpoint_string, ewma, touched_at) +local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at) ngx.shared.balancer_ewma:set(endpoint_string, ewma) + ngx.shared.balancer_ewma_total:set(endpoint_string, total_ewma) + ngx.shared.balancer_ewma_failed:set(endpoint_string, failed_ewma) ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at) end @@ -45,9 +47,9 @@ describe("Balancer ewma", function() { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, } } - store_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) - store_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) - store_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) + store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1) + store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5) + store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20) instance = balancer_ewma:new(backend) end) @@ -81,6 +83,39 @@ describe("Balancer ewma", function() assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080")) assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080")) end) + + it("updates EWMA stats with failed", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / 0.1) + end) + + it("updates EWMA stats with multi failed requests", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5) + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr) + local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr) + local success_rate = 1 - failed_ewma/total_ewma + + assert.are.not_equals(0.1, success_rate) + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / success_rate) + end) end) describe("balance()", function() @@ -135,6 +170,20 @@ describe("Balancer ewma", function() local peer = two_endpoints_instance:balance() assert.equal("10.10.10.3:8080", peer) end) + + it("the success rate should keep increasing", function() + local new_backend = util.deepcopy(backend) + table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) + + local offsets = {-15, -10, -5} + local pre = 0; + for _, offset in ipairs(offsets) do + store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset) + local score = balancer_ewma.score({ address = "10.10.10.4", port = "8080" }) + assert.is.True(score > pre) + pre = score + end + end) end) describe("sync()", function() diff --git a/test/data/cleanConf.expected.conf b/test/data/cleanConf.expected.conf index 9c0513b37..d826c6126 100644 --- a/test/data/cleanConf.expected.conf +++ b/test/data/cleanConf.expected.conf @@ -21,6 +21,8 @@ http { lua_package_path "/etc/nginx/lua/?.lua;;"; lua_shared_dict balancer_ewma 10M; + lua_shared_dict balancer_ewma_total 10M; + lua_shared_dict balancer_ewma_failed 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; lua_shared_dict certificate_data 20M; diff --git a/test/data/cleanConf.src.conf b/test/data/cleanConf.src.conf index 6da578106..a1640c7ce 100644 --- a/test/data/cleanConf.src.conf +++ b/test/data/cleanConf.src.conf @@ -38,6 +38,8 @@ http { lua_package_path "/etc/nginx/lua/?.lua;;"; lua_shared_dict balancer_ewma 10M; +lua_shared_dict balancer_ewma_total 10M; +lua_shared_dict balancer_ewma_failed 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; lua_shared_dict certificate_data 20M; diff --git a/test/test-lua.sh b/test/test-lua.sh index e7ee5843e..57ed24a4c 100755 --- a/test/test-lua.sh +++ b/test/test-lua.sh @@ -32,6 +32,8 @@ SHDICT_ARGS=( "--shdict" "certificate_servers 1M" "--shdict" "ocsp_response_cache 1M" "--shdict" "balancer_ewma 1M" + "--shdict" "balancer_ewma_total 1M" + "--shdict" "balancer_ewma_failed 1M" "--shdict" "quota_tracker 1M" "--shdict" "high_throughput_tracker 1M" "--shdict" "balancer_ewma_last_touched_at 1M"