fix bug with lua sticky session implementation and refactor balancer
This commit is contained in:
parent
94198fce83
commit
7ac4e1db30
8 changed files with 194 additions and 299 deletions
File diff suppressed because one or more lines are too long
|
@ -4,7 +4,6 @@ local configuration = require("configuration")
|
||||||
local util = require("util")
|
local util = require("util")
|
||||||
local lrucache = require("resty.lrucache")
|
local lrucache = require("resty.lrucache")
|
||||||
local ewma = require("balancer.ewma")
|
local ewma = require("balancer.ewma")
|
||||||
local sticky = require("sticky")
|
|
||||||
local resty_balancer = require("balancer.resty")
|
local resty_balancer = require("balancer.resty")
|
||||||
|
|
||||||
-- measured in seconds
|
-- measured in seconds
|
||||||
|
@ -18,9 +17,9 @@ local _M = {}
|
||||||
|
|
||||||
-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
|
-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
|
||||||
-- load balancer implementations
|
-- load balancer implementations
|
||||||
local backends, err = lrucache.new(1024)
|
local backends, backends_err = lrucache.new(1024)
|
||||||
if not backends then
|
if not backends then
|
||||||
return error("failed to create the cache for backends: " .. (err or "unknown"))
|
return error("failed to create the cache for backends: " .. (backends_err or "unknown"))
|
||||||
end
|
end
|
||||||
|
|
||||||
local function get_current_backend()
|
local function get_current_backend()
|
||||||
|
@ -35,33 +34,33 @@ local function get_current_backend()
|
||||||
return backend
|
return backend
|
||||||
end
|
end
|
||||||
|
|
||||||
local function get_current_lb_alg()
|
local function get_balancer(backend)
|
||||||
local backend = get_current_backend()
|
|
||||||
if not backend then
|
if not backend then
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
return backend["load-balance"] or DEFAULT_LB_ALG
|
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
|
||||||
|
if resty_balancer.is_applicable(backend) then
|
||||||
|
return resty_balancer
|
||||||
|
elseif lb_alg ~= "ewma" then
|
||||||
|
if lb_alg ~= DEFAULT_LB_ALG then
|
||||||
|
ngx.log(ngx.WARN,
|
||||||
|
string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
|
||||||
|
end
|
||||||
|
return resty_balancer
|
||||||
|
end
|
||||||
|
|
||||||
|
return ewma
|
||||||
end
|
end
|
||||||
|
|
||||||
local function balance()
|
local function balance()
|
||||||
local backend = get_current_backend()
|
local backend = get_current_backend()
|
||||||
if not backend then
|
local balancer = get_balancer(backend)
|
||||||
|
if not balancer then
|
||||||
return nil, nil
|
return nil, nil
|
||||||
end
|
end
|
||||||
local lb_alg = get_current_lb_alg()
|
|
||||||
local is_sticky = sticky.is_sticky(backend)
|
|
||||||
|
|
||||||
if is_sticky then
|
local endpoint = balancer.balance(backend)
|
||||||
local endpoint = sticky.get_endpoint(backend)
|
|
||||||
if endpoint ~= nil then
|
|
||||||
return endpoint.address, endpoint.port
|
|
||||||
end
|
|
||||||
lb_alg = "round_robin"
|
|
||||||
end
|
|
||||||
|
|
||||||
if backend["upstream-hash-by"] then
|
|
||||||
local endpoint = resty_balancer.balance(backend)
|
|
||||||
if not endpoint then
|
if not endpoint then
|
||||||
return nil, nil
|
return nil, nil
|
||||||
end
|
end
|
||||||
|
@ -69,41 +68,14 @@ local function balance()
|
||||||
return endpoint.address, endpoint.port
|
return endpoint.address, endpoint.port
|
||||||
end
|
end
|
||||||
|
|
||||||
if lb_alg == "ewma" then
|
|
||||||
local endpoint = ewma.balance(backend.endpoints)
|
|
||||||
return endpoint.address, endpoint.port
|
|
||||||
end
|
|
||||||
|
|
||||||
if lb_alg ~= DEFAULT_LB_ALG then
|
|
||||||
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", tostring(lb_alg), DEFAULT_LB_ALG))
|
|
||||||
end
|
|
||||||
|
|
||||||
local endpoint = resty_balancer.balance(backend)
|
|
||||||
if not endpoint then
|
|
||||||
return nil, nil
|
|
||||||
end
|
|
||||||
|
|
||||||
if is_sticky then
|
|
||||||
sticky.set_endpoint(endpoint, backend)
|
|
||||||
end
|
|
||||||
|
|
||||||
return endpoint.address, endpoint.port
|
|
||||||
end
|
|
||||||
|
|
||||||
local function sync_backend(backend)
|
local function sync_backend(backend)
|
||||||
backends:set(backend.name, backend)
|
backends:set(backend.name, backend)
|
||||||
|
|
||||||
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
|
local balancer = get_balancer(backend)
|
||||||
|
if not balancer then
|
||||||
if backend["upstream-hash-by"] or lb_alg == "round_robin" then
|
return
|
||||||
resty_balancer.reinit(backend)
|
|
||||||
end
|
|
||||||
|
|
||||||
-- TODO: Reset state of EWMA per backend
|
|
||||||
if lb_alg == "ewma" then
|
|
||||||
ngx.shared.balancer_ewma:flush_all()
|
|
||||||
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
|
||||||
end
|
end
|
||||||
|
balancer.sync(backend)
|
||||||
end
|
end
|
||||||
|
|
||||||
local function sync_backends()
|
local function sync_backends()
|
||||||
|
@ -133,15 +105,18 @@ local function sync_backends()
|
||||||
end
|
end
|
||||||
|
|
||||||
local function after_balance()
|
local function after_balance()
|
||||||
local lb_alg = get_current_lb_alg()
|
local backend = get_current_backend()
|
||||||
if lb_alg == "ewma" then
|
local balancer = get_balancer(backend)
|
||||||
ewma.after_balance()
|
if not balancer then
|
||||||
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
balancer.after_balance()
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.init_worker()
|
function _M.init_worker()
|
||||||
sync_backends() -- when worker starts, sync backends without delay
|
sync_backends() -- when worker starts, sync backends without delay
|
||||||
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
||||||
if err then
|
if err then
|
||||||
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
|
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
|
||||||
end
|
end
|
||||||
|
@ -165,12 +140,8 @@ function _M.call()
|
||||||
|
|
||||||
ngx_balancer.set_more_tries(1)
|
ngx_balancer.set_more_tries(1)
|
||||||
|
|
||||||
local ok
|
local ok, err = ngx_balancer.set_current_peer(host, port)
|
||||||
ok, err = ngx_balancer.set_current_peer(host, port)
|
if not ok then
|
||||||
if ok then
|
|
||||||
ngx.log(ngx.INFO,
|
|
||||||
string.format("current peer is set to %s:%s using lb_alg %s", host, port, tostring(get_current_lb_alg())))
|
|
||||||
else
|
|
||||||
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
|
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -117,7 +117,8 @@ local function pick_and_score(peers, k)
|
||||||
return peers[lowest_score_index]
|
return peers[lowest_score_index]
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.balance(peers)
|
function _M.balance(backend)
|
||||||
|
local peers = backend.endpoints
|
||||||
if #peers == 1 then
|
if #peers == 1 then
|
||||||
return peers[1]
|
return peers[1]
|
||||||
end
|
end
|
||||||
|
@ -138,4 +139,10 @@ function _M.after_balance()
|
||||||
get_or_update_ewma(upstream, rtt, true)
|
get_or_update_ewma(upstream, rtt, true)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.sync(_)
|
||||||
|
-- TODO: Reset state of EWMA per backend
|
||||||
|
ngx.shared.balancer_ewma:flush_all()
|
||||||
|
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
||||||
|
end
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
local resty_roundrobin = require("resty.roundrobin")
|
local resty_roundrobin = require("resty.roundrobin")
|
||||||
local resty_chash = require("resty.chash")
|
local resty_chash = require("resty.chash")
|
||||||
local util = require("util")
|
local util = require("util")
|
||||||
|
local ck = require("resty.cookie")
|
||||||
|
|
||||||
local _M = {}
|
local _M = {}
|
||||||
local instances = {}
|
local instances = {}
|
||||||
|
@ -29,6 +30,73 @@ local function init_resty_balancer(factory, instance, endpoints)
|
||||||
return instance
|
return instance
|
||||||
end
|
end
|
||||||
|
|
||||||
|
local function is_sticky(backend)
|
||||||
|
return backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie"
|
||||||
|
end
|
||||||
|
|
||||||
|
local function cookie_name(backend)
|
||||||
|
return backend["sessionAffinityConfig"]["cookieSessionAffinity"]["name"] or "route"
|
||||||
|
end
|
||||||
|
|
||||||
|
local function encrypted_endpoint_string(backend, endpoint_string)
|
||||||
|
local encrypted, err
|
||||||
|
if backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"] == "sha1" then
|
||||||
|
encrypted, err = util.sha1_digest(endpoint_string)
|
||||||
|
else
|
||||||
|
encrypted, err = util.md5_digest(endpoint_string)
|
||||||
|
end
|
||||||
|
if err ~= nil then
|
||||||
|
ngx.log(ngx.ERR, err)
|
||||||
|
end
|
||||||
|
|
||||||
|
return encrypted
|
||||||
|
end
|
||||||
|
|
||||||
|
local function set_cookie(backend, value)
|
||||||
|
local cookie, err = ck:new()
|
||||||
|
if not cookie then
|
||||||
|
ngx.log(ngx.ERR, err)
|
||||||
|
end
|
||||||
|
|
||||||
|
local ok
|
||||||
|
ok, err = cookie:set({
|
||||||
|
key = cookie_name(backend),
|
||||||
|
value = value,
|
||||||
|
path = "/",
|
||||||
|
domain = ngx.var.host,
|
||||||
|
httponly = true,
|
||||||
|
})
|
||||||
|
if not ok then
|
||||||
|
ngx.log(ngx.ERR, err)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local function pick_random(instance)
|
||||||
|
local index = math.random(instance.npoints)
|
||||||
|
return instance:next(index)
|
||||||
|
end
|
||||||
|
|
||||||
|
local function sticky_endpoint_string(instance, backend)
|
||||||
|
local cookie, err = ck:new()
|
||||||
|
if not cookie then
|
||||||
|
ngx.log(ngx.ERR, err)
|
||||||
|
return pick_random(instance)
|
||||||
|
end
|
||||||
|
|
||||||
|
local key = cookie:get(cookie_name(backend))
|
||||||
|
if not key then
|
||||||
|
local tmp_endpoint_string = pick_random(instance)
|
||||||
|
key = encrypted_endpoint_string(backend, tmp_endpoint_string)
|
||||||
|
set_cookie(backend, key)
|
||||||
|
end
|
||||||
|
|
||||||
|
return instance:find(key)
|
||||||
|
end
|
||||||
|
|
||||||
|
function _M.is_applicable(backend)
|
||||||
|
return is_sticky(backend) or backend["upstream-hash-by"] or backend["load-balance"] == "round_robin"
|
||||||
|
end
|
||||||
|
|
||||||
function _M.balance(backend)
|
function _M.balance(backend)
|
||||||
local instance = instances[backend.name]
|
local instance = instances[backend.name]
|
||||||
if not instance then
|
if not instance then
|
||||||
|
@ -37,7 +105,9 @@ function _M.balance(backend)
|
||||||
end
|
end
|
||||||
|
|
||||||
local endpoint_string
|
local endpoint_string
|
||||||
if backend["upstream-hash-by"] then
|
if is_sticky(backend) then
|
||||||
|
endpoint_string = sticky_endpoint_string(instance, backend)
|
||||||
|
elseif backend["upstream-hash-by"] then
|
||||||
local key = util.lua_ngx_var(backend["upstream-hash-by"])
|
local key = util.lua_ngx_var(backend["upstream-hash-by"])
|
||||||
endpoint_string = instance:find(key)
|
endpoint_string = instance:find(key)
|
||||||
else
|
else
|
||||||
|
@ -48,10 +118,10 @@ function _M.balance(backend)
|
||||||
return { address = address, port = port }
|
return { address = address, port = port }
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.reinit(backend)
|
function _M.sync(backend)
|
||||||
local instance = instances[backend.name]
|
local instance = instances[backend.name]
|
||||||
local factory = resty_roundrobin
|
local factory = resty_roundrobin
|
||||||
if backend["upstream-hash-by"] then
|
if is_sticky(backend) or backend["upstream-hash-by"] then
|
||||||
factory = resty_chash
|
factory = resty_chash
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -66,4 +136,7 @@ function _M.reinit(backend)
|
||||||
instances[backend.name] = init_resty_balancer(factory, instance, backend.endpoints)
|
instances[backend.name] = init_resty_balancer(factory, instance, backend.endpoints)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.after_balance()
|
||||||
|
end
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -1,133 +0,0 @@
|
||||||
local json = require('cjson')
|
|
||||||
local str = require("resty.string")
|
|
||||||
local sha1_crypto = require("resty.sha1")
|
|
||||||
local md5_crypto = require("resty.md5")
|
|
||||||
|
|
||||||
local sticky_sessions = ngx.shared.sticky_sessions
|
|
||||||
|
|
||||||
local DEFAULT_SESSION_COOKIE_NAME = "route"
|
|
||||||
local DEFAULT_SESSION_COOKIE_HASH = "md5"
|
|
||||||
-- Currently STICKY_TIMEOUT never expires
|
|
||||||
local STICKY_TIMEOUT = 0
|
|
||||||
|
|
||||||
local _M = {}
|
|
||||||
|
|
||||||
local function md5_digest(raw)
|
|
||||||
local md5 = md5_crypto:new()
|
|
||||||
if not md5 then
|
|
||||||
return nil, "md5: failed to create object"
|
|
||||||
end
|
|
||||||
local ok = md5:update(raw)
|
|
||||||
if not ok then
|
|
||||||
return nil, "md5: failed to add data"
|
|
||||||
end
|
|
||||||
local digest = md5:final()
|
|
||||||
if digest == nil then
|
|
||||||
return nil, "md5: failed to create digest"
|
|
||||||
end
|
|
||||||
return str.to_hex(digest), nil
|
|
||||||
end
|
|
||||||
|
|
||||||
local function sha1_digest(raw)
|
|
||||||
local sha1 = sha1_crypto:new()
|
|
||||||
if not sha1 then
|
|
||||||
return nil, "sha1: failed to create object"
|
|
||||||
end
|
|
||||||
local ok = sha1:update(raw)
|
|
||||||
if not ok then
|
|
||||||
return nil, "sha1: failed to add data"
|
|
||||||
end
|
|
||||||
local digest = sha1:final()
|
|
||||||
if digest == nil then
|
|
||||||
return nil, "sha1: failed to create digest"
|
|
||||||
end
|
|
||||||
return str.to_hex(digest), nil
|
|
||||||
end
|
|
||||||
|
|
||||||
local function get_cookie_name(backend)
|
|
||||||
local name = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["name"]
|
|
||||||
return name or DEFAULT_SESSION_COOKIE_NAME
|
|
||||||
end
|
|
||||||
|
|
||||||
local function is_valid_endpoint(backend, address, port)
|
|
||||||
for _, ep in ipairs(backend.endpoints) do
|
|
||||||
if ep.address == address and ep.port == port then
|
|
||||||
return true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
return false
|
|
||||||
end
|
|
||||||
|
|
||||||
function _M.is_sticky(backend)
|
|
||||||
return backend["sessionAffinityConfig"]["name"] == "cookie"
|
|
||||||
end
|
|
||||||
|
|
||||||
function _M.get_endpoint(backend)
|
|
||||||
local cookie_name = get_cookie_name(backend)
|
|
||||||
local cookie_key = "cookie_" .. cookie_name
|
|
||||||
local endpoint_key = ngx.var[cookie_key]
|
|
||||||
if endpoint_key == nil then
|
|
||||||
ngx.log(ngx.INFO, string.format(
|
|
||||||
"[backend=%s, affinity=cookie] cookie \"%s\" is not set for this request",
|
|
||||||
backend.name,
|
|
||||||
cookie_name
|
|
||||||
))
|
|
||||||
return nil
|
|
||||||
end
|
|
||||||
|
|
||||||
local endpoint_string = sticky_sessions:get(endpoint_key)
|
|
||||||
if endpoint_string == nil then
|
|
||||||
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] no endpoint assigned", backend.name))
|
|
||||||
return nil
|
|
||||||
end
|
|
||||||
|
|
||||||
local endpoint = json.decode(endpoint_string)
|
|
||||||
local valid = is_valid_endpoint(backend, endpoint.address, endpoint.port)
|
|
||||||
if not valid then
|
|
||||||
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigned endpoint is no longer valid", backend.name))
|
|
||||||
sticky_sessions:delete(endpoint_key)
|
|
||||||
return nil
|
|
||||||
end
|
|
||||||
return endpoint
|
|
||||||
end
|
|
||||||
|
|
||||||
function _M.set_endpoint(endpoint, backend)
|
|
||||||
local cookie_name = get_cookie_name(backend)
|
|
||||||
local encrypted, err
|
|
||||||
local endpoint_string = json.encode(endpoint)
|
|
||||||
local hash = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"]
|
|
||||||
|
|
||||||
if hash == "sha1" then
|
|
||||||
encrypted, err = sha1_digest(endpoint_string)
|
|
||||||
else
|
|
||||||
if hash ~= DEFAULT_SESSION_COOKIE_HASH then
|
|
||||||
ngx.log(ngx.WARN, string.format(
|
|
||||||
"[backend=%s, affinity=cookie] session-cookie-hash \"%s\" is not valid, defaulting to %s",
|
|
||||||
backend.name,
|
|
||||||
hash,
|
|
||||||
DEFAULT_SESSION_COOKIE_HASH
|
|
||||||
))
|
|
||||||
end
|
|
||||||
encrypted, err = md5_digest(endpoint_string)
|
|
||||||
end
|
|
||||||
if err ~= nil then
|
|
||||||
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
|
|
||||||
return
|
|
||||||
end
|
|
||||||
|
|
||||||
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigning a new endpoint", backend.name))
|
|
||||||
ngx.header["Set-Cookie"] = cookie_name .. "=" .. encrypted .. ";"
|
|
||||||
local success, forcible
|
|
||||||
success, err, forcible = sticky_sessions:set(encrypted, endpoint_string, STICKY_TIMEOUT)
|
|
||||||
if not success then
|
|
||||||
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
|
|
||||||
end
|
|
||||||
if forcible then
|
|
||||||
ngx.log(ngx.WARN, string.format(
|
|
||||||
"[backend=%s, affinity=cookie] sticky_sessions shared dict is full; endpoint forcibly overwritten",
|
|
||||||
backend.name
|
|
||||||
))
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
return _M
|
|
|
@ -1,7 +1,7 @@
|
||||||
local cwd = io.popen("pwd"):read('*l')
|
local cwd = io.popen("pwd"):read('*l')
|
||||||
package.path = cwd .. "/rootfs/etc/nginx/lua/?.lua;" .. package.path
|
package.path = cwd .. "/rootfs/etc/nginx/lua/?.lua;" .. package.path
|
||||||
|
|
||||||
local balancer, mock_cjson, mock_config, mock_sticky, mock_backends, mock_lrucache, lock, mock_lock,
|
local balancer, mock_cjson, mock_config, mock_backends, mock_lrucache, lock, mock_lock,
|
||||||
mock_ngx_balancer, mock_ewma
|
mock_ngx_balancer, mock_ewma
|
||||||
|
|
||||||
local function dict_generator(vals)
|
local function dict_generator(vals)
|
||||||
|
@ -46,10 +46,14 @@ local default_backends = {
|
||||||
local function init()
|
local function init()
|
||||||
mock_cjson = {}
|
mock_cjson = {}
|
||||||
mock_config = {}
|
mock_config = {}
|
||||||
mock_sticky = {}
|
|
||||||
mock_ngx_balancer = {}
|
mock_ngx_balancer = {}
|
||||||
mock_ewma = {}
|
mock_ewma = {
|
||||||
mock_resty_balancer = {}
|
sync = function(b) return end
|
||||||
|
}
|
||||||
|
mock_resty_balancer = {
|
||||||
|
sync = function(b) return end,
|
||||||
|
after_balance = function () return end
|
||||||
|
}
|
||||||
mock_backends = dict_generator(default_backends)
|
mock_backends = dict_generator(default_backends)
|
||||||
mock_lrucache = {
|
mock_lrucache = {
|
||||||
new = function () return mock_backends end
|
new = function () return mock_backends end
|
||||||
|
@ -76,12 +80,15 @@ local function init()
|
||||||
}
|
}
|
||||||
package.loaded["ngx.balancer"] = mock_ngx_balancer
|
package.loaded["ngx.balancer"] = mock_ngx_balancer
|
||||||
package.loaded["resty.lrucache"] = mock_lrucache
|
package.loaded["resty.lrucache"] = mock_lrucache
|
||||||
|
package.loaded["resty.string"] = {}
|
||||||
|
package.loaded["resty.sha1"] = {}
|
||||||
|
package.loaded["resty.md5"] = {}
|
||||||
|
package.loaded["resty.cookie"] = {}
|
||||||
package.loaded["cjson"] = mock_cjson
|
package.loaded["cjson"] = mock_cjson
|
||||||
package.loaded["resty.lock"] = mock_lock
|
package.loaded["resty.lock"] = mock_lock
|
||||||
package.loaded["balancer.ewma"] = mock_ewma
|
package.loaded["balancer.ewma"] = mock_ewma
|
||||||
package.loaded["balancer.resty"] = mock_resty_balancer
|
package.loaded["balancer.resty"] = mock_resty_balancer
|
||||||
package.loaded["configuration"] = mock_config
|
package.loaded["configuration"] = mock_config
|
||||||
package.loaded["sticky"] = mock_sticky
|
|
||||||
balancer = require("balancer")
|
balancer = require("balancer")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -91,7 +98,7 @@ describe("[balancer_test]", function()
|
||||||
end)
|
end)
|
||||||
|
|
||||||
teardown(function()
|
teardown(function()
|
||||||
local packages = {"ngx.balancer", "resty.lrucache","cjson", "resty.lock", "balancer.ewma","configuration", "sticky"}
|
local packages = {"ngx.balancer", "resty.lrucache","cjson", "resty.lock", "balancer.ewma","configuration"}
|
||||||
for i, package_name in ipairs(packages) do
|
for i, package_name in ipairs(packages) do
|
||||||
package.loaded[package_name] = nil
|
package.loaded[package_name] = nil
|
||||||
end
|
end
|
||||||
|
@ -108,7 +115,6 @@ describe("[balancer_test]", function()
|
||||||
_G.ngx.get_phase = nil
|
_G.ngx.get_phase = nil
|
||||||
_G.ngx.var = {}
|
_G.ngx.var = {}
|
||||||
mock_backends._vals = default_backends
|
mock_backends._vals = default_backends
|
||||||
mock_sticky.is_sticky = function(b) return false end
|
|
||||||
end)
|
end)
|
||||||
|
|
||||||
describe("phase=log", function()
|
describe("phase=log", function()
|
||||||
|
@ -121,6 +127,7 @@ describe("[balancer_test]", function()
|
||||||
|
|
||||||
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
|
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
|
||||||
|
|
||||||
|
mock_resty_balancer.is_applicable = function(b) return false end
|
||||||
assert.has_no_errors(balancer.call)
|
assert.has_no_errors(balancer.call)
|
||||||
assert.spy(ewma_after_balance_spy).was_called()
|
assert.spy(ewma_after_balance_spy).was_called()
|
||||||
end)
|
end)
|
||||||
|
@ -130,6 +137,7 @@ describe("[balancer_test]", function()
|
||||||
|
|
||||||
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
|
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
|
||||||
|
|
||||||
|
mock_resty_balancer.is_applicable = function(b) return true end
|
||||||
assert.has_no_errors(balancer.call)
|
assert.has_no_errors(balancer.call)
|
||||||
assert.spy(ewma_after_balance_spy).was_not_called()
|
assert.spy(ewma_after_balance_spy).was_not_called()
|
||||||
end)
|
end)
|
||||||
|
@ -173,46 +181,13 @@ describe("[balancer_test]", function()
|
||||||
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
|
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
|
||||||
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
|
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
|
||||||
|
|
||||||
|
mock_resty_balancer.is_applicable = function(b) return false end
|
||||||
assert.has_no_errors(balancer.call)
|
assert.has_no_errors(balancer.call)
|
||||||
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_ewma_backend")
|
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_ewma_backend")
|
||||||
assert.spy(set_more_tries_spy).was_called_with(1)
|
assert.spy(set_more_tries_spy).was_called_with(1)
|
||||||
assert.spy(set_current_peer_spy).was_called_with("000.000.111", "8083")
|
assert.spy(set_current_peer_spy).was_called_with("000.000.111", "8083")
|
||||||
end)
|
end)
|
||||||
|
|
||||||
it("sticky=true, returns stored endpoints and peer was successfully set", function()
|
|
||||||
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
|
|
||||||
|
|
||||||
mock_sticky.is_sticky = function(b) return true end
|
|
||||||
mock_sticky.get_endpoint = function() return {address = "000.000.011", port = "8082"} end
|
|
||||||
|
|
||||||
local backend_get_spy = spy.on(mock_backends, "get")
|
|
||||||
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
|
|
||||||
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
|
|
||||||
|
|
||||||
assert.has_no_errors(balancer.call)
|
|
||||||
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
|
|
||||||
assert.spy(set_more_tries_spy).was_called_with(1)
|
|
||||||
assert.spy(set_current_peer_spy).was_called_with("000.000.011", "8082")
|
|
||||||
end)
|
|
||||||
|
|
||||||
it("sticky=true, does not return stored endpoints, defaults to round robin", function()
|
|
||||||
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
|
|
||||||
|
|
||||||
mock_sticky.is_sticky = function(b) return true end
|
|
||||||
mock_sticky.get_endpoint = function() return nil end
|
|
||||||
mock_sticky.set_endpoint = function(b) return end
|
|
||||||
|
|
||||||
local backend_get_spy = spy.on(mock_backends, "get")
|
|
||||||
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
|
|
||||||
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
|
|
||||||
|
|
||||||
mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
|
|
||||||
assert.has_no_errors(balancer.call)
|
|
||||||
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
|
|
||||||
assert.spy(set_more_tries_spy).was_called_with(1)
|
|
||||||
assert.spy(set_current_peer_spy).was_called_with("000.000.000", "8080")
|
|
||||||
end)
|
|
||||||
|
|
||||||
it("fails when no backend exists", function()
|
it("fails when no backend exists", function()
|
||||||
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
|
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
|
||||||
|
|
||||||
|
@ -276,18 +251,18 @@ describe("[balancer_test]", function()
|
||||||
end)
|
end)
|
||||||
|
|
||||||
it("lb_alg=ewma, updates backend when sync is required", function()
|
it("lb_alg=ewma, updates backend when sync is required", function()
|
||||||
|
_G.ngx.var.proxy_upstream_name = "mock_ewma_backend"
|
||||||
mock_config.get_backends_data = function() return { default_backends.mock_ewma_backend } end
|
mock_config.get_backends_data = function() return { default_backends.mock_ewma_backend } end
|
||||||
mock_backends._vals = {}
|
mock_backends._vals = {}
|
||||||
|
|
||||||
local backend_set_spy = spy.on(mock_backends, "set")
|
local backend_set_spy = spy.on(mock_backends, "set")
|
||||||
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
|
local ewma_sync_spy = spy.on(mock_ewma, "sync")
|
||||||
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")
|
|
||||||
|
|
||||||
|
mock_resty_balancer.is_applicable = function(b) return false end
|
||||||
assert.has_no_errors(balancer.init_worker)
|
assert.has_no_errors(balancer.init_worker)
|
||||||
assert.spy(backend_set_spy)
|
assert.spy(backend_set_spy)
|
||||||
.was_called_with(match.is_table(), default_backends.mock_ewma_backend.name, match.is_table())
|
.was_called_with(match.is_table(), default_backends.mock_ewma_backend.name, match.is_table())
|
||||||
assert.spy(ewma_flush_spy).was_called_with(match.is_table())
|
assert.spy(ewma_sync_spy).was_called()
|
||||||
assert.spy(ewma_lta_flush_spy).was_called_with(match.is_table())
|
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
|
|
|
@ -1,6 +1,34 @@
|
||||||
local _M = {}
|
|
||||||
local string_len = string.len
|
local string_len = string.len
|
||||||
local string_sub = string.sub
|
local string_sub = string.sub
|
||||||
|
local resty_str = require("resty.string")
|
||||||
|
local resty_sha1 = require("resty.sha1")
|
||||||
|
local resty_md5 = require("resty.md5")
|
||||||
|
|
||||||
|
local _M = {}
|
||||||
|
|
||||||
|
local function hash_digest(hash_factory, message)
|
||||||
|
local hash = hash_factory:new()
|
||||||
|
if not hash then
|
||||||
|
return nil, "failed to create object"
|
||||||
|
end
|
||||||
|
local ok = hash:update(message)
|
||||||
|
if not ok then
|
||||||
|
return nil, "failed to add data"
|
||||||
|
end
|
||||||
|
local binary_digest = hash:final()
|
||||||
|
if binary_digest == nil then
|
||||||
|
return nil, "failed to create digest"
|
||||||
|
end
|
||||||
|
return resty_str.to_hex(binary_digest), nil
|
||||||
|
end
|
||||||
|
|
||||||
|
function _M.sha1_digest(message)
|
||||||
|
return hash_digest(resty_sha1, message)
|
||||||
|
end
|
||||||
|
|
||||||
|
function _M.md5_digest(message)
|
||||||
|
return hash_digest(resty_md5, message)
|
||||||
|
end
|
||||||
|
|
||||||
-- given an Nginx variable i.e $request_uri
|
-- given an Nginx variable i.e $request_uri
|
||||||
-- it returns value of ngx.var[request_uri]
|
-- it returns value of ngx.var[request_uri]
|
||||||
|
|
|
@ -256,6 +256,9 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
|
||||||
|
|
||||||
Context("when session affinity annotation is present", func() {
|
Context("when session affinity annotation is present", func() {
|
||||||
It("should use sticky sessions when ingress rules are configured", func() {
|
It("should use sticky sessions when ingress rules are configured", func() {
|
||||||
|
err := framework.UpdateDeployment(f.KubeClientSet, f.IngressController.Namespace, "nginx-ingress-controller", 2, nil)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
cookieName := "STICKYSESSION"
|
cookieName := "STICKYSESSION"
|
||||||
|
|
||||||
By("Updating affinity annotation on ingress")
|
By("Updating affinity annotation on ingress")
|
||||||
|
@ -275,18 +278,24 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
|
||||||
|
|
||||||
By("Making a first request")
|
By("Making a first request")
|
||||||
host := "foo.com"
|
host := "foo.com"
|
||||||
resp, _, errs := gorequest.New().
|
resp, body, errs := gorequest.New().
|
||||||
Get(f.IngressController.HTTPURL).
|
Get(f.IngressController.HTTPURL).
|
||||||
Set("Host", host).
|
Set("Host", host).
|
||||||
End()
|
End()
|
||||||
Expect(len(errs)).Should(BeNumerically("==", 0))
|
Expect(len(errs)).Should(BeNumerically("==", 0))
|
||||||
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
|
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
|
||||||
|
|
||||||
|
hostnamePattern := regexp.MustCompile(`Hostname: ([a-zA-Z0-9\-]+)`)
|
||||||
|
upstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
|
||||||
|
|
||||||
cookies := (*http.Response)(resp).Cookies()
|
cookies := (*http.Response)(resp).Cookies()
|
||||||
sessionCookie, err := getCookie(cookieName, cookies)
|
sessionCookie, err := getCookie(cookieName, cookies)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
By("Making a second request with the previous session cookie")
|
Expect(sessionCookie.Domain).Should(Equal(host))
|
||||||
|
|
||||||
|
By("Making many requests with the previous session cookie")
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
resp, _, errs = gorequest.New().
|
resp, _, errs = gorequest.New().
|
||||||
Get(f.IngressController.HTTPURL).
|
Get(f.IngressController.HTTPURL).
|
||||||
AddCookie(sessionCookie).
|
AddCookie(sessionCookie).
|
||||||
|
@ -295,27 +304,15 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
|
||||||
Expect(len(errs)).Should(BeNumerically("==", 0))
|
Expect(len(errs)).Should(BeNumerically("==", 0))
|
||||||
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
|
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
|
||||||
|
|
||||||
By("Making a third request with no cookie")
|
newCookies := (*http.Response)(resp).Cookies()
|
||||||
resp, _, errs = gorequest.New().
|
_, err := getCookie(cookieName, newCookies)
|
||||||
Get(f.IngressController.HTTPURL).
|
By("Omitting cookie in all subsequent requests")
|
||||||
Set("Host", host).
|
Expect(err).To(HaveOccurred())
|
||||||
End()
|
|
||||||
|
|
||||||
Expect(len(errs)).Should(BeNumerically("==", 0))
|
By("By proxying to the same upstream")
|
||||||
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
|
newUpstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
|
||||||
|
Expect(newUpstreamName).Should(Equal(upstreamName))
|
||||||
log, err := f.NginxLogs()
|
}
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(log).ToNot(BeEmpty())
|
|
||||||
|
|
||||||
By("Checking that upstreams are sticky when session cookie is used")
|
|
||||||
index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.IngressController.Namespace))
|
|
||||||
reqLogs := log[index:]
|
|
||||||
re := regexp.MustCompile(`\d{1,3}(?:\.\d{1,3}){3}(?::\d{1,5})`)
|
|
||||||
upstreams := re.FindAllString(reqLogs, -1)
|
|
||||||
Expect(len(upstreams)).Should(BeNumerically("==", 3))
|
|
||||||
Expect(upstreams[0]).To(Equal(upstreams[1]))
|
|
||||||
Expect(upstreams[1]).ToNot(Equal(upstreams[2]))
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should NOT use sticky sessions when a default backend and no ingress rules configured", func() {
|
It("should NOT use sticky sessions when a default backend and no ingress rules configured", func() {
|
||||||
|
|
Loading…
Reference in a new issue