use roundrobin from lua-resty-balancer library and refactor balancer.lua

This commit is contained in:
Elvin Efendi 2018-05-04 17:39:57 -04:00
parent c80365dcfa
commit 6cb28e059c
7 changed files with 126 additions and 129 deletions

File diff suppressed because one or more lines are too long

View file

@ -189,7 +189,6 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
if dynamicConfigurationEnabled {
out = append(out,
"lua_shared_dict configuration_data 5M",
"lua_shared_dict round_robin_state 1M",
"lua_shared_dict locks 512k",
"lua_shared_dict balancer_ewma 1M",
"lua_shared_dict balancer_ewma_last_touched_at 1M",

View file

@ -3,25 +3,21 @@ local json = require("cjson")
local configuration = require("configuration")
local util = require("util")
local lrucache = require("resty.lrucache")
local resty_lock = require("resty.lock")
local ewma = require("balancer.ewma")
local sticky = require("sticky")
local chash = require("balancer.chash")
local resty_balancer = require("balancer.resty")
-- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers
-- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
local BACKENDS_SYNC_INTERVAL = 1
local ROUND_ROBIN_LOCK_KEY = "round_robin"
local DEFAULT_LB_ALG = "round_robin"
local round_robin_state = ngx.shared.round_robin_state
local _M = {}
local round_robin_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})
-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
-- load balancer implementations
local backends, err = lrucache.new(1024)
if not backends then
return error("failed to create the cache for backends: " .. (err or "unknown"))
@ -64,42 +60,27 @@ local function balance()
end
if backend["upstream-hash-by"] then
local endpoint = chash.balance(backend)
local endpoint = resty_balancer.balance(backend)
return endpoint.address, endpoint.port
end
if lb_alg == "ip_hash" then
-- TODO(elvinefendi) implement me
return backend.endpoints[0].address, backend.endpoints[0].port
elseif lb_alg == "ewma" then
if lb_alg == "ewma" then
local endpoint = ewma.balance(backend.endpoints)
return endpoint.address, endpoint.port
end
if lb_alg ~= DEFAULT_LB_ALG then
ngx.log(ngx.WARN, tostring(lb_alg) .. " is not supported, falling back to " .. DEFAULT_LB_ALG)
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", tostring(lb_alg), DEFAULT_LB_ALG))
end
-- Round-Robin
round_robin_lock:lock(backend.name .. ROUND_ROBIN_LOCK_KEY)
local last_index = round_robin_state:get(backend.name)
local index, endpoint = next(backend.endpoints, last_index)
if not index then
index = 1
endpoint = backend.endpoints[index]
end
local success, forcible
success, err, forcible = round_robin_state:set(backend.name, index)
if not success then
ngx.log(ngx.WARN, "round_robin_state:set failed " .. err)
end
if forcible then
ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
local endpoint = resty_balancer.balance(backend)
if not endpoint then
return nil, nil
end
if is_sticky then
sticky.set_endpoint(endpoint, backend)
end
round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)
return endpoint.address, endpoint.port
end
@ -107,22 +88,16 @@ end
local function sync_backend(backend)
backends:set(backend.name, backend)
-- also reset the respective balancer state since backend has changed
round_robin_state:delete(backend.name)
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
-- TODO: Reset state of EWMA per backend
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
if lb_alg == "ewma" then
ngx.shared.balancer_ewma:flush_all()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
return
end
-- reset chash for this backend
if backend["upstream-hash-by"] then
chash.reinit(backend)
end
ngx.log(ngx.INFO, "syncronization completed for: " .. backend.name)
resty_balancer.reinit(backend)
end
local function sync_backends()
@ -161,7 +136,7 @@ end
function _M.init_worker()
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if err then
ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: " .. tostring(err))
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
end
end
@ -186,12 +161,10 @@ function _M.call()
local ok
ok, err = ngx_balancer.set_current_peer(host, port)
if ok then
ngx.log(
ngx.INFO,
"current peer is set to " .. host .. ":" .. port .. " using lb_alg " .. tostring(get_current_lb_alg())
)
ngx.log(ngx.INFO,
string.format("current peer is set to %s:%s using lb_alg %s", host, port, tostring(get_current_lb_alg())))
else
ngx.log(ngx.ERR, "error while setting current upstream peer to: " .. tostring(err))
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
end
end

View file

@ -1,48 +0,0 @@
local resty_chash = require("resty.chash")
local util = require("util")
local string_sub = string.sub
local _M = {}
local instances = {}
-- Resolve an Nginx variable reference such as "$request_uri" to its
-- runtime value: strip the leading "$" and index ngx.var with the rest.
local function get_lua_ngx_var(ngx_var)
  return ngx.var[ngx_var:sub(2)]
end
-- Pick an endpoint for the given backend from its consistent-hash ring.
-- Returns nil when no ring has been built yet for this backend; otherwise
-- a table { address = ..., port = ... }.
function _M.balance(backend)
  local ring = instances[backend.name]
  if not ring then
    return nil
  end

  -- hash on the value of the configured Nginx variable
  local hash_key = get_lua_ngx_var(backend["upstream-hash-by"])
  local address, port = util.split_pair(ring:find(hash_key), ":")
  return { address = address, port = port }
end
-- Rebuild the consistent-hash ring for this backend from its current
-- endpoint list. Every node gets weight 1 — weighted consistent hashing
-- is not supported. Reuses the existing resty.chash instance when one
-- already exists, otherwise creates and caches a new one.
function _M.reinit(backend)
  local nodes = {}
  for _, ep in pairs(backend.endpoints) do
    nodes[ep.address .. ":" .. ep.port] = 1
  end

  local ring = instances[backend.name]
  if ring then
    ring:reinit(nodes)
    return
  end

  instances[backend.name] = resty_chash:new(nodes)
end
return _M

View file

@ -0,0 +1,68 @@
local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local util = require("util")
local _M = {}
local instances = {}
-- Build the node table lua-resty-balancer expects: a map from
-- "address:port" to weight. Weighted balancing is not supported,
-- so every endpoint gets weight 1.
local function get_resty_balancer_nodes(endpoints)
  local nodes = {}
  for _, ep in pairs(endpoints) do
    nodes[string.format("%s:%s", ep.address, ep.port)] = 1
  end
  return nodes
end
-- Build the node table for the given endpoints and either refresh an
-- existing balancer instance in place or construct a fresh one from the
-- given factory (resty.roundrobin or resty.chash). Returns the instance.
local function init_resty_balancer(factory, instance, endpoints)
  local nodes = get_resty_balancer_nodes(endpoints)

  if not instance then
    return factory:new(nodes)
  end

  instance:reinit(nodes)
  return instance
end
-- Select an endpoint for the given backend using its pre-built
-- lua-resty-balancer instance. Returns nil when the backend has not been
-- synced yet (no instance exists); otherwise { address = ..., port = ... }.
function _M.balance(backend)
  local instance = instances[backend.name]
  if not instance then
    return nil
  end

  local picked
  local hash_by = backend["upstream-hash-by"]
  if hash_by then
    -- consistent hash keyed on the configured Nginx variable's value
    picked = instance:find(util.lua_ngx_var(hash_by))
  else
    -- round robin takes no key
    picked = instance:find()
  end

  local address, port = util.split_pair(picked, ":")
  return { address = address, port = port }
end
-- (Re)build the balancer instance for this backend. The factory is
-- resty.chash when consistent hashing is configured via
-- "upstream-hash-by", resty.roundrobin otherwise. If the configured
-- algorithm changed since the last sync, the stale instance is discarded
-- so a fresh one of the right kind is created.
function _M.reinit(backend)
  local instance = instances[backend.name]

  local factory = resty_roundrobin
  if backend["upstream-hash-by"] then
    factory = resty_chash
  end

  if instance then
    -- lua-resty-balancer instances expose their class through the
    -- metatable's __index; compare it against the desired factory.
    -- Guard against a nil metatable (or missing __index) instead of
    -- crashing on `mt.__index` — treat that as "algorithm changed".
    local mt = getmetatable(instance)
    if not mt or mt.__index ~= factory then
      ngx.log(ngx.INFO, "LB algorithm has been changed, resetting the instance")
      instance = nil
    end
  end

  instances[backend.name] = init_resty_balancer(factory, instance, backend.endpoints)
end
return _M

View file

@ -49,7 +49,7 @@ local function init()
mock_sticky = {}
mock_ngx_balancer = {}
mock_ewma = {}
mock_chash = {}
mock_resty_balancer = {}
mock_backends = dict_generator(default_backends)
mock_lrucache = {
new = function () return mock_backends end
@ -63,7 +63,6 @@ local function init()
}
_G.ngx = {
shared = {
round_robin_state = dict_generator({}),
balancer_ewma = dict_generator({}),
balancer_ewma_last_touched_at = dict_generator({}),
},
@ -79,7 +78,7 @@ local function init()
package.loaded["cjson"] = mock_cjson
package.loaded["resty.lock"] = mock_lock
package.loaded["balancer.ewma"] = mock_ewma
package.loaded["balancer.chash"] = mock_chash
package.loaded["balancer.resty"] = mock_resty_balancer
package.loaded["configuration"] = mock_config
package.loaded["sticky"] = mock_sticky
balancer = require("balancer")
@ -106,7 +105,6 @@ describe("[balancer_test]", function()
before_each(function()
_G.ngx.get_phase = nil
_G.ngx.shared.round_robin_state._vals = {}
_G.ngx.var = {}
mock_backends._vals = default_backends
mock_sticky.is_sticky = function(b) return false end
@ -148,6 +146,7 @@ describe("[balancer_test]", function()
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
@ -157,6 +156,7 @@ describe("[balancer_test]", function()
mock_ngx_balancer.set_more_tries:clear()
mock_ngx_balancer.set_current_peer:clear()
mock_resty_balancer.balance = function(b) return {address = "000.000.001", port = "8081"} end
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
@ -205,6 +205,7 @@ describe("[balancer_test]", function()
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
@ -261,17 +262,16 @@ describe("[balancer_test]", function()
it("lb_alg=round_robin, updates backend when sync is required", function()
mock_config.get_backends_data = function() return { default_backends.mock_rr_backend } end
mock_backends._vals = {}
_G.ngx.shared.round_robin_state._vals = default_backends.mock_rr_backend
local backend_set_spy = spy.on(mock_backends, "set")
local rr_delete_spy = spy.on(_G.ngx.shared.round_robin_state, "delete")
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")
mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
mock_resty_balancer.reinit = function(b) return end
assert.has_no_errors(balancer.init_worker)
assert.spy(backend_set_spy)
.was_called_with(match.is_table(), default_backends.mock_rr_backend.name, match.is_table())
assert.spy(rr_delete_spy).was_called_with(match.is_table(), default_backends.mock_rr_backend.name)
assert.spy(ewma_flush_spy).was_not_called()
assert.spy(ewma_lta_flush_spy).was_not_called()
end)
@ -279,17 +279,14 @@ describe("[balancer_test]", function()
it("lb_alg=ewma, updates backend when sync is required", function()
mock_config.get_backends_data = function() return { default_backends.mock_ewma_backend } end
mock_backends._vals = {}
_G.ngx.shared.round_robin_state._vals = default_backends.mock_ewma_backend
local backend_set_spy = spy.on(mock_backends, "set")
local rr_delete_spy = spy.on(_G.ngx.shared.round_robin_state, "delete")
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")
assert.has_no_errors(balancer.init_worker)
assert.spy(backend_set_spy)
.was_called_with(match.is_table(), default_backends.mock_ewma_backend.name, match.is_table())
assert.spy(rr_delete_spy).was_called_with(match.is_table(), default_backends.mock_ewma_backend.name)
assert.spy(ewma_flush_spy).was_called_with(match.is_table())
assert.spy(ewma_lta_flush_spy).was_called_with(match.is_table())
end)

View file

@ -1,5 +1,13 @@
local _M = {}
local string_len = string.len
local string_sub = string.sub
-- Resolve an Nginx variable reference such as "$request_uri" to its
-- runtime value: strip the leading "$" and index ngx.var with the rest.
function _M.lua_ngx_var(ngx_var)
  return ngx.var[string_sub(ngx_var, 2)]
end
function _M.split_pair(pair, seperator)
local i = pair:find(seperator)