upstream-hash-by annotation support for dynamic configuration mode
This commit is contained in:
parent
0813b38314
commit
2ce9196ecf
7 changed files with 174 additions and 3 deletions
File diff suppressed because one or more lines are too long
|
@ -6,6 +6,7 @@ local lrucache = require("resty.lrucache")
|
|||
local resty_lock = require("resty.lock")
|
||||
local ewma = require("balancer.ewma")
|
||||
local sticky = require("sticky")
|
||||
local chash = require("balancer.chash")
|
||||
|
||||
-- measured in seconds
|
||||
-- for an Nginx worker to pick up the new list of upstream peers
|
||||
|
@ -62,6 +63,11 @@ local function balance()
|
|||
lb_alg = "round_robin"
|
||||
end
|
||||
|
||||
if backend["upstream-hash-by"] then
|
||||
local endpoint = chash.balance(backend)
|
||||
return endpoint.address, endpoint.port
|
||||
end
|
||||
|
||||
if lb_alg == "ip_hash" then
|
||||
-- TODO(elvinefendi) implement me
|
||||
return backend.endpoints[0].address, backend.endpoints[0].port
|
||||
|
@ -111,6 +117,11 @@ local function sync_backend(backend)
|
|||
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
||||
end
|
||||
|
||||
-- reset chash for this backend
|
||||
if backend["upstream-hash-by"] then
|
||||
chash.reinit(backend)
|
||||
end
|
||||
|
||||
ngx.log(ngx.INFO, "syncronization completed for: " .. backend.name)
|
||||
end
|
||||
|
||||
|
|
48
rootfs/etc/nginx/lua/balancer/chash.lua
Normal file
48
rootfs/etc/nginx/lua/balancer/chash.lua
Normal file
|
@ -0,0 +1,48 @@
|
|||
local resty_chash = require("resty.chash")
local util = require("util")

local string_sub = string.sub

local _M = {}

-- one resty.chash instance per backend, keyed by backend.name;
-- populated/refreshed by _M.reinit, consumed by _M.balance
local instances = {}

-- Resolves the hash key configured via the upstream-hash-by annotation.
-- Given an Nginx variable reference i.e "$request_uri" it returns the
-- runtime value of ngx.var["request_uri"]; any other value is treated
-- as a literal hash key and returned unchanged.
local function resolve_hash_key(ngx_var)
  if string_sub(ngx_var, 1, 1) == "$" then
    local var_name = string_sub(ngx_var, 2)
    return ngx.var[var_name]
  end
  -- not a variable reference: hash on the literal annotation value
  return ngx_var
end

-- Picks an endpoint for the given backend using consistent hashing.
-- Returns { address = ..., port = ... }, or nil when no chash instance
-- exists yet for this backend (reinit not called) or when the configured
-- hash key resolves to nil for the current request.
function _M.balance(backend)
  local instance = instances[backend.name]
  if not instance then
    return nil
  end

  local key = resolve_hash_key(backend["upstream-hash-by"])
  if not key then
    -- the referenced Nginx variable is not set for this request;
    -- calling instance:find(nil) would throw, so bail out instead
    return nil
  end

  local endpoint_string = instance:find(key)

  local address, port = util.split_pair(endpoint_string, ":")
  return { address = address, port = port }
end

-- (Re)builds the consistent-hash ring for the given backend from its
-- current list of endpoints. Safe to call repeatedly on backend sync.
function _M.reinit(backend)
  local nodes = {}
  -- we don't support weighted consistent hashing
  local weight = 1

  for _, endpoint in pairs(backend.endpoints) do
    local endpoint_string = endpoint.address .. ":" .. endpoint.port
    nodes[endpoint_string] = weight
  end

  local instance = instances[backend.name]
  if instance then
    instance:reinit(nodes)
  else
    instances[backend.name] = resty_chash:new(nodes)
  end
end

return _M
|
6
rootfs/etc/nginx/lua/test/balancer/chash_test.lua
Normal file
6
rootfs/etc/nginx/lua/test/balancer/chash_test.lua
Normal file
|
@ -0,0 +1,6 @@
|
|||
-- make the project's Lua modules resolvable when busted runs
-- from the repository root
local pwd = io.popen("pwd"):read("*l")
package.path = pwd .. "/rootfs/etc/nginx/lua/?.lua;" .. package.path

describe("[chash_test]", function()
  -- TODO(elvinefendi) add unit tests
end)
|
|
@ -49,6 +49,7 @@ local function init()
|
|||
mock_sticky = {}
|
||||
mock_ngx_balancer = {}
|
||||
mock_ewma = {}
|
||||
mock_chash = {}
|
||||
mock_backends = dict_generator(default_backends)
|
||||
mock_lrucache = {
|
||||
new = function () return mock_backends end
|
||||
|
@ -78,6 +79,7 @@ local function init()
|
|||
package.loaded["cjson"] = mock_cjson
|
||||
package.loaded["resty.lock"] = mock_lock
|
||||
package.loaded["balancer.ewma"] = mock_ewma
|
||||
package.loaded["balancer.chash"] = mock_chash
|
||||
package.loaded["configuration"] = mock_config
|
||||
package.loaded["sticky"] = mock_sticky
|
||||
balancer = require("balancer")
|
||||
|
|
|
@ -1,6 +1,17 @@
|
|||
local _M = {}
local string_len = string.len

-- Splits "name<separator>value" at the first occurrence of separator.
-- The separator is matched as plain text (Lua pattern magic characters
-- such as "." or "%" are NOT interpreted). Returns name, value; when the
-- separator does not occur, returns the whole input and nil.
function _M.split_pair(pair, separator)
  -- plain find (4th arg true) so the separator is never treated
  -- as a Lua pattern
  local i = pair:find(separator, 1, true)
  if i == nil then
    return pair, nil
  end

  local name = pair:sub(1, i - 1)
  local value = pair:sub(i + 1, -1)
  return name, value
end
|
||||
|
||||
-- this implementation is taken from
|
||||
-- https://web.archive.org/web/20131225070434/http://snippets.luacode.org/snippets/Deep_Comparison_of_Two_Values_3
|
||||
-- and modified for use in this project
|
||||
|
|
|
@ -218,6 +218,51 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
|
|||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should not fail requests when upstream-hash-by annotation is set", func() {
	// Enable consistent hashing on the test ingress, keyed by the
	// request's query string.
	ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.IngressController.Namespace).Get("foo.com", metav1.GetOptions{})
	Expect(err).ToNot(HaveOccurred())

	ingress.ObjectMeta.Annotations["nginx.ingress.kubernetes.io/upstream-hash-by"] = "$query_string"
	_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.IngressController.Namespace).Update(ingress)
	Expect(err).ToNot(HaveOccurred())

	// Scale the backend to two pods so the hash ring has more than one
	// peer to choose from.
	replicas := 2
	err = framework.UpdateDeployment(f.KubeClientSet, f.IngressController.Namespace, "http-svc", replicas, nil)
	Expect(err).NotTo(HaveOccurred())
	// Give the controller time to observe the annotation change and the
	// new endpoints before issuing requests.
	time.Sleep(5 * time.Second)

	// First request establishes which pod the query string hashes to;
	// the echo server reports its pod name as "Hostname: <name>".
	resp, body, errs := gorequest.New().
		Get(fmt.Sprintf("%s?a-unique-request-uri", f.IngressController.HTTPURL)).
		Set("Host", "foo.com").
		End()
	Expect(len(errs)).Should(Equal(0))
	Expect(resp.StatusCode).Should(Equal(http.StatusOK))

	hostnamePattern := regexp.MustCompile(`Hostname: ([a-zA-Z0-9\-]+)`)
	upstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]

	// Consistent hashing must route the same key to the same pod on
	// every subsequent request.
	for i := 0; i < 5; i++ {
		resp, body, errs := gorequest.New().
			Get(fmt.Sprintf("%s?a-unique-request-uri", f.IngressController.HTTPURL)).
			Set("Host", "foo.com").
			End()
		Expect(len(errs)).Should(Equal(0))
		Expect(resp.StatusCode).Should(Equal(http.StatusOK))
		newUpstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
		Expect(newUpstreamName).Should(Equal(upstreamName))
	}

	// A different query string should hash to the other pod.
	// NOTE(review): with only two endpoints a different key can still
	// land on the same pod, so this assertion may be flaky — confirm.
	resp, body, errs = gorequest.New().
		Get(fmt.Sprintf("%s?completely-different-path", f.IngressController.HTTPURL)).
		Set("Host", "foo.com").
		End()
	Expect(len(errs)).Should(Equal(0))
	Expect(resp.StatusCode).Should(Equal(http.StatusOK))
	anotherUpstreamName := hostnamePattern.FindAllStringSubmatch(body, -1)[0][1]
	Expect(anotherUpstreamName).NotTo(Equal(upstreamName))
})
|
||||
|
||||
Context("when session affinity annotation is present", func() {
|
||||
It("should use sticky sessions when ingress rules are configured", func() {
|
||||
cookieName := "STICKYSESSION"
|
||||
|
|
Loading…
Reference in a new issue