store ewma stats per backend
This commit is contained in:
parent
b65b85cd99
commit
f81f06151d
1 changed files with 17 additions and 20 deletions
|
@ -11,9 +11,6 @@ local split = require("util.split")
|
||||||
local DECAY_TIME = 10 -- this value is in seconds
|
local DECAY_TIME = 10 -- this value is in seconds
|
||||||
local PICK_SET_SIZE = 2
|
local PICK_SET_SIZE = 2
|
||||||
|
|
||||||
local balancer_ewma = {}
|
|
||||||
local balancer_ewma_last_touched_at = {}
|
|
||||||
|
|
||||||
local _M = { name = "ewma" }
|
local _M = { name = "ewma" }
|
||||||
|
|
||||||
local function decay_ewma(ewma, last_touched_at, rtt, now)
|
local function decay_ewma(ewma, last_touched_at, rtt, now)
|
||||||
|
@ -25,28 +22,28 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
|
||||||
return ewma
|
return ewma
|
||||||
end
|
end
|
||||||
|
|
||||||
local function get_or_update_ewma(upstream, rtt, update)
|
local function get_or_update_ewma(self, upstream, rtt, update)
|
||||||
local ewma = balancer_ewma[upstream] or 0
|
local ewma = self.ewma[upstream] or 0
|
||||||
|
|
||||||
local now = ngx.now()
|
local now = ngx.now()
|
||||||
local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0
|
local last_touched_at = self.ewma_last_touched_at[upstream] or 0
|
||||||
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
|
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
|
||||||
|
|
||||||
if not update then
|
if not update then
|
||||||
return ewma, nil
|
return ewma, nil
|
||||||
end
|
end
|
||||||
|
|
||||||
balancer_ewma_last_touched_at[upstream] = now
|
self.ewma[upstream] = ewma
|
||||||
balancer_ewma[upstream] = ewma
|
self.ewma_last_touched_at[upstream] = now
|
||||||
return ewma, nil
|
return ewma, nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
local function score(upstream)
|
local function score(self, upstream)
|
||||||
-- Original implementation used names
|
-- Original implementation used names
|
||||||
-- Endpoints don't have names, so passing in IP:Port as key instead
|
-- Endpoints don't have names, so passing in IP:Port as key instead
|
||||||
local upstream_name = upstream.address .. ":" .. upstream.port
|
local upstream_name = upstream.address .. ":" .. upstream.port
|
||||||
return get_or_update_ewma(upstream_name, 0, false)
|
return get_or_update_ewma(self, upstream_name, 0, false)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
|
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
|
||||||
|
@ -62,12 +59,12 @@ local function shuffle_peers(peers, k)
|
||||||
-- peers[1 .. k] will now contain a randomly selected k from #peers
|
-- peers[1 .. k] will now contain a randomly selected k from #peers
|
||||||
end
|
end
|
||||||
|
|
||||||
local function pick_and_score(peers, k)
|
local function pick_and_score(self, peers, k)
|
||||||
shuffle_peers(peers, k)
|
shuffle_peers(peers, k)
|
||||||
local lowest_score_index = 1
|
local lowest_score_index = 1
|
||||||
local lowest_score = score(peers[lowest_score_index])
|
local lowest_score = score(self, peers[lowest_score_index])
|
||||||
for i = 2, k do
|
for i = 2, k do
|
||||||
local new_score = score(peers[i])
|
local new_score = score(self, peers[i])
|
||||||
if new_score < lowest_score then
|
if new_score < lowest_score then
|
||||||
lowest_score_index, lowest_score = i, new_score
|
lowest_score_index, lowest_score = i, new_score
|
||||||
end
|
end
|
||||||
|
@ -82,14 +79,14 @@ function _M.balance(self)
|
||||||
if #peers > 1 then
|
if #peers > 1 then
|
||||||
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
|
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
|
||||||
local peer_copy = util.deepcopy(peers)
|
local peer_copy = util.deepcopy(peers)
|
||||||
endpoint = pick_and_score(peer_copy, k)
|
endpoint = pick_and_score(self, peer_copy, k)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- TODO(elvinefendi) move this processing to _M.sync
|
-- TODO(elvinefendi) move this processing to _M.sync
|
||||||
return endpoint.address .. ":" .. endpoint.port
|
return endpoint.address .. ":" .. endpoint.port
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.after_balance(_)
|
function _M.after_balance(self)
|
||||||
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
|
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
|
||||||
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
|
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
|
||||||
local rtt = connect_time + response_time
|
local rtt = connect_time + response_time
|
||||||
|
@ -98,7 +95,7 @@ function _M.after_balance(_)
|
||||||
if util.is_blank(upstream) then
|
if util.is_blank(upstream) then
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
get_or_update_ewma(upstream, rtt, true)
|
get_or_update_ewma(self, upstream, rtt, true)
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.sync(self, backend)
|
function _M.sync(self, backend)
|
||||||
|
@ -111,15 +108,15 @@ function _M.sync(self, backend)
|
||||||
end
|
end
|
||||||
|
|
||||||
self.peers = backend.endpoints
|
self.peers = backend.endpoints
|
||||||
|
self.ewma = {}
|
||||||
-- TODO: Reset state of EWMA per backend
|
self.ewma_last_touched_at = {}
|
||||||
balancer_ewma = {}
|
|
||||||
balancer_ewma_last_touched_at = {}
|
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.new(self, backend)
|
function _M.new(self, backend)
|
||||||
local o = {
|
local o = {
|
||||||
peers = backend.endpoints,
|
peers = backend.endpoints,
|
||||||
|
ewma = {},
|
||||||
|
ewma_last_touched_at = {},
|
||||||
traffic_shaping_policy = backend.trafficShapingPolicy,
|
traffic_shaping_policy = backend.trafficShapingPolicy,
|
||||||
alternative_backends = backend.alternativeBackends,
|
alternative_backends = backend.alternativeBackends,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue