Don't pick an already-tried endpoint on retry & record the latest attempt's stats in the EWMA balancer
fixes https://github.com/kubernetes/ingress-nginx/issues/6632
This commit is contained in:
parent
f1124aaf04
commit
e118ebc08a
4 changed files with 95 additions and 7 deletions
|
@ -18,6 +18,7 @@ local string = string
|
||||||
local tonumber = tonumber
|
local tonumber = tonumber
|
||||||
local setmetatable = setmetatable
|
local setmetatable = setmetatable
|
||||||
local string_format = string.format
|
local string_format = string.format
|
||||||
|
local table_insert = table.insert
|
||||||
local ngx_log = ngx.log
|
local ngx_log = ngx.log
|
||||||
local INFO = ngx.INFO
|
local INFO = ngx.INFO
|
||||||
|
|
||||||
|
@ -105,10 +106,15 @@ local function get_or_update_ewma(upstream, rtt, update)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
local function get_upstream_name(upstream)
|
||||||
|
return upstream.address .. ":" .. upstream.port
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
local function score(upstream)
|
local function score(upstream)
|
||||||
-- Original implementation used names
|
-- Original implementation used names
|
||||||
-- Endpoints don't have names, so passing in IP:Port as key instead
|
-- Endpoints don't have names, so passing in IP:Port as key instead
|
||||||
local upstream_name = upstream.address .. ":" .. upstream.port
|
local upstream_name = get_upstream_name(upstream)
|
||||||
return get_or_update_ewma(upstream_name, 0, false)
|
return get_or_update_ewma(upstream_name, 0, false)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -135,6 +141,7 @@ local function pick_and_score(peers, k)
|
||||||
lowest_score_index, lowest_score = i, new_score
|
lowest_score_index, lowest_score = i, new_score
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
return peers[lowest_score_index], lowest_score
|
return peers[lowest_score_index], lowest_score
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -146,7 +153,7 @@ local function calculate_slow_start_ewma(self)
|
||||||
local endpoints_count = 0
|
local endpoints_count = 0
|
||||||
|
|
||||||
for _, endpoint in pairs(self.peers) do
|
for _, endpoint in pairs(self.peers) do
|
||||||
local endpoint_string = endpoint.address .. ":" .. endpoint.port
|
local endpoint_string = get_upstream_name(endpoint)
|
||||||
local ewma = ngx.shared.balancer_ewma:get(endpoint_string)
|
local ewma = ngx.shared.balancer_ewma:get(endpoint_string)
|
||||||
|
|
||||||
if ewma then
|
if ewma then
|
||||||
|
@ -170,20 +177,50 @@ function _M.balance(self)
|
||||||
if #peers > 1 then
|
if #peers > 1 then
|
||||||
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
|
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
|
||||||
local peer_copy = util.deepcopy(peers)
|
local peer_copy = util.deepcopy(peers)
|
||||||
endpoint, ewma_score = pick_and_score(peer_copy, k)
|
|
||||||
|
local tried_endpoints
|
||||||
|
if not ngx.ctx.balancer_ewma_tried_endpoints then
|
||||||
|
tried_endpoints = {}
|
||||||
|
ngx.ctx.balancer_ewma_tried_endpoints = tried_endpoints
|
||||||
|
else
|
||||||
|
tried_endpoints = ngx.ctx.balancer_ewma_tried_endpoints
|
||||||
|
end
|
||||||
|
|
||||||
|
local filtered_peers
|
||||||
|
for _, peer in ipairs(peer_copy) do
|
||||||
|
if not tried_endpoints[get_upstream_name(peer)] then
|
||||||
|
if not filtered_peers then
|
||||||
|
filtered_peers = {}
|
||||||
|
end
|
||||||
|
table_insert(filtered_peers, peer)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
if not filtered_peers then
|
||||||
|
ngx.log(ngx.WARN, "all endpoints have been retried")
|
||||||
|
filtered_peers = peer_copy
|
||||||
|
end
|
||||||
|
|
||||||
|
if #filtered_peers > 1 then
|
||||||
|
endpoint, ewma_score = pick_and_score(filtered_peers, k)
|
||||||
|
else
|
||||||
|
endpoint, ewma_score = filtered_peers[1], score(filtered_peers[1])
|
||||||
|
end
|
||||||
|
|
||||||
|
tried_endpoints[get_upstream_name(endpoint)] = true
|
||||||
end
|
end
|
||||||
|
|
||||||
ngx.var.balancer_ewma_score = ewma_score
|
ngx.var.balancer_ewma_score = ewma_score
|
||||||
|
|
||||||
-- TODO(elvinefendi) move this processing to _M.sync
|
-- TODO(elvinefendi) move this processing to _M.sync
|
||||||
return endpoint.address .. ":" .. endpoint.port
|
return get_upstream_name(endpoint)
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.after_balance(_)
|
function _M.after_balance(_)
|
||||||
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
|
local response_time = tonumber(split.get_last_value(ngx.var.upstream_response_time)) or 0
|
||||||
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
|
local connect_time = tonumber(split.get_last_value(ngx.var.upstream_connect_time)) or 0
|
||||||
local rtt = connect_time + response_time
|
local rtt = connect_time + response_time
|
||||||
local upstream = split.get_first_value(ngx.var.upstream_addr)
|
local upstream = split.get_last_value(ngx.var.upstream_addr)
|
||||||
|
|
||||||
if util.is_blank(upstream) then
|
if util.is_blank(upstream) then
|
||||||
return
|
return
|
||||||
|
|
|
@ -69,6 +69,18 @@ describe("Balancer ewma", function()
|
||||||
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get(ngx.var.upstream_addr))
|
||||||
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get(ngx.var.upstream_addr))
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
it("updates EWMA stats with the latest result", function()
|
||||||
|
ngx.var = { upstream_addr = "10.10.10.1:8080, 10.10.10.2:8080", upstream_connect_time = "0.05, 0.02", upstream_response_time = "0.2, 0.1" }
|
||||||
|
|
||||||
|
instance:after_balance()
|
||||||
|
|
||||||
|
local weight = math.exp(-5 / 10)
|
||||||
|
local expected_ewma = 0.3 * weight + 0.12 * (1.0 - weight)
|
||||||
|
|
||||||
|
assert.are.equals(expected_ewma, ngx.shared.balancer_ewma:get("10.10.10.2:8080"))
|
||||||
|
assert.are.equals(ngx_now, ngx.shared.balancer_ewma_last_touched_at:get("10.10.10.2:8080"))
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
describe("balance()", function()
|
describe("balance()", function()
|
||||||
|
@ -96,6 +108,24 @@ describe("Balancer ewma", function()
|
||||||
assert.equal("10.10.10.3:8080", peer)
|
assert.equal("10.10.10.3:8080", peer)
|
||||||
assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score)
|
assert.are.equals(0.16240233988393523723, ngx.var.balancer_ewma_score)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
it("doesn't pick the tried endpoint while retry", function()
|
||||||
|
local two_endpoints_backend = util.deepcopy(backend)
|
||||||
|
table.remove(two_endpoints_backend.endpoints, 2)
|
||||||
|
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
|
||||||
|
|
||||||
|
local peer = two_endpoints_instance:balance()
|
||||||
|
assert.equal("10.10.10.1:8080", peer)
|
||||||
|
end)
|
||||||
|
|
||||||
|
it("all the endpoints are tried, pick the one with lowest score", function()
|
||||||
|
local two_endpoints_backend = util.deepcopy(backend)
|
||||||
|
table.remove(two_endpoints_backend.endpoints, 2)
|
||||||
|
local two_endpoints_instance = balancer_ewma:new(two_endpoints_backend)
|
||||||
|
|
||||||
|
local peer = two_endpoints_instance:balance()
|
||||||
|
assert.equal("10.10.10.3:8080", peer)
|
||||||
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
describe("sync()", function()
|
describe("sync()", function()
|
||||||
|
|
15
rootfs/etc/nginx/lua/test/util/split.lua
Normal file
15
rootfs/etc/nginx/lua/test/util/split.lua
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
local split = require("util.split")
|
||||||
|
|
||||||
|
|
||||||
|
describe("split", function()
|
||||||
|
it("get_last_value", function()
|
||||||
|
for _, case in ipairs({
|
||||||
|
{"127.0.0.1:26157 : 127.0.0.1:26158", "127.0.0.1:26158"},
|
||||||
|
{"127.0.0.1:26157, 127.0.0.1:26158", "127.0.0.1:26158"},
|
||||||
|
{"127.0.0.1:26158", "127.0.0.1:26158"},
|
||||||
|
}) do
|
||||||
|
local last = split.get_last_value(case[1])
|
||||||
|
assert.equal(case[2], last)
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end)
|
|
@ -18,6 +18,12 @@ function _M.get_first_value(var)
|
||||||
return t[1]
|
return t[1]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.get_last_value(var)
|
||||||
|
local t = _M.split_upstream_var(var) or {}
|
||||||
|
if #t == 0 then return nil end
|
||||||
|
return t[#t]
|
||||||
|
end
|
||||||
|
|
||||||
-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
|
-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
|
||||||
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
|
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
|
||||||
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
|
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
|
||||||
|
|
Loading…
Reference in a new issue