From 7b64e477ee49b1674998e2062a629307c1720782 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Fri, 21 Apr 2023 16:53:16 +0800 Subject: [PATCH 1/4] Fix EWMA by the success rate when fail-fast --- .../ingress/controller/template/configmap.go | 2 + rootfs/etc/nginx/lua/balancer/ewma.lua | 59 ++++++++++++++++--- .../etc/nginx/lua/test/balancer/ewma_test.lua | 43 ++++++++++++-- test/data/cleanConf.expected.conf | 2 + test/data/cleanConf.src.conf | 2 + test/test-lua.sh | 2 + 6 files changed, 99 insertions(+), 11 deletions(-) diff --git a/internal/ingress/controller/template/configmap.go b/internal/ingress/controller/template/configmap.go index c73f3b6c0..0f45c3061 100644 --- a/internal/ingress/controller/template/configmap.go +++ b/internal/ingress/controller/template/configmap.go @@ -78,6 +78,8 @@ var ( "balancer_ewma": 10240, "balancer_ewma_last_touched_at": 10240, "balancer_ewma_locks": 1024, + "balancer_ewma_total": 10240, + "balancer_ewma_failed": 10240, "certificate_servers": 5120, "ocsp_response_cache": 5120, // keep this same as certificate_servers "global_throttle_cache": 10240, diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 681866dc1..8b4fd7b4c 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -25,6 +25,7 @@ local INFO = ngx.INFO local DECAY_TIME = 10 -- this value is in seconds local LOCK_KEY = ":ewma_key" local PICK_SET_SIZE = 2 +local MIN_SUCCESS_RATE = 0.1 local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", {timeout = 0, exptime = 0.1}) if not ewma_lock then @@ -62,7 +63,7 @@ local function decay_ewma(ewma, last_touched_at, rtt, now) return ewma end -local function store_stats(upstream, ewma, now) +local function store_stats(upstream, ewma, total_ewma, failed_ewma, now) local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now) if not success then ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err) @@ -78,31 +79,70 @@ local function store_stats(upstream, ewma, now) if forcible then ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten") end + + success, err, forcible = ngx.shared.balancer_ewma_total:set(upstream, total_ewma) + if not success then + ngx.log(ngx.WARN, "balancer_ewma_total:set failed " .. err) + end + if forcible then + ngx.log(ngx.WARN, "balancer_ewma_total:set valid items forcibly overwritten") + end + + success, err, forcible = ngx.shared.balancer_ewma_failed:set(upstream, failed_ewma) + if not success then + ngx.log(ngx.WARN, "balancer_ewma_failed:set failed " .. err) + end + if forcible then + ngx.log(ngx.WARN, "balancer_ewma_failed:set valid items forcibly overwritten") + end end -local function get_or_update_ewma(upstream, rtt, update) +local function get_or_update_ewma(upstream, rtt, failed, update) local lock_err = nil if update then lock_err = lock(upstream) end + local ewma = ngx.shared.balancer_ewma:get(upstream) or 0 + local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0 + local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0 + if lock_err ~= nil then return ewma, lock_err end local now = ngx.now() local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0 + ewma = decay_ewma(ewma, last_touched_at, rtt, now) + if rtt > 0 then + total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now) + end + + if failed then + failed_ewma = decay_ewma(failed_ewma, last_touched_at, 1, now) + end + if not update then return ewma, nil end - store_stats(upstream, ewma, now) + store_stats(upstream, ewma, total_ewma, failed_ewma, now) unlock() - return ewma, nil + if rtt == 0 then + return ewma, nil + else + local success_rate = 1 - failed_ewma / total_ewma + + if success_rate < MIN_SUCCESS_RATE then + success_rate = MIN_SUCCESS_RATE + end + + return ewma / success_rate, nil + end end @@ -115,7 +155,7 @@ local function score(upstream) -- Original implementation used names -- Endpoints don't have names, so passing in IP:Port as key instead local upstream_name = get_upstream_name(upstream) - return get_or_update_ewma(upstream_name, 0, false) + return get_or_update_ewma(upstream_name, 0, false, false) end -- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle @@ -222,14 +262,17 @@ end function _M.after_balance(_) local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0 local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0 + local status = tonumber(split.get_last_value(ngx.var.status)) or 0 + local rtt = connect_time + response_time local upstream = split.get_last_value(ngx.var.upstream_addr) + local failed = status >= 400 if util.is_blank(upstream) then return end - get_or_update_ewma(upstream, rtt, true) + return get_or_update_ewma(upstream, rtt, failed, true) end function _M.sync(self, backend) @@ -250,6 +293,8 @@ function _M.sync(self, backend) for _, endpoint_string in ipairs(normalized_endpoints_removed) do ngx.shared.balancer_ewma:delete(endpoint_string) + ngx.shared.balancer_ewma_total:delete(endpoint_string) + ngx.shared.balancer_ewma_failed:delete(endpoint_string) ngx.shared.balancer_ewma_last_touched_at:delete(endpoint_string) end @@ -257,7 +302,7 @@ function _M.sync(self, backend) if slow_start_ewma ~= nil then local now = ngx.now() for _, endpoint_string in ipairs(normalized_endpoints_added) do - store_stats(endpoint_string, slow_start_ewma, now) + store_stats(endpoint_string, slow_start_ewma, 0, 0, now) end end end diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua index 6af588396..66f06bad0 100644 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua @@ -16,8 +16,10 @@ local function flush_all_ewma_stats() ngx.shared.balancer_ewma_last_touched_at:flush_all() end -local function store_ewma_stats(endpoint_string, ewma, touched_at) +local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at) ngx.shared.balancer_ewma:set(endpoint_string, ewma) + ngx.shared.balancer_ewma_total:set(endpoint_string, total_ewma) + ngx.shared.balancer_ewma_failed:set(endpoint_string, failed_ewma) ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at) end @@ -45,9 +47,9 @@ describe("Balancer ewma", function() { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, } } - store_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) - store_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) - store_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) + store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1) + store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5) + store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20) instance = balancer_ewma:new(backend) end) @@ -81,6 +83,39 @@ describe("Balancer ewma", function() assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080")) assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080")) end) + + it("updates EWMA stats with failed", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / 0.1) + end) + + it("updates EWMA stats with multi failed requests", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5) + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr) + local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr) + local success_rate = 1 - failed_ewma/total_ewma + + assert.are.not_equals(0.1, success_rate) + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / success_rate) + end) end) describe("balance()", function() diff --git a/test/data/cleanConf.expected.conf b/test/data/cleanConf.expected.conf index 1666c19f6..972b2678f 100644 --- a/test/data/cleanConf.expected.conf +++ b/test/data/cleanConf.expected.conf @@ -21,6 +21,8 @@ http { lua_package_path "/etc/nginx/lua/?.lua;;"; lua_shared_dict balancer_ewma 10M; + lua_shared_dict balancer_ewma_total 10M; + lua_shared_dict balancer_ewma_failed 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; lua_shared_dict certificate_data 20M; diff --git a/test/data/cleanConf.src.conf b/test/data/cleanConf.src.conf index 0e572faa5..0101cf5bb 100644 --- a/test/data/cleanConf.src.conf +++ b/test/data/cleanConf.src.conf @@ -38,6 +38,8 @@ http { lua_package_path "/etc/nginx/lua/?.lua;;"; lua_shared_dict balancer_ewma 10M; +lua_shared_dict balancer_ewma_total 10M; +lua_shared_dict balancer_ewma_failed 10M; lua_shared_dict balancer_ewma_last_touched_at 10M; lua_shared_dict balancer_ewma_locks 1M; lua_shared_dict certificate_data 20M; diff --git a/test/test-lua.sh b/test/test-lua.sh index 9c8149eac..02a38c0a2 100755 --- a/test/test-lua.sh +++ b/test/test-lua.sh @@ -32,6 +32,8 @@ SHDICT_ARGS=( "--shdict" "certificate_servers 1M" "--shdict" "ocsp_response_cache 1M" "--shdict" "balancer_ewma 1M" + "--shdict" "balancer_ewma_total 1M" + "--shdict" "balancer_ewma_failed 1M" "--shdict" "quota_tracker 1M" "--shdict" "high_throughput_tracker 1M" "--shdict" "balancer_ewma_last_touched_at 1M" From 2c8bd9e31483c104598f0de4d9268b87ab8be879 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Mon, 24 Apr 2023 13:07:39 +0800 Subject: [PATCH 2/4] EWMA score adjusted by success rate when backend services fail fast --- rootfs/etc/nginx/lua/balancer/ewma.lua | 40 ++- .../etc/nginx/lua/test/balancer/ewma_spec.lua | 266 ++++++++++++++++++ .../etc/nginx/lua/test/balancer/ewma_test.lua | 15 + 3 files changed, 300 insertions(+), 21 deletions(-) create mode 100644 rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 8b4fd7b4c..486a72992 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -106,7 +106,7 @@ local function get_or_update_ewma(upstream, rtt, failed, update) local ewma = ngx.shared.balancer_ewma:get(upstream) or 0 local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0 local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0 - + if lock_err ~= nil then return ewma, lock_err end @@ -114,35 +114,29 @@ local function get_or_update_ewma(upstream, rtt, failed, update) local now = ngx.now() local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0 - ewma = decay_ewma(ewma, last_touched_at, rtt, now) - - if rtt > 0 then - total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now) - end - + local failed_delta = 0 if failed then - failed_ewma = decay_ewma(failed_ewma, last_touched_at, 1, now) + failed_delta = 1 end - if not update then - return ewma, nil - end + ewma = decay_ewma(ewma, last_touched_at, rtt, now) + -- make sure failed rate is always decayed + total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now) + failed_ewma = decay_ewma(failed_ewma, last_touched_at, failed_delta, now) - store_stats(upstream, ewma, total_ewma, failed_ewma, now) + if update then + store_stats(upstream, ewma, total_ewma, failed_ewma, now) + end unlock() - if rtt == 0 then - return ewma, nil - else - local success_rate = 1 - failed_ewma / total_ewma + local success_rate = 1 - failed_ewma / total_ewma - if success_rate < MIN_SUCCESS_RATE then - success_rate = MIN_SUCCESS_RATE - end - - return ewma / success_rate, nil + if success_rate < MIN_SUCCESS_RATE then + success_rate = MIN_SUCCESS_RATE end + + return ewma / success_rate, nil end @@ -307,6 +301,10 @@ function _M.sync(self, backend) end end +function _M.score(self, upstream) + return score(upstream) +end + function _M.new(self, backend) local o = { peers = backend.endpoints, diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua new file mode 100644 index 000000000..5fd249674 --- /dev/null +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua @@ -0,0 +1,266 @@ +local util = require("util") + +local original_ngx = ngx +local function reset_ngx() + _G.ngx = original_ngx +end + +local function mock_ngx(mock) + local _ngx = mock + setmetatable(_ngx, { __index = ngx }) + _G.ngx = _ngx +end + +local function flush_all_ewma_stats() + ngx.shared.balancer_ewma:flush_all() + ngx.shared.balancer_ewma_last_touched_at:flush_all() +end + +local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at) + ngx.shared.balancer_ewma:set(endpoint_string, ewma) + ngx.shared.balancer_ewma_total:set(endpoint_string, total_ewma) + ngx.shared.balancer_ewma_failed:set(endpoint_string, failed_ewma) + ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at) +end + +local function assert_ewma_stats(endpoint_string, ewma, touched_at) + assert.are.equals(ewma, ngx.shared.balancer_ewma:get(endpoint_string)) + assert.are.equals(touched_at, ngx.shared.balancer_ewma_last_touched_at:get(endpoint_string)) +end + + +describe("Balancer ewma", function() + local balancer_ewma = require("balancer.ewma") + local ngx_now = 1543238266 + local backend, instance + + before_each(function() + mock_ngx({ now = function() return ngx_now end, var = { balancer_ewma_score = -1 } }) + package.loaded["balancer.ewma"] = nil + balancer_ewma = require("balancer.ewma") + + backend = { + name = "namespace-service-port", ["load-balance"] = "ewma", + endpoints = { + { address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, + } + } + store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1) + store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5) + store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20) + + instance = balancer_ewma:new(backend) + end) + + after_each(function() + reset_ngx() + flush_all_ewma_stats() + end) + + describe("after_balance()", function() + it("updates EWMA stats", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1" } + + instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + end) + + it("updates EWMA stats with the latest result", function() + ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" } + + instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080")) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080")) + end) + + it("updates EWMA stats with failed", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / 0.1) + end) + + it("updates EWMA stats with multi failed requests", function() + ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } + + store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5) + + local score = instance:after_balance() + + local weight = math.exp(-5 / 10) + local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) + + local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr) + local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr) + local success_rate = 1 - failed_ewma/total_ewma + + assert.are.not_equals(0.1, success_rate) + assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) + assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) + assert.are.equals(score, expected_ewma / success_rate) + end) + end) + + describe("balance()", function() + it("returns single endpoint when the given backend has only one endpoint", function() + local single_endpoint_backend = util.deepcopy(backend) + table.remove(single_endpoint_backend.endpoints, 3) + table.remove(single_endpoint_backend.endpoints, 2) + local single_endpoint_instance = balancer_ewma:new(single_endpoint_backend) + + local peer = single_endpoint_instance:balance() + + assert.are.equals("10.10.10.1:8080", peer) + assert.are.equals(-1, ngx.var.balancer_ewma_score) + end) + + it("picks the endpoint with lowest decayed score", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) + + local peer = two_endpoints_instance:balance() + + -- even though 10.10.10.1:8080 has a lower ewma score + -- algorithm picks 10.10.10.3:8080 because its decayed score is even lower + assert.equal("10.10.10.3:8080", peer) + assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.3:8080"]) + assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score) + end) + + it("doesn't pick the tried endpoint while retry", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) + + ngx.ctx.balancer_ewma_tried_endpoints = { + ["10.10.10.3:8080"] = true, + } + local peer = two_endpoints_instance:balance() + assert.equal("10.10.10.1:8080", peer) + assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.1:8080"]) + end) + + it("all the endpoints are tried, pick the one with lowest score", function() + local two_endpoints_backend = util.deepcopy(backend) + table.remove(two_endpoints_backend.endpoints, 2) + local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) + + ngx.ctx.balancer_ewma_tried_endpoints = { + ["10.10.10.1:8080"] = true, + ["10.10.10.3:8080"] = true, + } + local peer = two_endpoints_instance:balance() + assert.equal("10.10.10.3:8080", peer) + end) + + it("the success rate should keep increasing", function() + local new_backend = util.deepcopy(backend) + table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) + + local offsets = {-15, -10, -5} + local pre = 0; + for _, offset in ipairs(offsets) do + store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset) + local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" }) + assert.is.True(score > pre) + pre = score + end + end) + end) + + describe("sync()", function() + it("does not reset stats when endpoints do not change", function() + local new_backend = util.deepcopy(backend) + + instance:sync(new_backend) + + assert.are.same(new_backend.endpoints, instance.peers) + + assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) + assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) + assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) + end) + + it("resets alternative backends and traffic shaping policy even if endpoints do not change", function() + assert.are.same(nil, instance.alternativeBackends) + assert.are.same(nil, instance.trafficShapingPolicy) + + local new_backend = util.deepcopy(backend) + new_backend.alternativeBackends = {"my-canary-namespace-my-canary-service-my-port"} + new_backend.trafficShapingPolicy = { + cookie = "", + header = "", + headerPattern = "", + headerValue = "", + weight = 20, + } + + instance:sync(new_backend) + + assert.are.same(new_backend.alternativeBackends, instance.alternative_backends) + assert.are.same(new_backend.trafficShapingPolicy, instance.traffic_shaping_policy) + assert.are.same(new_backend.endpoints, instance.peers) + + assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) + assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) + assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) + end) + + it("updates peers, deletes stats for old endpoints and sets average ewma score to new ones", function() + local new_backend = util.deepcopy(backend) + + -- existing endpoint 10.10.10.2 got deleted + -- and replaced with 10.10.10.4 + new_backend.endpoints[2].address = "10.10.10.4" + -- and there's one new extra endpoint + table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080", maxFails = 0, failTimeout = 0 }) + + instance:sync(new_backend) + + assert.are.same(new_backend.endpoints, instance.peers) + + assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) + assert_ewma_stats("10.10.10.2:8080", nil, nil) + assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) + + local slow_start_ewma = (0.2 + 1.2) / 2 + assert_ewma_stats("10.10.10.4:8080", slow_start_ewma, ngx_now) + assert_ewma_stats("10.10.10.5:8080", slow_start_ewma, ngx_now) + end) + + it("does not set slow_start_ewma when there is no existing ewma", function() + local new_backend = util.deepcopy(backend) + table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) + + -- when the LB algorithm instance is just instantiated it won't have any + -- ewma value set for the initial endpoints (because it has not processed any request yet), + -- this test is trying to simulate that by flushing existing ewma values + flush_all_ewma_stats() + + instance:sync(new_backend) + + assert_ewma_stats("10.10.10.1:8080", nil, nil) + assert_ewma_stats("10.10.10.2:8080", nil, nil) + assert_ewma_stats("10.10.10.3:8080", nil, nil) + assert_ewma_stats("10.10.10.4:8080", nil, nil) + end) + end) +end) diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua index 66f06bad0..9ff2c4a84 100644 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua @@ -170,6 +170,21 @@ describe("Balancer ewma", function() local peer = two_endpoints_instance:balance() assert.equal("10.10.10.3:8080", peer) end) + + + it("the success rate should keep increasing", function() + local new_backend = util.deepcopy(backend) + table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) + + local offsets = {-5, -10, -15} + local pre = 0; + for _, offset in ipairs(offsets) do + store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset) + local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" }) + assert.is.True(score < pre) + pre = score + end + end) end) describe("sync()", function() From 15babaa69d371e5047bf5516db44d0c1f1175dc9 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Thu, 11 May 2023 09:29:18 +0800 Subject: [PATCH 3/4] EWMA score adjusted by success rate when backend services fail fast --- rootfs/etc/nginx/lua/balancer/ewma.lua | 2 +- .../etc/nginx/lua/test/balancer/ewma_spec.lua | 266 ------------------ .../etc/nginx/lua/test/balancer/ewma_test.lua | 7 +- 3 files changed, 4 insertions(+), 271 deletions(-) delete mode 100644 rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 486a72992..5d772dc16 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -301,7 +301,7 @@ function _M.sync(self, backend) end end -function _M.score(self, upstream) +function _M.score(upstream) return score(upstream) end diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua deleted file mode 100644 index 5fd249674..000000000 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua +++ /dev/null @@ -1,266 +0,0 @@ -local util = require("util") - -local original_ngx = ngx -local function reset_ngx() - _G.ngx = original_ngx -end - -local function mock_ngx(mock) - local _ngx = mock - setmetatable(_ngx, { __index = ngx }) - _G.ngx = _ngx -end - -local function flush_all_ewma_stats() - ngx.shared.balancer_ewma:flush_all() - ngx.shared.balancer_ewma_last_touched_at:flush_all() -end - -local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at) - ngx.shared.balancer_ewma:set(endpoint_string, ewma) - ngx.shared.balancer_ewma_total:set(endpoint_string, total_ewma) - ngx.shared.balancer_ewma_failed:set(endpoint_string, failed_ewma) - ngx.shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at) -end - -local function assert_ewma_stats(endpoint_string, ewma, touched_at) - assert.are.equals(ewma, ngx.shared.balancer_ewma:get(endpoint_string)) - assert.are.equals(touched_at, ngx.shared.balancer_ewma_last_touched_at:get(endpoint_string)) -end - - -describe("Balancer ewma", function() - local balancer_ewma = require("balancer.ewma") - local ngx_now = 1543238266 - local backend, instance - - before_each(function() - mock_ngx({ now = function() return ngx_now end, var = { balancer_ewma_score = -1 } }) - package.loaded["balancer.ewma"] = nil - balancer_ewma = require("balancer.ewma") - - backend = { - name = "namespace-service-port", ["load-balance"] = "ewma", - endpoints = { - { address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 }, - { address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 }, - { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, - } - } - store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1) - store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5) - store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20) - - instance = balancer_ewma:new(backend) - end) - - after_each(function() - reset_ngx() - flush_all_ewma_stats() - end) - - describe("after_balance()", function() - it("updates EWMA stats", function() - ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1" } - - instance:after_balance() - - local weight = math.exp(-5 / 10) - local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) - - assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) - assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) - end) - - it("updates EWMA stats with the latest result", function() - ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" } - - instance:after_balance() - - local weight = math.exp(-5 / 10) - local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) - - assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080")) - assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080")) - end) - - it("updates EWMA stats with failed", function() - ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } - - local score = instance:after_balance() - - local weight = math.exp(-5 / 10) - local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) - - assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) - assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) - assert.are.equals(score, expected_ewma / 0.1) - end) - - it("updates EWMA stats with multi failed requests", function() - ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" } - - store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5) - - local score = instance:after_balance() - - local weight = math.exp(-5 / 10) - local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight) - - local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr) - local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr) - local success_rate = 1 - failed_ewma/total_ewma - - assert.are.not_equals(0.1, success_rate) - assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr)) - assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr)) - assert.are.equals(score, expected_ewma / success_rate) - end) - end) - - describe("balance()", function() - it("returns single endpoint when the given backend has only one endpoint", function() - local single_endpoint_backend = util.deepcopy(backend) - table.remove(single_endpoint_backend.endpoints, 3) - table.remove(single_endpoint_backend.endpoints, 2) - local single_endpoint_instance = balancer_ewma:new(single_endpoint_backend) - - local peer = single_endpoint_instance:balance() - - assert.are.equals("10.10.10.1:8080", peer) - assert.are.equals(-1, ngx.var.balancer_ewma_score) - end) - - it("picks the endpoint with lowest decayed score", function() - local two_endpoints_backend = util.deepcopy(backend) - table.remove(two_endpoints_backend.endpoints, 2) - local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) - - local peer = two_endpoints_instance:balance() - - -- even though 10.10.10.1:8080 has a lower ewma score - -- algorithm picks 10.10.10.3:8080 because its decayed score is even lower - assert.equal("10.10.10.3:8080", peer) - assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.3:8080"]) - assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score) - end) - - it("doesn't pick the tried endpoint while retry", function() - local two_endpoints_backend = util.deepcopy(backend) - table.remove(two_endpoints_backend.endpoints, 2) - local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) - - ngx.ctx.balancer_ewma_tried_endpoints = { - ["10.10.10.3:8080"] = true, - } - local peer = two_endpoints_instance:balance() - assert.equal("10.10.10.1:8080", peer) - assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.1:8080"]) - end) - - it("all the endpoints are tried, pick the one with lowest score", function() - local two_endpoints_backend = util.deepcopy(backend) - table.remove(two_endpoints_backend.endpoints, 2) - local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend) - - ngx.ctx.balancer_ewma_tried_endpoints = { - ["10.10.10.1:8080"] = true, - ["10.10.10.3:8080"] = true, - } - local peer = two_endpoints_instance:balance() - assert.equal("10.10.10.3:8080", peer) - end) - - it("the success rate should keep increasing", function() - local new_backend = util.deepcopy(backend) - table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) - - local offsets = {-15, -10, -5} - local pre = 0; - for _, offset in ipairs(offsets) do - store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset) - local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" }) - assert.is.True(score > pre) - pre = score - end - end) - end) - - describe("sync()", function() - it("does not reset stats when endpoints do not change", function() - local new_backend = util.deepcopy(backend) - - instance:sync(new_backend) - - assert.are.same(new_backend.endpoints, instance.peers) - - assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) - assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) - assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) - end) - - it("resets alternative backends and traffic shaping policy even if endpoints do not change", function() - assert.are.same(nil, instance.alternativeBackends) - assert.are.same(nil, instance.trafficShapingPolicy) - - local new_backend = util.deepcopy(backend) - new_backend.alternativeBackends = {"my-canary-namespace-my-canary-service-my-port"} - new_backend.trafficShapingPolicy = { - cookie = "", - header = "", - headerPattern = "", - headerValue = "", - weight = 20, - } - - instance:sync(new_backend) - - assert.are.same(new_backend.alternativeBackends, instance.alternative_backends) - assert.are.same(new_backend.trafficShapingPolicy, instance.traffic_shaping_policy) - assert.are.same(new_backend.endpoints, instance.peers) - - assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) - assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5) - assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) - end) - - it("updates peers, deletes stats for old endpoints and sets average ewma score to new ones", function() - local new_backend = util.deepcopy(backend) - - -- existing endpoint 10.10.10.2 got deleted - -- and replaced with 10.10.10.4 - new_backend.endpoints[2].address = "10.10.10.4" - -- and there's one new extra endpoint - table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080", maxFails = 0, failTimeout = 0 }) - - instance:sync(new_backend) - - assert.are.same(new_backend.endpoints, instance.peers) - - assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1) - assert_ewma_stats("10.10.10.2:8080", nil, nil) - assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20) - - local slow_start_ewma = (0.2 + 1.2) / 2 - assert_ewma_stats("10.10.10.4:8080", slow_start_ewma, ngx_now) - assert_ewma_stats("10.10.10.5:8080", slow_start_ewma, ngx_now) - end) - - it("does not set slow_start_ewma when there is no existing ewma", function() - local new_backend = util.deepcopy(backend) - table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) - - -- when the LB algorithm instance is just instantiated it won't have any - -- ewma value set for the initial endpoints (because it has not processed any request yet), - -- this test is trying to simulate that by flushing existing ewma values - flush_all_ewma_stats() - - instance:sync(new_backend) - - assert_ewma_stats("10.10.10.1:8080", nil, nil) - assert_ewma_stats("10.10.10.2:8080", nil, nil) - assert_ewma_stats("10.10.10.3:8080", nil, nil) - assert_ewma_stats("10.10.10.4:8080", nil, nil) - end) - end) -end) diff --git a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua index 9ff2c4a84..c27a17dbe 100644 --- a/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer/ewma_test.lua @@ -171,17 +171,16 @@ describe("Balancer ewma", function() assert.equal("10.10.10.3:8080", peer) end) - it("the success rate should keep increasing", function() local new_backend = util.deepcopy(backend) table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }) - local offsets = {-5, -10, -15} + local offsets = {-15, -10, -5} local pre = 0; for _, offset in ipairs(offsets) do store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset) - local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" }) - assert.is.True(score < pre) + local score = balancer_ewma.score({ address = "10.10.10.4", port = "8080" }) + assert.is.True(score > pre) pre = score end end) From 0caa2ddb298b2bf20b20eb1ffcb6a507327f38b6 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Thu, 11 May 2023 09:31:38 +0800 Subject: [PATCH 4/4] fix lint --- rootfs/etc/nginx/lua/balancer/ewma.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 5d772dc16..203b76412 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -106,7 +106,7 @@ local function get_or_update_ewma(upstream, rtt, failed, update) local ewma = ngx.shared.balancer_ewma:get(upstream) or 0 local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0 local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0 - + if lock_err ~= nil then return ewma, lock_err end