Add EWMA as configurable load balancing algorithm (#2229)
This commit is contained in:
parent
b0a63fe3ff
commit
6e099c5f57
6 changed files with 283 additions and 13 deletions
|
@ -519,6 +519,7 @@ The value can either be:
|
||||||
- round_robin: to use the default round robin loadbalancer
|
- round_robin: to use the default round robin loadbalancer
|
||||||
- least_conn: to use the least connected method
|
- least_conn: to use the least connected method
|
||||||
- ip_hash: to use a hash of the server for routing.
|
- ip_hash: to use a hash of the server for routing.
|
||||||
|
- ewma: to use the peak ewma method for routing (only available with `enable-dynamic-configuration` flag)
|
||||||
|
|
||||||
The default is least_conn.
|
The default is least_conn.
|
||||||
|
|
||||||
|
|
File diff suppressed because one or more lines are too long
|
@ -4,6 +4,7 @@ local configuration = require("configuration")
|
||||||
local util = require("util")
|
local util = require("util")
|
||||||
local lrucache = require("resty.lrucache")
|
local lrucache = require("resty.lrucache")
|
||||||
local resty_lock = require("resty.lock")
|
local resty_lock = require("resty.lock")
|
||||||
|
local ewma = require("balancer.ewma")
|
||||||
|
|
||||||
-- measured in seconds
|
-- measured in seconds
|
||||||
-- for an Nginx worker to pick up the new list of upstream peers
|
-- for an Nginx worker to pick up the new list of upstream peers
|
||||||
|
@ -11,6 +12,7 @@ local resty_lock = require("resty.lock")
|
||||||
local BACKENDS_SYNC_INTERVAL = 1
|
local BACKENDS_SYNC_INTERVAL = 1
|
||||||
|
|
||||||
local ROUND_ROBIN_LOCK_KEY = "round_robin"
|
local ROUND_ROBIN_LOCK_KEY = "round_robin"
|
||||||
|
local DEFAULT_LB_ALG = "round_robin"
|
||||||
|
|
||||||
local round_robin_state = ngx.shared.round_robin_state
|
local round_robin_state = ngx.shared.round_robin_state
|
||||||
|
|
||||||
|
@ -23,28 +25,49 @@ if not backends then
|
||||||
return error("failed to create the cache for backends: " .. (err or "unknown"))
|
return error("failed to create the cache for backends: " .. (err or "unknown"))
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Look up the backend (upstream) configuration for the request currently
-- being balanced, keyed by nginx's $proxy_upstream_name variable.
local function get_current_backend()
  local backend_name = ngx.var.proxy_upstream_name
  return backends:get(backend_name)
end

-- Resolve the load-balancing algorithm configured for the current backend,
-- falling back to DEFAULT_LB_ALG when the backend does not specify one.
-- NOTE(review): assumes get_current_backend() never returns nil here
-- (a cache miss would make the indexing below error) — confirm with callers.
local function get_current_lb_alg()
  local backend = get_current_backend()
  return backend["load-balance"] or DEFAULT_LB_ALG
end
|
||||||
|
|
||||||
|
-- Pick an endpoint for the current request according to the backend's
-- configured load-balancing algorithm.
-- Returns: address, port of the chosen endpoint.
-- Unknown algorithms log a warning and fall back to round-robin.
local function balance()
  local backend = get_current_backend()
  local lb_alg = get_current_lb_alg()

  if lb_alg == "ip_hash" then
    -- TODO(elvinefendi) implement me
    -- BUGFIX: Lua sequences are 1-based; endpoints[0] is nil and indexing it
    -- would raise "attempt to index a nil value". Use the first element.
    return backend.endpoints[1].address, backend.endpoints[1].port
  elseif lb_alg == "ewma" then
    local endpoint = ewma.balance(backend.endpoints)
    return endpoint.address, endpoint.port
  end

  if lb_alg ~= DEFAULT_LB_ALG then
    ngx.log(ngx.WARN, tostring(lb_alg) .. " is not supported, falling back to " .. DEFAULT_LB_ALG)
  end

  -- Round-Robin: advance a shared per-backend cursor under a lock so that
  -- concurrent requests do not all pick the same peer.
  round_robin_lock:lock(backend.name .. ROUND_ROBIN_LOCK_KEY)
  local last_index = round_robin_state:get(backend.name)
  local index, endpoint = next(backend.endpoints, last_index)
  if not index then
    -- walked past the end of the endpoint list; wrap around to the start
    index = 1
    endpoint = backend.endpoints[index]
  end
  -- BUGFIX: declare err locally instead of leaking an accidental global
  local success, err, forcible
  success, err, forcible = round_robin_state:set(backend.name, index)
  if not success then
    ngx.log(ngx.WARN, "round_robin_state:set failed " .. err)
  end
  if forcible then
    -- shared dict evicted valid entries to make room; dict may be undersized
    ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
  end
  round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)

  return endpoint.address, endpoint.port
end
|
||||||
|
@ -55,6 +78,13 @@ local function sync_backend(backend)
|
||||||
-- also reset the respective balancer state since backend has changed
|
-- also reset the respective balancer state since backend has changed
|
||||||
round_robin_state:delete(backend.name)
|
round_robin_state:delete(backend.name)
|
||||||
|
|
||||||
|
-- TODO: Reset state of EWMA per backend
|
||||||
|
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
|
||||||
|
if lb_alg == "ewma" then
|
||||||
|
ngx.shared.balancer_ewma:flush_all()
|
||||||
|
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
||||||
|
end
|
||||||
|
|
||||||
ngx.log(ngx.INFO, "syncronization completed for: " .. backend.name)
|
ngx.log(ngx.INFO, "syncronization completed for: " .. backend.name)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -84,6 +114,13 @@ local function sync_backends()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Post-request (log phase) hook. Only the EWMA algorithm keeps per-request
-- statistics, so delegate to it and do nothing for the other algorithms.
local function after_balance()
  if get_current_lb_alg() == "ewma" then
    ewma.after_balance()
  end
end
|
||||||
|
|
||||||
function _M.init_worker()
|
function _M.init_worker()
|
||||||
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
||||||
if err then
|
if err then
|
||||||
|
@ -92,6 +129,15 @@ function _M.init_worker()
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.call()
|
function _M.call()
|
||||||
|
local phase = ngx.get_phase()
|
||||||
|
if phase == "log" then
|
||||||
|
after_balance()
|
||||||
|
return
|
||||||
|
end
|
||||||
|
if phase ~= "balancer" then
|
||||||
|
return error("must be called in balancer or log, but was called in: " .. phase)
|
||||||
|
end
|
||||||
|
|
||||||
ngx_balancer.set_more_tries(1)
|
ngx_balancer.set_more_tries(1)
|
||||||
|
|
||||||
local host, port = balance()
|
local host, port = balance()
|
||||||
|
@ -99,7 +145,10 @@ function _M.call()
|
||||||
local ok
|
local ok
|
||||||
ok, err = ngx_balancer.set_current_peer(host, port)
|
ok, err = ngx_balancer.set_current_peer(host, port)
|
||||||
if ok then
|
if ok then
|
||||||
ngx.log(ngx.INFO, "current peer is set to " .. host .. ":" .. port)
|
ngx.log(
|
||||||
|
ngx.INFO,
|
||||||
|
"current peer is set to " .. host .. ":" .. port .. " using lb_alg " .. tostring(get_current_lb_alg())
|
||||||
|
)
|
||||||
else
|
else
|
||||||
ngx.log(ngx.ERR, "error while setting current upstream peer to: " .. tostring(err))
|
ngx.log(ngx.ERR, "error while setting current upstream peer to: " .. tostring(err))
|
||||||
end
|
end
|
||||||
|
|
141
rootfs/etc/nginx/lua/balancer/ewma.lua
Normal file
141
rootfs/etc/nginx/lua/balancer/ewma.lua
Normal file
|
@ -0,0 +1,141 @@
|
||||||
|
-- Original Authors: Shiv Nagarajan & Scott Francis
|
||||||
|
-- Accessed: March 12, 2018
|
||||||
|
-- Inspiration drawn from:
|
||||||
|
-- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
|
||||||
|
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
|
||||||
|
|
||||||
|
|
||||||
|
local resty_lock = require("resty.lock")
|
||||||
|
local util = require("util")
|
||||||
|
|
||||||
|
local DECAY_TIME = 10 -- this value is in seconds
|
||||||
|
local LOCK_KEY = ":ewma_key"
|
||||||
|
local PICK_SET_SIZE = 2
|
||||||
|
|
||||||
|
local ewma_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})
|
||||||
|
|
||||||
|
local _M = {}
|
||||||
|
|
||||||
|
-- Try to take the per-upstream EWMA lock.
-- Returns nil on success, otherwise the resty.lock error string. "timeout"
-- is expected under contention (timeout = 0) and intentionally not logged.
local function lock(upstream)
  local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
  if err and err ~= "timeout" then
    ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
  end
  return err
end
||||||
|
|
||||||
|
-- Release the EWMA lock taken by lock(); failures are logged, never raised.
-- Returns the resty.lock error string, or nil on success.
local function unlock()
  local released, err = ewma_lock:unlock()
  if released then
    return err
  end
  ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
  return err
end
||||||
|
|
||||||
|
-- Exponentially decay a stored EWMA toward the newly observed rtt.
-- The old value's weight falls off as e^(-elapsed / DECAY_TIME), so a value
-- untouched for a long time is dominated by the fresh observation.
local function decay_ewma(ewma, last_touched_at, rtt, now)
  local elapsed = now - last_touched_at
  if elapsed < 0 then
    -- clock went backwards (or stale timestamp); treat as "just touched"
    elapsed = 0
  end
  local weight = math.exp(-elapsed / DECAY_TIME)
  return ewma * weight + rtt * (1.0 - weight)
end
||||||
|
|
||||||
|
-- Read the decayed EWMA for `upstream`; when `update` is true, also fold the
-- observed `rtt` into the stored value, guarded by the per-upstream lock.
-- Returns: ewma, err — err is non-nil only when taking the lock failed, in
-- which case the stale (undecayed) stored value is returned.
local function get_or_update_ewma(upstream, rtt, update)
  local lock_err = nil
  if update then
    lock_err = lock(upstream)
  end

  local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
  if lock_err ~= nil then
    -- lock not held: bail out without touching shared state
    return ewma, lock_err
  end

  local now = ngx.now()
  local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
  ewma = decay_ewma(ewma, last_touched_at, rtt, now)

  if not update then
    -- read-only path (scoring): never writes, never locked
    return ewma, nil
  end

  -- persist a value into one of the shared dicts, warning on set problems
  local function store(dict, dict_name, value)
    local success, err, forcible = dict:set(upstream, value)
    if not success then
      ngx.log(ngx.WARN, dict_name .. ":set failed " .. err)
    end
    if forcible then
      -- valid entries were evicted to make room; dict may be undersized
      ngx.log(ngx.WARN, dict_name .. ":set valid items forcibly overwritten")
    end
  end
  store(ngx.shared.balancer_ewma_last_touched_at, "balancer_ewma_last_touched_at", now)
  store(ngx.shared.balancer_ewma, "balancer_ewma", ewma)

  unlock()
  return ewma, nil
end
||||||
|
|
||||||
|
|
||||||
|
-- Current (decayed, read-only) EWMA score for one endpoint.
-- Endpoints have no stable name, so the shared dicts are keyed by "IP:port".
local function score(upstream)
  local key = string.format("%s:%s", upstream.address, upstream.port)
  return get_or_update_ewma(key, 0, false)
end
||||||
|
|
||||||
|
-- implementation similar to https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
|
||||||
|
-- or https://en.wikipedia.org/wiki/Random_permutation
|
||||||
|
-- loop from 1 .. k
|
||||||
|
-- pick a random value r from the remaining set of unpicked values (i .. n)
|
||||||
|
-- swap the value at position i with the value at position r
|
||||||
|
-- In-place partial Fisher-Yates shuffle (see
-- https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle): after the
-- call, peers[1..k] hold a uniformly random k-subset of the original entries.
local function shuffle_peers(peers, k)
  for slot = 1, k do
    local pick = math.random(slot, #peers)
    peers[slot], peers[pick] = peers[pick], peers[slot]
  end
end
||||||
|
|
||||||
|
-- Choose k random peers and return the one with the lowest EWMA score
-- ("power of two choices" when k == 2). Mutates `peers` via the shuffle.
local function pick_and_score(peers, k)
  shuffle_peers(peers, k)
  local best_index, best_score = 1, score(peers[1])
  for i = 2, k do
    local candidate = score(peers[i])
    if candidate < best_score then
      best_index, best_score = i, candidate
    end
  end
  return peers[best_index]
end
||||||
|
|
||||||
|
-- Entry point used by the main balancer: pick one endpoint from `peers`.
-- Works on a deep copy so the shuffle never mutates the shared backend table.
function _M.balance(peers)
  if #peers == 1 then
    -- only one endpoint: nothing to score
    return peers[1]
  end
  local k = math.min(#peers, PICK_SET_SIZE)
  local candidates = util.deepcopy(peers)
  return pick_and_score(candidates, k)
end
||||||
|
|
||||||
|
-- Log-phase hook: fold the request's observed rtt (upstream connect time +
-- response time) into the EWMA of the upstream that actually served it.
-- The nginx variables may list several attempts; only the first is used.
function _M.after_balance()
  local response_time = tonumber(util.get_first_value(ngx.var.upstream_response_time)) or 0
  local connect_time = tonumber(util.get_first_value(ngx.var.upstream_connect_time)) or 0
  local upstream = util.get_first_value(ngx.var.upstream_addr)

  if util.is_blank(upstream) then
    -- no upstream was reached (e.g. early error); nothing to record
    return
  end
  get_or_update_ewma(upstream, connect_time + response_time, true)
end
||||||
|
|
||||||
|
return _M
|
|
@ -1,4 +1,5 @@
|
||||||
local _M = {}
|
local _M = {}
|
||||||
|
local string_len = string.len
|
||||||
|
|
||||||
-- this implementation is taken from
|
-- this implementation is taken from
|
||||||
-- https://web.archive.org/web/20131225070434/http://snippets.luacode.org/snippets/Deep_Comparison_of_Two_Values_3
|
-- https://web.archive.org/web/20131225070434/http://snippets.luacode.org/snippets/Deep_Comparison_of_Two_Values_3
|
||||||
|
@ -24,4 +25,50 @@ local function deep_compare(t1, t2, ignore_mt)
|
||||||
end
|
end
|
||||||
_M.deep_compare = deep_compare
|
_M.deep_compare = deep_compare
|
||||||
|
|
||||||
|
-- True when str is nil or the empty string.
function _M.is_blank(str)
  if str == nil then
    return true
  end
  return string_len(str) == 0
end
||||||
|
|
||||||
|
-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
|
||||||
|
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
|
||||||
|
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
|
||||||
|
-- 200 : 200 , ngx.var.upstream_status
|
||||||
|
-- 0.00 : 0.00, ngx.var.upstream_response_time
|
||||||
|
-- Split an $upstream_* nginx variable into its per-attempt values.
-- http://nginx.org/en/docs/http/ngx_http_upstream_module.html#example
-- CAVEAT: nginx is giving out : instead of , so the docs are wrong
-- 127.0.0.1:26157 : 127.0.0.1:26157 , ngx.var.upstream_addr
-- 200 : 200 , ngx.var.upstream_status
-- 0.00 : 0.00, ngx.var.upstream_response_time
-- Returns nil, nil when var is falsy; otherwise an array of tokens with the
-- bare ":" separators dropped.
function _M.split_upstream_var(var)
  if not var then
    return nil, nil
  end
  local parts = {}
  for token in var:gmatch("[^%s|,]+") do
    if token ~= ":" then
      parts[#parts + 1] = token
    end
  end
  return parts
end
||||||
|
|
||||||
|
-- First per-attempt value of an $upstream_* variable, or nil when the
-- variable is absent or empty.
function _M.get_first_value(var)
  local parts = _M.split_upstream_var(var) or {}
  -- parts[1] is nil for an empty array, which is exactly the contract
  return parts[1]
end
||||||
|
|
||||||
|
-- this implementation is taken from:
|
||||||
|
-- https://github.com/luafun/luafun/blob/master/fun.lua#L33
|
||||||
|
-- SHA: 04c99f9c393e54a604adde4b25b794f48104e0d0
|
||||||
|
-- Recursively clone a value. Tables are copied key-by-key (keys are cloned
-- too); any other type is returned as-is.
-- Taken from https://github.com/luafun/luafun/blob/master/fun.lua#L33
-- SHA: 04c99f9c393e54a604adde4b25b794f48104e0d0
-- NOTE(review): metatables are not copied and cyclic tables recurse forever.
local function deepcopy(value)
  if type(value) ~= "table" then
    return value
  end
  local clone = {}
  for k, v in next, value, nil do
    clone[deepcopy(k)] = deepcopy(v)
  end
  return clone
end
||||||
|
_M.deepcopy = deepcopy
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -42,6 +42,8 @@ http {
|
||||||
lua_shared_dict configuration_data 5M;
|
lua_shared_dict configuration_data 5M;
|
||||||
lua_shared_dict round_robin_state 1M;
|
lua_shared_dict round_robin_state 1M;
|
||||||
lua_shared_dict locks 512k;
|
lua_shared_dict locks 512k;
|
||||||
|
lua_shared_dict balancer_ewma 1M;
|
||||||
|
lua_shared_dict balancer_ewma_last_touched_at 1M;
|
||||||
|
|
||||||
init_by_lua_block {
|
init_by_lua_block {
|
||||||
require("resty.core")
|
require("resty.core")
|
||||||
|
@ -976,6 +978,11 @@ stream {
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
{{ if not (empty $location.Backend) }}
|
{{ if not (empty $location.Backend) }}
|
||||||
|
{{ if $all.DynamicConfigurationEnabled}}
|
||||||
|
log_by_lua_block {
|
||||||
|
balancer.call()
|
||||||
|
}
|
||||||
|
{{ end }}
|
||||||
{{ buildProxyPass $server.Hostname $all.Backends $location $all.DynamicConfigurationEnabled }}
|
{{ buildProxyPass $server.Hostname $all.Backends $location $all.DynamicConfigurationEnabled }}
|
||||||
{{ if (or (eq $location.Proxy.ProxyRedirectFrom "default") (eq $location.Proxy.ProxyRedirectFrom "off")) }}
|
{{ if (or (eq $location.Proxy.ProxyRedirectFrom "default") (eq $location.Proxy.ProxyRedirectFrom "off")) }}
|
||||||
proxy_redirect {{ $location.Proxy.ProxyRedirectFrom }};
|
proxy_redirect {{ $location.Proxy.ProxyRedirectFrom }};
|
||||||
|
|
Loading…
Reference in a new issue