Merge pull request #2484 from Shopify/lua-sticky-session-bug

Fix bugs in Lua implementation of sticky sessions
This commit is contained in:
k8s-ci-robot 2018-05-16 19:14:50 -07:00 committed by GitHub
commit ee7a63d050
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 194 additions and 299 deletions

File diff suppressed because one or more lines are too long

View file

@ -4,7 +4,6 @@ local configuration = require("configuration")
local util = require("util")
local lrucache = require("resty.lrucache")
local ewma = require("balancer.ewma")
local sticky = require("sticky")
local resty_balancer = require("balancer.resty")
-- measured in seconds
@ -18,9 +17,9 @@ local _M = {}
-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
-- load balancer implementations
local backends, err = lrucache.new(1024)
local backends, backends_err = lrucache.new(1024)
if not backends then
return error("failed to create the cache for backends: " .. (err or "unknown"))
return error("failed to create the cache for backends: " .. (backends_err or "unknown"))
end
local function get_current_backend()
@ -35,75 +34,48 @@ local function get_current_backend()
return backend
end
local function get_current_lb_alg()
local backend = get_current_backend()
local function get_balancer(backend)
if not backend then
return nil
end
return backend["load-balance"] or DEFAULT_LB_ALG
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
if resty_balancer.is_applicable(backend) then
return resty_balancer
elseif lb_alg ~= "ewma" then
if lb_alg ~= DEFAULT_LB_ALG then
ngx.log(ngx.WARN,
string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
end
return resty_balancer
end
return ewma
end
local function balance()
local backend = get_current_backend()
if not backend then
local balancer = get_balancer(backend)
if not balancer then
return nil, nil
end
local lb_alg = get_current_lb_alg()
local is_sticky = sticky.is_sticky(backend)
if is_sticky then
local endpoint = sticky.get_endpoint(backend)
if endpoint ~= nil then
return endpoint.address, endpoint.port
end
lb_alg = "round_robin"
end
if backend["upstream-hash-by"] then
local endpoint = resty_balancer.balance(backend)
if not endpoint then
return nil, nil
end
return endpoint.address, endpoint.port
end
if lb_alg == "ewma" then
local endpoint = ewma.balance(backend.endpoints)
return endpoint.address, endpoint.port
end
if lb_alg ~= DEFAULT_LB_ALG then
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", tostring(lb_alg), DEFAULT_LB_ALG))
end
local endpoint = resty_balancer.balance(backend)
local endpoint = balancer.balance(backend)
if not endpoint then
return nil, nil
end
if is_sticky then
sticky.set_endpoint(endpoint, backend)
end
return endpoint.address, endpoint.port
end
local function sync_backend(backend)
backends:set(backend.name, backend)
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
if backend["upstream-hash-by"] or lb_alg == "round_robin" then
resty_balancer.reinit(backend)
end
-- TODO: Reset state of EWMA per backend
if lb_alg == "ewma" then
ngx.shared.balancer_ewma:flush_all()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
local balancer = get_balancer(backend)
if not balancer then
return
end
balancer.sync(backend)
end
local function sync_backends()
@ -133,15 +105,18 @@ local function sync_backends()
end
local function after_balance()
local lb_alg = get_current_lb_alg()
if lb_alg == "ewma" then
ewma.after_balance()
local backend = get_current_backend()
local balancer = get_balancer(backend)
if not balancer then
return
end
balancer.after_balance()
end
function _M.init_worker()
sync_backends() -- when worker starts, sync backends without delay
_, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if err then
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
end
@ -165,12 +140,8 @@ function _M.call()
ngx_balancer.set_more_tries(1)
local ok
ok, err = ngx_balancer.set_current_peer(host, port)
if ok then
ngx.log(ngx.INFO,
string.format("current peer is set to %s:%s using lb_alg %s", host, port, tostring(get_current_lb_alg())))
else
local ok, err = ngx_balancer.set_current_peer(host, port)
if not ok then
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
end
end

View file

@ -117,7 +117,8 @@ local function pick_and_score(peers, k)
return peers[lowest_score_index]
end
function _M.balance(peers)
function _M.balance(backend)
local peers = backend.endpoints
if #peers == 1 then
return peers[1]
end
@ -138,4 +139,10 @@ function _M.after_balance()
get_or_update_ewma(upstream, rtt, true)
end
--- Discard all EWMA bookkeeping when a backend is synced.
-- The backend argument is ignored: both shared dictionaries are flushed
-- as a whole rather than per backend.
-- TODO: Reset state of EWMA per backend
function _M.sync(_)
  local shared = ngx.shared

  local scores = shared.balancer_ewma
  scores:flush_all()

  local last_touched = shared.balancer_ewma_last_touched_at
  last_touched:flush_all()
end
return _M

View file

@ -1,6 +1,7 @@
local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local util = require("util")
local ck = require("resty.cookie")
local _M = {}
local instances = {}
@ -29,6 +30,73 @@ local function init_resty_balancer(factory, instance, endpoints)
return instance
end
--- Tell whether a backend is configured for cookie-based session affinity.
-- @param backend table describing the backend
-- @return the falsy "sessionAffinityConfig" value as-is when it is absent,
--         otherwise true/false depending on whether its name is "cookie"
local function is_sticky(backend)
  local affinity = backend["sessionAffinityConfig"]
  if not affinity then
    return affinity
  end
  return affinity["name"] == "cookie"
end
--- Name of the session affinity cookie for a backend.
-- @param backend table carrying sessionAffinityConfig.cookieSessionAffinity
-- @return the configured cookie name, or "route" when none is configured
local function cookie_name(backend)
  local cookie_config = backend["sessionAffinityConfig"]["cookieSessionAffinity"]
  local configured = cookie_config["name"]
  if configured then
    return configured
  end
  return "route"
end
--- Digest an endpoint string with the hash configured for the backend.
-- "sha1" selects util.sha1_digest; any other setting selects util.md5_digest.
-- A digest error is logged at ERR level and the (possibly nil) digest is
-- returned regardless.
-- @param backend table carrying sessionAffinityConfig.cookieSessionAffinity
-- @param endpoint_string value to digest
-- @return the digest produced by the selected util function
local function encrypted_endpoint_string(backend, endpoint_string)
  local hash_name = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"]

  local digest_fn
  if hash_name == "sha1" then
    digest_fn = util.sha1_digest
  else
    digest_fn = util.md5_digest
  end

  local digest, digest_err = digest_fn(endpoint_string)
  if digest_err ~= nil then
    ngx.log(ngx.ERR, digest_err)
  end

  return digest
end
--- Emit the session affinity cookie for the current request.
-- The cookie is named after the backend's configured cookie name, scoped to
-- path "/" and the request host, and flagged httponly.
-- Failures are logged at ERR level and never raised.
-- @param backend table carrying the session affinity configuration
-- @param value cookie value to send
local function set_cookie(backend, value)
  local cookie, err = ck:new()
  if not cookie then
    ngx.log(ngx.ERR, err)
    -- Without a cookie object there is nothing to call :set on; continuing
    -- would raise an "attempt to index a nil value" error.
    return
  end

  local ok
  ok, err = cookie:set({
    key = cookie_name(backend),
    value = value,
    path = "/",
    domain = ngx.var.host,
    httponly = true,
  })
  if not ok then
    ngx.log(ngx.ERR, err)
  end
end
--- Pick an entry from a balancer instance starting at a random position.
-- A random index in [1, instance.npoints] is drawn and handed to the
-- instance's next() method; whatever next() returns is passed through.
-- @param instance balancer instance exposing npoints and next()
local function pick_random(instance)
  local start = math.random(instance.npoints)
  return instance.next(instance, start)
end
--- Resolve the endpoint string for a request to a sticky backend.
-- When the request carries the affinity cookie, its value is the lookup key.
-- Otherwise a key is derived by digesting a randomly picked endpoint string
-- and sent back as a new cookie. Either way the endpoint is resolved by
-- looking the key up in the instance.
-- If the cookie object cannot be created the error is logged and a random
-- endpoint is returned instead.
-- @param instance balancer instance exposing find() and next()
-- @param backend table carrying the session affinity configuration
local function sticky_endpoint_string(instance, backend)
  local jar, jar_err = ck:new()
  if jar then
    local affinity_key = jar:get(cookie_name(backend))
    if not affinity_key then
      local seed_endpoint = pick_random(instance)
      affinity_key = encrypted_endpoint_string(backend, seed_endpoint)
      set_cookie(backend, affinity_key)
    end
    return instance:find(affinity_key)
  end

  ngx.log(ngx.ERR, jar_err)
  return pick_random(instance)
end
--- Decide whether this balancer should handle the given backend.
-- It applies to sticky backends, to backends with "upstream-hash-by" set and
-- to backends whose "load-balance" is "round_robin".
-- @param backend table describing the backend
-- @return a truthy value when applicable (not necessarily a boolean: the
--         "upstream-hash-by" value itself may be returned), falsy otherwise
function _M.is_applicable(backend)
  local sticky = is_sticky(backend)
  if sticky then
    return sticky
  end

  local hash_by = backend["upstream-hash-by"]
  if hash_by then
    return hash_by
  end

  return backend["load-balance"] == "round_robin"
end
function _M.balance(backend)
local instance = instances[backend.name]
if not instance then
@ -37,7 +105,9 @@ function _M.balance(backend)
end
local endpoint_string
if backend["upstream-hash-by"] then
if is_sticky(backend) then
endpoint_string = sticky_endpoint_string(instance, backend)
elseif backend["upstream-hash-by"] then
local key = util.lua_ngx_var(backend["upstream-hash-by"])
endpoint_string = instance:find(key)
else
@ -48,10 +118,10 @@ function _M.balance(backend)
return { address = address, port = port }
end
function _M.reinit(backend)
function _M.sync(backend)
local instance = instances[backend.name]
local factory = resty_roundrobin
if backend["upstream-hash-by"] then
if is_sticky(backend) or backend["upstream-hash-by"] then
factory = resty_chash
end
@ -66,4 +136,7 @@ function _M.reinit(backend)
instances[backend.name] = init_resty_balancer(factory, instance, backend.endpoints)
end
--- Post-balancing hook.
-- Intentionally empty: this balancer does no work after a request has been
-- balanced. Takes no arguments and returns nothing.
function _M.after_balance()
end
return _M

View file

@ -1,133 +0,0 @@
local json = require('cjson')
local str = require("resty.string")
local sha1_crypto = require("resty.sha1")
local md5_crypto = require("resty.md5")
local sticky_sessions = ngx.shared.sticky_sessions
local DEFAULT_SESSION_COOKIE_NAME = "route"
local DEFAULT_SESSION_COOKIE_HASH = "md5"
-- Currently STICKY_TIMEOUT never expires
local STICKY_TIMEOUT = 0
local _M = {}
--- Compute a hex digest with the given hash implementation.
-- Shared by md5_digest and sha1_digest, which previously duplicated this
-- logic line for line.
-- @param factory hash module exposing :new() (md5_crypto or sha1_crypto)
-- @param label prefix used in error messages ("md5" or "sha1")
-- @param raw data to digest
-- @return hex digest and nil on success; nil and an error message on failure
local function hex_digest(factory, label, raw)
  local hasher = factory:new()
  if not hasher then
    return nil, label .. ": failed to create object"
  end

  if not hasher:update(raw) then
    return nil, label .. ": failed to add data"
  end

  local digest = hasher:final()
  if digest == nil then
    return nil, label .. ": failed to create digest"
  end

  return str.to_hex(digest), nil
end

--- Hex-encoded MD5 digest of raw.
-- @return hex digest, nil on success; nil, "md5: ..." message on failure
local function md5_digest(raw)
  return hex_digest(md5_crypto, "md5", raw)
end

--- Hex-encoded SHA-1 digest of raw.
-- @return hex digest, nil on success; nil, "sha1: ..." message on failure
local function sha1_digest(raw)
  return hex_digest(sha1_crypto, "sha1", raw)
end
--- Name of the session cookie configured for a backend.
-- @param backend table carrying sessionAffinityConfig.cookieSessionAffinity
-- @return the configured name, or DEFAULT_SESSION_COOKIE_NAME when unset
local function get_cookie_name(backend)
  local cookie_config = backend["sessionAffinityConfig"]["cookieSessionAffinity"]
  local configured = cookie_config["name"]
  if configured then
    return configured
  end
  return DEFAULT_SESSION_COOKIE_NAME
end
--- Check that an address/port pair is still among a backend's endpoints.
-- @param backend table with an "endpoints" sequence of {address, port} tables
-- @param address endpoint address to look for
-- @param port endpoint port to look for
-- @return true when an endpoint matches both address and port, else false
local function is_valid_endpoint(backend, address, port)
  local found = false
  for _, candidate in ipairs(backend.endpoints) do
    if candidate.address == address and candidate.port == port then
      found = true
      break
    end
  end
  return found
end
--- Tell whether a backend uses cookie-based session affinity.
-- Backends without a "sessionAffinityConfig" entry are treated as not sticky
-- instead of raising an "attempt to index a nil value" error.
-- @param backend table describing the backend
-- @return true when sessionAffinityConfig.name is "cookie", false otherwise
function _M.is_sticky(backend)
  local affinity = backend["sessionAffinityConfig"]
  if not affinity then
    return false
  end
  return affinity["name"] == "cookie"
end
--- Look up the endpoint previously assigned to the current request.
-- The value of the backend's session cookie is used as a key into the
-- sticky_sessions shared dictionary, whose entry is a JSON-encoded endpoint.
-- An entry pointing at an endpoint the backend no longer lists is deleted.
-- @param backend table describing the backend (name, endpoints, affinity config)
-- @return the decoded endpoint table, or nil when the cookie is not set, no
--         entry is stored for it, or the stored endpoint is no longer valid
function _M.get_endpoint(backend)
  local name = get_cookie_name(backend)
  local session_key = ngx.var["cookie_" .. name]

  if session_key == nil then
    local message = string.format(
      "[backend=%s, affinity=cookie] cookie \"%s\" is not set for this request",
      backend.name,
      name
    )
    ngx.log(ngx.INFO, message)
    return nil
  end

  local stored = sticky_sessions:get(session_key)
  if stored == nil then
    local message = string.format("[backend=%s, affinity=cookie] no endpoint assigned", backend.name)
    ngx.log(ngx.INFO, message)
    return nil
  end

  local assigned = json.decode(stored)
  if is_valid_endpoint(backend, assigned.address, assigned.port) then
    return assigned
  end

  ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigned endpoint is no longer valid", backend.name))
  sticky_sessions:delete(session_key)
  return nil
end
--- Assign an endpoint to the current client and remember the assignment.
-- The endpoint is JSON-encoded and digested; the digest becomes both the
-- session cookie value and the key under which the encoded endpoint is stored
-- in the sticky_sessions shared dictionary.
-- @param endpoint table to assign (JSON-encoded as-is)
-- @param backend table describing the backend (name and affinity config)
-- Returns nothing; all failures are logged at WARN level.
function _M.set_endpoint(endpoint, backend)
local cookie_name = get_cookie_name(backend)
local encrypted, err
local endpoint_string = json.encode(endpoint)
local hash = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"]
-- "sha1" selects SHA-1; every other value falls back to MD5, with a warning
-- when the value is not the default hash name.
if hash == "sha1" then
encrypted, err = sha1_digest(endpoint_string)
else
if hash ~= DEFAULT_SESSION_COOKIE_HASH then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] session-cookie-hash \"%s\" is not valid, defaulting to %s",
backend.name,
hash,
DEFAULT_SESSION_COOKIE_HASH
))
end
encrypted, err = md5_digest(endpoint_string)
end
-- Without a digest there is no cookie value to hand out, so give up here.
if err ~= nil then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
return
end
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigning a new endpoint", backend.name))
-- NOTE(review): this assignment presumably replaces any Set-Cookie header
-- already queued on the response — confirm against the ngx.header semantics.
ngx.header["Set-Cookie"] = cookie_name .. "=" .. encrypted .. ";"
-- The cookie header is written before the shared dictionary entry; if the
-- store below fails, only a warning is logged and the header stays set.
local success, forcible
success, err, forcible = sticky_sessions:set(encrypted, endpoint_string, STICKY_TIMEOUT)
if not success then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
end
if forcible then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] sticky_sessions shared dict is full; endpoint forcibly overwritten",
backend.name
))
end
end
return _M

View file

@ -1,7 +1,7 @@
local cwd = io.popen("pwd"):read('*l')
package.path = cwd .. "/rootfs/etc/nginx/lua/?.lua;" .. package.path
local balancer, mock_cjson, mock_config, mock_sticky, mock_backends, mock_lrucache, lock, mock_lock,
local balancer, mock_cjson, mock_config, mock_backends, mock_lrucache, lock, mock_lock,
mock_ngx_balancer, mock_ewma
local function dict_generator(vals)
@ -46,10 +46,14 @@ local default_backends = {
local function init()
mock_cjson = {}
mock_config = {}
mock_sticky = {}
mock_ngx_balancer = {}
mock_ewma = {}
mock_resty_balancer = {}
mock_ewma = {
sync = function(b) return end
}
mock_resty_balancer = {
sync = function(b) return end,
after_balance = function () return end
}
mock_backends = dict_generator(default_backends)
mock_lrucache = {
new = function () return mock_backends end
@ -76,12 +80,15 @@ local function init()
}
package.loaded["ngx.balancer"] = mock_ngx_balancer
package.loaded["resty.lrucache"] = mock_lrucache
package.loaded["resty.string"] = {}
package.loaded["resty.sha1"] = {}
package.loaded["resty.md5"] = {}
package.loaded["resty.cookie"] = {}
package.loaded["cjson"] = mock_cjson
package.loaded["resty.lock"] = mock_lock
package.loaded["balancer.ewma"] = mock_ewma
package.loaded["balancer.resty"] = mock_resty_balancer
package.loaded["configuration"] = mock_config
package.loaded["sticky"] = mock_sticky
balancer = require("balancer")
end
@ -91,7 +98,7 @@ describe("[balancer_test]", function()
end)
teardown(function()
local packages = {"ngx.balancer", "resty.lrucache","cjson", "resty.lock", "balancer.ewma","configuration", "sticky"}
local packages = {"ngx.balancer", "resty.lrucache","cjson", "resty.lock", "balancer.ewma","configuration"}
for i, package_name in ipairs(packages) do
package.loaded[package_name] = nil
end
@ -108,7 +115,6 @@ describe("[balancer_test]", function()
_G.ngx.get_phase = nil
_G.ngx.var = {}
mock_backends._vals = default_backends
mock_sticky.is_sticky = function(b) return false end
end)
describe("phase=log", function()
@ -121,6 +127,7 @@ describe("[balancer_test]", function()
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
mock_resty_balancer.is_applicable = function(b) return false end
assert.has_no_errors(balancer.call)
assert.spy(ewma_after_balance_spy).was_called()
end)
@ -130,6 +137,7 @@ describe("[balancer_test]", function()
local ewma_after_balance_spy = spy.on(mock_ewma, "after_balance")
mock_resty_balancer.is_applicable = function(b) return true end
assert.has_no_errors(balancer.call)
assert.spy(ewma_after_balance_spy).was_not_called()
end)
@ -173,46 +181,13 @@ describe("[balancer_test]", function()
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
mock_resty_balancer.is_applicable = function(b) return false end
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_ewma_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
assert.spy(set_current_peer_spy).was_called_with("000.000.111", "8083")
end)
it("sticky=true, returns stored endpoints and peer was successfully set", function()
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
mock_sticky.is_sticky = function(b) return true end
mock_sticky.get_endpoint = function() return {address = "000.000.011", port = "8082"} end
local backend_get_spy = spy.on(mock_backends, "get")
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
assert.spy(set_current_peer_spy).was_called_with("000.000.011", "8082")
end)
it("sticky=true, does not return stored endpoints, defaults to round robin", function()
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
mock_sticky.is_sticky = function(b) return true end
mock_sticky.get_endpoint = function() return nil end
mock_sticky.set_endpoint = function(b) return end
local backend_get_spy = spy.on(mock_backends, "get")
local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
assert.has_no_errors(balancer.call)
assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
assert.spy(set_more_tries_spy).was_called_with(1)
assert.spy(set_current_peer_spy).was_called_with("000.000.000", "8080")
end)
it("fails when no backend exists", function()
_G.ngx.var.proxy_upstream_name = "mock_rr_backend"
@ -276,18 +251,18 @@ describe("[balancer_test]", function()
end)
it("lb_alg=ewma, updates backend when sync is required", function()
_G.ngx.var.proxy_upstream_name = "mock_ewma_backend"
mock_config.get_backends_data = function() return { default_backends.mock_ewma_backend } end
mock_backends._vals = {}
local backend_set_spy = spy.on(mock_backends, "set")
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")
local ewma_sync_spy = spy.on(mock_ewma, "sync")
mock_resty_balancer.is_applicable = function(b) return false end
assert.has_no_errors(balancer.init_worker)
assert.spy(backend_set_spy)
.was_called_with(match.is_table(), default_backends.mock_ewma_backend.name, match.is_table())
assert.spy(ewma_flush_spy).was_called_with(match.is_table())
assert.spy(ewma_lta_flush_spy).was_called_with(match.is_table())
assert.spy(ewma_sync_spy).was_called()
end)
end)
end)

View file

@ -1,6 +1,34 @@
local _M = {}
local string_len = string.len
local string_sub = string.sub
local resty_str = require("resty.string")
local resty_sha1 = require("resty.sha1")
local resty_md5 = require("resty.md5")
local _M = {}
--- Compute the hex-encoded digest of a message.
-- @param hash_factory hash module exposing :new(); the created object must
--        provide :update(message) and :final()
-- @param message data to digest
-- @return hex digest and nil on success; nil and an error message when the
--         hash object cannot be created, fed, or finalized
local function hash_digest(hash_factory, message)
  local hasher = hash_factory:new()
  if not hasher then
    return nil, "failed to create object"
  end

  if not hasher:update(message) then
    return nil, "failed to add data"
  end

  local raw = hasher:final()
  if raw == nil then
    return nil, "failed to create digest"
  end

  local hex = resty_str.to_hex(raw)
  return hex, nil
end
--- Hex-encoded SHA-1 digest of a message.
-- @param message data to digest
-- @return hex digest and nil on success; nil and an error message on failure
function _M.sha1_digest(message)
  local hex, err = hash_digest(resty_sha1, message)
  return hex, err
end
--- Hex-encoded MD5 digest of a message.
-- @param message data to digest
-- @return hex digest and nil on success; nil and an error message on failure
function _M.md5_digest(message)
  local hex, err = hash_digest(resty_md5, message)
  return hex, err
end
-- given an Nginx variable i.e $request_uri
-- it returns value of ngx.var[request_uri]

View file

@ -256,6 +256,9 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
Context("when session affinity annotation is present", func() {
It("should use sticky sessions when ingress rules are configured", func() {
err := framework.UpdateDeployment(f.KubeClientSet, f.IngressController.Namespace, "nginx-ingress-controller", 2, nil)
Expect(err).NotTo(HaveOccurred())
cookieName := "STICKYSESSION"
By("Updating affinity annotation on ingress")
@ -275,47 +278,41 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
By("Making a first request")
host := "foo.com"
resp, _, errs := gorequest.New().
resp, body, errs := gorequest.New().
Get(f.IngressController.HTTPURL).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
hostnamePattern := regexp.MustCompile(`Hostname: ([a-zA-Z0-9\-]+)`)
upstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
cookies := (*http.Response)(resp).Cookies()
sessionCookie, err := getCookie(cookieName, cookies)
Expect(err).ToNot(HaveOccurred())
By("Making a second request with the previous session cookie")
resp, _, errs = gorequest.New().
Get(f.IngressController.HTTPURL).
AddCookie(sessionCookie).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
Expect(sessionCookie.Domain).Should(Equal(host))
By("Making a third request with no cookie")
resp, _, errs = gorequest.New().
Get(f.IngressController.HTTPURL).
Set("Host", host).
End()
By("Making many requests with the previous session cookie")
for i := 0; i < 5; i++ {
resp, _, errs = gorequest.New().
Get(f.IngressController.HTTPURL).
AddCookie(sessionCookie).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
newCookies := (*http.Response)(resp).Cookies()
_, err := getCookie(cookieName, newCookies)
By("Omitting cookie in all subsequent requests")
Expect(err).To(HaveOccurred())
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).ToNot(BeEmpty())
By("Checking that upstreams are sticky when session cookie is used")
index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.IngressController.Namespace))
reqLogs := log[index:]
re := regexp.MustCompile(`\d{1,3}(?:\.\d{1,3}){3}(?::\d{1,5})`)
upstreams := re.FindAllString(reqLogs, -1)
Expect(len(upstreams)).Should(BeNumerically("==", 3))
Expect(upstreams[0]).To(Equal(upstreams[1]))
Expect(upstreams[1]).ToNot(Equal(upstreams[2]))
By("By proxying to the same upstream")
newUpstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
Expect(newUpstreamName).Should(Equal(upstreamName))
}
})
It("should NOT use sticky sessions when a default backend and no ingress rules configured", func() {