diff --git a/rootfs/etc/nginx/lua/balancer/ewma.lua b/rootfs/etc/nginx/lua/balancer/ewma.lua index 0ab4e6544..7f0bd7740 100644 --- a/rootfs/etc/nginx/lua/balancer/ewma.lua +++ b/rootfs/etc/nginx/lua/balancer/ewma.lua @@ -11,9 +11,6 @@ local split = require("util.split") local DECAY_TIME = 10 -- this value is in seconds local PICK_SET_SIZE = 2 -local balancer_ewma = {} -local balancer_ewma_last_touched_at = {} - local _M = { name = "ewma" } local function decay_ewma(ewma, last_touched_at, rtt, now) @@ -25,28 +22,28 @@ local function decay_ewma(ewma, last_touched_at, rtt, now) return ewma end -local function get_or_update_ewma(upstream, rtt, update) - local ewma = balancer_ewma[upstream] or 0 +local function get_or_update_ewma(self, upstream, rtt, update) + local ewma = self.ewma[upstream] or 0 local now = ngx.now() - local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0 + local last_touched_at = self.ewma_last_touched_at[upstream] or 0 ewma = decay_ewma(ewma, last_touched_at, rtt, now) if not update then return ewma, nil end - balancer_ewma_last_touched_at[upstream] = now - balancer_ewma[upstream] = ewma + self.ewma[upstream] = ewma + self.ewma_last_touched_at[upstream] = now return ewma, nil end -local function score(upstream) +local function score(self, upstream) -- Original implementation used names -- Endpoints don't have names, so passing in IP:Port as key instead local upstream_name = upstream.address .. ":" .. upstream.port - return get_or_update_ewma(upstream_name, 0, false) + return get_or_update_ewma(self, upstream_name, 0, false) end -- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle @@ -62,12 +59,12 @@ local function shuffle_peers(peers, k) -- peers[1 .. k] will now contain a randomly selected k from #peers end -local function pick_and_score(peers, k) +local function pick_and_score(self, peers, k) shuffle_peers(peers, k) local lowest_score_index = 1 - local lowest_score = score(peers[lowest_score_index]) + local lowest_score = score(self, peers[lowest_score_index]) for i = 2, k do - local new_score = score(peers[i]) + local new_score = score(self, peers[i]) if new_score < lowest_score then lowest_score_index, lowest_score = i, new_score end @@ -82,14 +79,14 @@ function _M.balance(self) if #peers > 1 then local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE local peer_copy = util.deepcopy(peers) - endpoint = pick_and_score(peer_copy, k) + endpoint = pick_and_score(self, peer_copy, k) end -- TODO(elvinefendi) move this processing to _M.sync return endpoint.address .. ":" .. endpoint.port end -function _M.after_balance(_) +function _M.after_balance(self) local response_time = tonumber(split.get_first_value(ngx.var.upstream_response_time)) or 0 local connect_time = tonumber(split.get_first_value(ngx.var.upstream_connect_time)) or 0 local rtt = connect_time + response_time @@ -98,7 +95,7 @@ function _M.after_balance(_) if util.is_blank(upstream) then return end - get_or_update_ewma(upstream, rtt, true) + get_or_update_ewma(self, upstream, rtt, true) end function _M.sync(self, backend) @@ -111,15 +108,15 @@ function _M.sync(self, backend) end self.peers = backend.endpoints - - -- TODO: Reset state of EWMA per backend - balancer_ewma = {} - balancer_ewma_last_touched_at = {} + self.ewma = {} + self.ewma_last_touched_at = {} end function _M.new(self, backend) local o = { peers = backend.endpoints, + ewma = {}, + ewma_last_touched_at = {}, traffic_shaping_policy = backend.trafficShapingPolicy, alternative_backends = backend.alternativeBackends, }