Merge pull request #3467 from Shopify/ewma-stats-per-backend

store ewma stats per backend
This commit is contained in:
k8s-ci-robot 2018-11-26 13:50:09 -08:00 committed by GitHub
commit 118fdad6b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 20 deletions

View file

@ -11,9 +11,6 @@ local split = require("util.split")
local DECAY_TIME = 10 -- this value is in seconds local DECAY_TIME = 10 -- this value is in seconds
local PICK_SET_SIZE = 2 local PICK_SET_SIZE = 2
local balancer_ewma = {}
local balancer_ewma_last_touched_at = {}
local _M = { name = "ewma" } local _M = { name = "ewma" }
local function decay_ewma(ewma, last_touched_at, rtt, now) local function decay_ewma(ewma, last_touched_at, rtt, now)
@ -25,28 +22,28 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
return ewma return ewma
end end
local function get_or_update_ewma(upstream, rtt, update) local function get_or_update_ewma(self, upstream, rtt, update)
local ewma = balancer_ewma[upstream] or 0 local ewma = self.ewma[upstream] or 0
local now = ngx.now() local now = ngx.now()
local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0 local last_touched_at = self.ewma_last_touched_at[upstream] or 0
ewma = decay_ewma(ewma, last_touched_at, rtt, now) ewma = decay_ewma(ewma, last_touched_at, rtt, now)
if not update then if not update then
return ewma, nil return ewma, nil
end end
balancer_ewma_last_touched_at[upstream] = now self.ewma[upstream] = ewma
balancer_ewma[upstream] = ewma self.ewma_last_touched_at[upstream] = now
return ewma, nil return ewma, nil
end end
local function score(upstream) local function score(self, upstream)
-- Original implementation used names -- Original implementation used names
-- Endpoints don't have names, so passing in IP:Port as key instead -- Endpoints don't have names, so passing in IP:Port as key instead
local upstream_name = upstream.address .. ":" .. upstream.port local upstream_name = upstream.address .. ":" .. upstream.port
return get_or_update_ewma(upstream_name, 0, false) return get_or_update_ewma(self, upstream_name, 0, false)
end end
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle -- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
@ -62,12 +59,12 @@ local function shuffle_peers(peers, k)
-- peers[1 .. k] will now contain a randomly selected k from #peers -- peers[1 .. k] will now contain a randomly selected k from #peers
end end
local function pick_and_score(peers, k) local function pick_and_score(self, peers, k)
shuffle_peers(peers, k) shuffle_peers(peers, k)
local lowest_score_index = 1 local lowest_score_index = 1
local lowest_score = score(peers[lowest_score_index]) local lowest_score = score(self, peers[lowest_score_index])
for i = 2, k do for i = 2, k do
local new_score = score(peers[i]) local new_score = score(self, peers[i])
if new_score < lowest_score then if new_score < lowest_score then
lowest_score_index, lowest_score = i, new_score lowest_score_index, lowest_score = i, new_score
end end
@ -82,14 +79,14 @@ function _M.balance(self)
if #peers > 1 then if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers) local peer_copy = util.deepcopy(peers)
endpoint = pick_and_score(peer_copy, k) endpoint = pick_and_score(self, peer_copy, k)
end end
-- TODO(elvinefendi) move this processing to _M.sync -- TODO(elvinefendi) move this processing to _M.sync
return endpoint.address .. ":" .. endpoint.port return endpoint.address .. ":" .. endpoint.port
end end
function _M.after_balance(_) function _M.after_balance(self)
local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0 local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0 local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time local rtt = connect_time + response_time
@ -98,7 +95,7 @@ function _M.after_balance(_)
if util.is_blank(upstream) then if util.is_blank(upstream) then
return return
end end
get_or_update_ewma(upstream, rtt, true) get_or_update_ewma(self, upstream, rtt, true)
end end
function _M.sync(self, backend) function _M.sync(self, backend)
@ -111,15 +108,15 @@ function _M.sync(self, backend)
end end
self.peers = backend.endpoints self.peers = backend.endpoints
self.ewma = {}
-- TODO: Reset state of EWMA per backend self.ewma_last_touched_at = {}
balancer_ewma = {}
balancer_ewma_last_touched_at = {}
end end
function _M.new(self, backend) function _M.new(self, backend)
local o = { local o = {
peers = backend.endpoints, peers = backend.endpoints,
ewma = {},
ewma_last_touched_at = {},
traffic_shaping_policy = backend.trafficShapingPolicy, traffic_shaping_policy = backend.trafficShapingPolicy,
alternative_backends = backend.alternativeBackends, alternative_backends = backend.alternativeBackends,
} }

View file

@ -3,6 +3,24 @@ local util = require("util")
describe("Balancer ewma", function() describe("Balancer ewma", function()
local balancer_ewma = require("balancer.ewma") local balancer_ewma = require("balancer.ewma")
describe("after_balance()", function()
local ngx_now = 1543238266
_G.ngx.now = function() return ngx_now end
_G.ngx.var = { upstream_response_time = "0.25", upstream_connect_time = "0.02", upstream_addr = "10.184.7.40:8080" }
it("updates EWMA stats", function()
local backend = {
name = "my-dummy-backend", ["load-balance"] = "ewma",
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
local instance = balancer_ewma:new(backend)
instance:after_balance()
assert.equal(0.27, instance.ewma[ngx.var.upstream_addr])
assert.equal(ngx_now, instance.ewma_last_touched_at[ngx.var.upstream_addr])
end)
end)
describe("balance()", function() describe("balance()", function()
it("returns single endpoint when the given backend has only one endpoint", function() it("returns single endpoint when the given backend has only one endpoint", function()
local backend = { local backend = {