EWMA score adjusted by success rate when backend services fail fast
This commit is contained in:
parent
7b64e477ee
commit
2c8bd9e314
3 changed files with 300 additions and 21 deletions
|
@ -106,7 +106,7 @@ local function get_or_update_ewma(upstream, rtt, failed, update)
|
||||||
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
|
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
|
||||||
local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0
|
local total_ewma = ngx.shared.balancer_ewma_total:get(upstream) or 0
|
||||||
local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0
|
local failed_ewma = ngx.shared.balancer_ewma_failed:get(upstream) or 0
|
||||||
|
|
||||||
if lock_err ~= nil then
|
if lock_err ~= nil then
|
||||||
return ewma, lock_err
|
return ewma, lock_err
|
||||||
end
|
end
|
||||||
|
@ -114,35 +114,29 @@ local function get_or_update_ewma(upstream, rtt, failed, update)
|
||||||
local now = ngx.now()
|
local now = ngx.now()
|
||||||
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
|
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
|
||||||
|
|
||||||
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
|
local failed_delta = 0
|
||||||
|
|
||||||
if rtt > 0 then
|
|
||||||
total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now)
|
|
||||||
end
|
|
||||||
|
|
||||||
if failed then
|
if failed then
|
||||||
failed_ewma = decay_ewma(failed_ewma, last_touched_at, 1, now)
|
failed_delta = 1
|
||||||
end
|
end
|
||||||
|
|
||||||
if not update then
|
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
|
||||||
return ewma, nil
|
-- make sure failed rate is always decayed
|
||||||
end
|
total_ewma = decay_ewma(total_ewma, last_touched_at, 1, now)
|
||||||
|
failed_ewma = decay_ewma(failed_ewma, last_touched_at, failed_delta, now)
|
||||||
|
|
||||||
store_stats(upstream, ewma, total_ewma, failed_ewma, now)
|
if update then
|
||||||
|
store_stats(upstream, ewma, total_ewma, failed_ewma, now)
|
||||||
|
end
|
||||||
|
|
||||||
unlock()
|
unlock()
|
||||||
|
|
||||||
if rtt == 0 then
|
local success_rate = 1 - failed_ewma / total_ewma
|
||||||
return ewma, nil
|
|
||||||
else
|
|
||||||
local success_rate = 1 - failed_ewma / total_ewma
|
|
||||||
|
|
||||||
if success_rate < MIN_SUCCESS_RATE then
|
if success_rate < MIN_SUCCESS_RATE then
|
||||||
success_rate = MIN_SUCCESS_RATE
|
success_rate = MIN_SUCCESS_RATE
|
||||||
end
|
|
||||||
|
|
||||||
return ewma / success_rate, nil
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
return ewma / success_rate, nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
@ -307,6 +301,10 @@ function _M.sync(self, backend)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Public entry point that exposes this module's score() for a single upstream.
-- `self` is accepted so the function can be invoked with method syntax but is
-- not otherwise used. Every value returned by score() is forwarded unchanged.
function _M.score(self, upstream)
  return score(upstream)
end
|
||||||
|
|
||||||
function _M.new(self, backend)
|
function _M.new(self, backend)
|
||||||
local o = {
|
local o = {
|
||||||
peers = backend.endpoints,
|
peers = backend.endpoints,
|
||||||
|
|
266
rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua
Normal file
266
rootfs/etc/nginx/lua/test/balancer/ewma_spec.lua
Normal file
|
@ -0,0 +1,266 @@
|
||||||
|
local util = require("util")
|
||||||
|
|
||||||
|
local original_ngx = ngx
|
||||||
|
-- Put the global `ngx` back to the value captured in `original_ngx` when this
-- file was loaded, discarding whatever mock_ngx() installed in its place.
local function reset_ngx()
  _G.ngx = original_ngx
end
|
||||||
|
|
||||||
|
-- Install `mock` as the global `ngx`.
-- Any field that `mock` does not define is resolved through an __index
-- metatable pointing at the `ngx` value that is current when this is called.
-- Note that `mock` itself is modified: the metatable is attached to it.
local function mock_ngx(mock)
  local fallback = { __index = ngx }
  _G.ngx = setmetatable(mock, fallback)
end
|
||||||
|
|
||||||
|
-- Empty every shared dict that store_ewma_stats() writes to, so that no EWMA
-- state (latency, total counter, failed counter, last-touched time) carries
-- over from one test case into the next.
local function flush_all_ewma_stats()
  local shared = ngx.shared
  shared.balancer_ewma:flush_all()
  -- The total/failed counters feed the success-rate adjustment of the score,
  -- so they must be cleared together with the latency EWMA.
  shared.balancer_ewma_total:flush_all()
  shared.balancer_ewma_failed:flush_all()
  shared.balancer_ewma_last_touched_at:flush_all()
end
|
||||||
|
|
||||||
|
-- Seed the per-endpoint stats in the shared dicts under the key
-- `endpoint_string`: the latency EWMA, the total counter, the failed counter
-- and the last-touched timestamp, written in that order.
local function store_ewma_stats(endpoint_string, ewma, total_ewma, failed_ewma, touched_at)
  local shared = ngx.shared
  shared.balancer_ewma:set(endpoint_string, ewma)
  shared.balancer_ewma_total:set(endpoint_string, total_ewma)
  shared.balancer_ewma_failed:set(endpoint_string, failed_ewma)
  shared.balancer_ewma_last_touched_at:set(endpoint_string, touched_at)
end
|
||||||
|
|
||||||
|
-- Assert that the stored latency EWMA and last-touched timestamp for
-- `endpoint_string` equal the expected `ewma` and `touched_at`.
-- Callers in this file pass nil for both when they expect no stored entry.
local function assert_ewma_stats(endpoint_string, ewma, touched_at)
  -- The :get() call stays in last-argument position so that everything it
  -- returns is handed to the assertion, as in a direct inline call.
  local function expect_stored(dict, expected)
    assert.are.equals(expected, dict:get(endpoint_string))
  end

  expect_stored(ngx.shared.balancer_ewma, ewma)
  expect_stored(ngx.shared.balancer_ewma_last_touched_at, touched_at)
end
|
||||||
|
|
||||||
|
|
||||||
|
describe("Balancer ewma", function()
|
||||||
|
local balancer_ewma = require("balancer.ewma")
|
||||||
|
local ngx_now = 1543238266
|
||||||
|
local backend, instance
|
||||||
|
|
||||||
|
before_each(function()
|
||||||
|
mock_ngx({ now = function() return ngx_now end, var = { balancer_ewma_score = -1 } })
|
||||||
|
package.loaded["balancer.ewma"] = nil
|
||||||
|
balancer_ewma = require("balancer.ewma")
|
||||||
|
|
||||||
|
backend = {
|
||||||
|
name = "namespace-service-port", ["load-balance"] = "ewma",
|
||||||
|
endpoints = {
|
||||||
|
{ address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 },
|
||||||
|
{ address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 },
|
||||||
|
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
store_ewma_stats("10.10.10.1:8080", 0.2, 0, 0, ngx_now - 1)
|
||||||
|
store_ewma_stats("10.10.10.2:8080", 0.3, 0, 0, ngx_now - 5)
|
||||||
|
store_ewma_stats("10.10.10.3:8080", 1.2, 0, 0, ngx_now - 20)
|
||||||
|
|
||||||
|
instance = balancer_ewma:new(backend)
|
||||||
|
end)
|
||||||
|
|
||||||
|
after_each(function()
|
||||||
|
reset_ngx()
|
||||||
|
flush_all_ewma_stats()
|
||||||
|
end)
|
||||||
|
|
||||||
|
describe("after_balance()", function()
|
||||||
|
it("updates EWMA stats", function()
|
||||||
|
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1" }
|
||||||
|
|
||||||
|
instance:after_balance()
|
||||||
|
|
||||||
|
local weight = math.exp(-5 / 10)
|
||||||
|
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
|
||||||
|
|
||||||
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
|
||||||
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("updates EWMA stats with the latest result", function()
|
||||||
|
ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" }
|
||||||
|
|
||||||
|
instance:after_balance()
|
||||||
|
|
||||||
|
local weight = math.exp(-5 / 10)
|
||||||
|
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
|
||||||
|
|
||||||
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080"))
|
||||||
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080"))
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("updates EWMA stats with failed", function()
|
||||||
|
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }
|
||||||
|
|
||||||
|
local score = instance:after_balance()
|
||||||
|
|
||||||
|
local weight = math.exp(-5 / 10)
|
||||||
|
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
|
||||||
|
|
||||||
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
|
||||||
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
|
||||||
|
assert.are.equals(score, expected_ewma / 0.1)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("updates EWMA stats with multi failed requests", function()
|
||||||
|
ngx.var = { upstream_addr = "10.10.10.2:8080", upstream_connect_time = "0.02", upstream_response_time = "0.1", status="400" }
|
||||||
|
|
||||||
|
store_ewma_stats("10.10.10.2:8080", 0.3, 1, 0, ngx_now - 5)
|
||||||
|
|
||||||
|
local score = instance:after_balance()
|
||||||
|
|
||||||
|
local weight = math.exp(-5 / 10)
|
||||||
|
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
|
||||||
|
|
||||||
|
local total_ewma = ngx.shared.balancer_ewma_total:get(ngx.var.upstream_addr)
|
||||||
|
local failed_ewma = ngx.shared.balancer_ewma_failed:get(ngx.var.upstream_addr)
|
||||||
|
local success_rate = 1 - failed_ewma/total_ewma
|
||||||
|
|
||||||
|
assert.are.not_equals(0.1, success_rate)
|
||||||
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
|
||||||
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
|
||||||
|
assert.are.equals(score, expected_ewma / success_rate)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
describe("balance()", function()
|
||||||
|
it("returns single endpoint when the given backend has only one endpoint", function()
|
||||||
|
local single_endpoint_backend = util.deepcopy(backend)
|
||||||
|
table.remove(single_endpoint_backend.endpoints, 3)
|
||||||
|
table.remove(single_endpoint_backend.endpoints, 2)
|
||||||
|
local single_endpoint_instance = balancer_ewma:new(single_endpoint_backend)
|
||||||
|
|
||||||
|
local peer = single_endpoint_instance:balance()
|
||||||
|
|
||||||
|
assert.are.equals("10.10.10.1:8080", peer)
|
||||||
|
assert.are.equals(-1, ngx.var.balancer_ewma_score)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("picks the endpoint with lowest decayed score", function()
|
||||||
|
local two_endpoints_backend = util.deepcopy(backend)
|
||||||
|
table.remove(two_endpoints_backend.endpoints, 2)
|
||||||
|
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
|
||||||
|
|
||||||
|
local peer = two_endpoints_instance:balance()
|
||||||
|
|
||||||
|
-- even though 10.10.10.1:8080 has a lower ewma score
|
||||||
|
-- algorithm picks 10.10.10.3:8080 because its decayed score is even lower
|
||||||
|
assert.equal("10.10.10.3:8080", peer)
|
||||||
|
assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.3:8080"])
|
||||||
|
assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("doesn't pick the tried endpoint while retry", function()
|
||||||
|
local two_endpoints_backend = util.deepcopy(backend)
|
||||||
|
table.remove(two_endpoints_backend.endpoints, 2)
|
||||||
|
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
|
||||||
|
|
||||||
|
ngx.ctx.balancer_ewma_tried_endpoints = {
|
||||||
|
["10.10.10.3:8080"] = true,
|
||||||
|
}
|
||||||
|
local peer = two_endpoints_instance:balance()
|
||||||
|
assert.equal("10.10.10.1:8080", peer)
|
||||||
|
assert.equal(true, ngx.ctx.balancer_ewma_tried_endpoints["10.10.10.1:8080"])
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("all the endpoints are tried, pick the one with lowest score", function()
|
||||||
|
local two_endpoints_backend = util.deepcopy(backend)
|
||||||
|
table.remove(two_endpoints_backend.endpoints, 2)
|
||||||
|
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
|
||||||
|
|
||||||
|
ngx.ctx.balancer_ewma_tried_endpoints = {
|
||||||
|
["10.10.10.1:8080"] = true,
|
||||||
|
["10.10.10.3:8080"] = true,
|
||||||
|
}
|
||||||
|
local peer = two_endpoints_instance:balance()
|
||||||
|
assert.equal("10.10.10.3:8080", peer)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("the success rate should keep increasing", function()
|
||||||
|
local new_backend = util.deepcopy(backend)
|
||||||
|
table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 })
|
||||||
|
|
||||||
|
local offsets = {-15, -10, -5}
|
||||||
|
local pre = 0;
|
||||||
|
for _, offset in ipairs(offsets) do
|
||||||
|
store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset)
|
||||||
|
local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" })
|
||||||
|
assert.is.True(score > pre)
|
||||||
|
pre = score
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
describe("sync()", function()
|
||||||
|
it("does not reset stats when endpoints do not change", function()
|
||||||
|
local new_backend = util.deepcopy(backend)
|
||||||
|
|
||||||
|
instance:sync(new_backend)
|
||||||
|
|
||||||
|
assert.are.same(new_backend.endpoints, instance.peers)
|
||||||
|
|
||||||
|
assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
|
||||||
|
assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
|
||||||
|
assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("resets alternative backends and traffic shaping policy even if endpoints do not change", function()
|
||||||
|
assert.are.same(nil, instance.alternativeBackends)
|
||||||
|
assert.are.same(nil, instance.trafficShapingPolicy)
|
||||||
|
|
||||||
|
local new_backend = util.deepcopy(backend)
|
||||||
|
new_backend.alternativeBackends = {"my-canary-namespace-my-canary-service-my-port"}
|
||||||
|
new_backend.trafficShapingPolicy = {
|
||||||
|
cookie = "",
|
||||||
|
header = "",
|
||||||
|
headerPattern = "",
|
||||||
|
headerValue = "",
|
||||||
|
weight = 20,
|
||||||
|
}
|
||||||
|
|
||||||
|
instance:sync(new_backend)
|
||||||
|
|
||||||
|
assert.are.same(new_backend.alternativeBackends, instance.alternative_backends)
|
||||||
|
assert.are.same(new_backend.trafficShapingPolicy, instance.traffic_shaping_policy)
|
||||||
|
assert.are.same(new_backend.endpoints, instance.peers)
|
||||||
|
|
||||||
|
assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
|
||||||
|
assert_ewma_stats("10.10.10.2:8080", 0.3, ngx_now - 5)
|
||||||
|
assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("updates peers, deletes stats for old endpoints and sets average ewma score to new ones", function()
|
||||||
|
local new_backend = util.deepcopy(backend)
|
||||||
|
|
||||||
|
-- existing endpoint 10.10.10.2 got deleted
|
||||||
|
-- and replaced with 10.10.10.4
|
||||||
|
new_backend.endpoints[2].address = "10.10.10.4"
|
||||||
|
-- and there's one new extra endpoint
|
||||||
|
table.insert(new_backend.endpoints, { address = "10.10.10.5", port = "8080", maxFails = 0, failTimeout = 0 })
|
||||||
|
|
||||||
|
instance:sync(new_backend)
|
||||||
|
|
||||||
|
assert.are.same(new_backend.endpoints, instance.peers)
|
||||||
|
|
||||||
|
assert_ewma_stats("10.10.10.1:8080", 0.2, ngx_now - 1)
|
||||||
|
assert_ewma_stats("10.10.10.2:8080", nil, nil)
|
||||||
|
assert_ewma_stats("10.10.10.3:8080", 1.2, ngx_now - 20)
|
||||||
|
|
||||||
|
local slow_start_ewma = (0.2 + 1.2) / 2
|
||||||
|
assert_ewma_stats("10.10.10.4:8080", slow_start_ewma, ngx_now)
|
||||||
|
assert_ewma_stats("10.10.10.5:8080", slow_start_ewma, ngx_now)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("does not set slow_start_ewma when there is no existing ewma", function()
|
||||||
|
local new_backend = util.deepcopy(backend)
|
||||||
|
table.insert(new_backend.endpoints, { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 })
|
||||||
|
|
||||||
|
-- when the LB algorithm instance is just instantiated it won't have any
|
||||||
|
-- ewma value set for the initial endpoints (because it has not processed any request yet),
|
||||||
|
-- this test is trying to simulate that by flushing existing ewma values
|
||||||
|
flush_all_ewma_stats()
|
||||||
|
|
||||||
|
instance:sync(new_backend)
|
||||||
|
|
||||||
|
assert_ewma_stats("10.10.10.1:8080", nil, nil)
|
||||||
|
assert_ewma_stats("10.10.10.2:8080", nil, nil)
|
||||||
|
assert_ewma_stats("10.10.10.3:8080", nil, nil)
|
||||||
|
assert_ewma_stats("10.10.10.4:8080", nil, nil)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
end)
|
|
@ -170,6 +170,21 @@ describe("Balancer ewma", function()
|
||||||
local peer = two_endpoints_instance:balance()
|
local peer = two_endpoints_instance:balance()
|
||||||
assert.equal("10.10.10.3:8080", peer)
|
assert.equal("10.10.10.3:8080", peer)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
|
||||||
|
it("the success rate should keep increasing", function()
  -- The same stats are stored with a progressively older last-touched time,
  -- so each score() call sees them after a longer idle interval. Every score
  -- is asserted to be strictly lower than the one obtained before it.
  local offsets = {-5, -10, -15}

  -- Start from +infinity so the first comparison can succeed. Starting from 0
  -- would demand a negative first score, which is not expected here since the
  -- score is the stored EWMA divided by the success rate.
  local pre = math.huge
  for _, offset in ipairs(offsets) do
    store_ewma_stats("10.10.10.4:8080", 1, 10, 1, ngx_now + offset)
    local score = balancer_ewma:score({ address = "10.10.10.4", port = "8080" })
    assert.is.True(score < pre)
    pre = score
  end
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
describe("sync()", function()
|
describe("sync()", function()
|
||||||
|
|
Loading…
Reference in a new issue