perf: backend split to buckets
This commit is contained in:
parent
54389680a7
commit
c113bb75f1
5 changed files with 216 additions and 29 deletions
|
@ -1,5 +1,4 @@
|
||||||
local ngx_balancer = require("ngx.balancer")
|
local ngx_balancer = require("ngx.balancer")
|
||||||
local cjson = require("cjson.safe")
|
|
||||||
local util = require("util")
|
local util = require("util")
|
||||||
local dns_lookup = require("util.dns").lookup
|
local dns_lookup = require("util.dns").lookup
|
||||||
local configuration = require("configuration")
|
local configuration = require("configuration")
|
||||||
|
@ -143,21 +142,15 @@ local function sync_backends_with_external_name()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
local function sync_backends()
|
local function sync_backends(is_init_call)
|
||||||
local raw_backends_last_synced_at = configuration.get_raw_backends_last_synced_at()
|
local raw_backends_last_synced_at = configuration.get_raw_backends_last_synced_at()
|
||||||
if raw_backends_last_synced_at <= backends_last_synced_at then
|
if raw_backends_last_synced_at <= backends_last_synced_at then
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
local backends_data = configuration.get_backends_data()
|
local new_backends = configuration.get_backends_data(is_init_call)
|
||||||
if not backends_data then
|
|
||||||
balancers = {}
|
|
||||||
return
|
|
||||||
end
|
|
||||||
|
|
||||||
local new_backends, err = cjson.decode(backends_data)
|
|
||||||
if not new_backends then
|
if not new_backends then
|
||||||
ngx.log(ngx.ERR, "could not parse backends data: ", err)
|
balancers = {}
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -296,7 +289,7 @@ end
|
||||||
|
|
||||||
function _M.init_worker()
|
function _M.init_worker()
|
||||||
-- when worker starts, sync non ExternalName backends without delay
|
-- when worker starts, sync non ExternalName backends without delay
|
||||||
sync_backends()
|
sync_backends(true)
|
||||||
-- we call sync_backends_with_external_name in timer because for endpoints that require
|
-- we call sync_backends_with_external_name in timer because for endpoints that require
|
||||||
-- DNS resolution it needs to use socket which is not available in
|
-- DNS resolution it needs to use socket which is not available in
|
||||||
-- init_worker phase
|
-- init_worker phase
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
local cjson = require("cjson.safe")
|
local cjson = require("cjson.safe")
|
||||||
|
local util = require("util")
|
||||||
|
|
||||||
local io = io
|
local io = io
|
||||||
local ngx = ngx
|
local ngx = ngx
|
||||||
|
@ -6,6 +7,7 @@ local tostring = tostring
|
||||||
local string = string
|
local string = string
|
||||||
local table = table
|
local table = table
|
||||||
local pairs = pairs
|
local pairs = pairs
|
||||||
|
local ipairs = ipairs
|
||||||
|
|
||||||
-- this is the Lua representation of Configuration struct in internal/ingress/types.go
|
-- this is the Lua representation of Configuration struct in internal/ingress/types.go
|
||||||
local configuration_data = ngx.shared.configuration_data
|
local configuration_data = ngx.shared.configuration_data
|
||||||
|
@ -14,13 +16,10 @@ local certificate_servers = ngx.shared.certificate_servers
|
||||||
local ocsp_response_cache = ngx.shared.ocsp_response_cache
|
local ocsp_response_cache = ngx.shared.ocsp_response_cache
|
||||||
|
|
||||||
local EMPTY_UID = "-1"
|
local EMPTY_UID = "-1"
|
||||||
|
local BACKEND_BUCKET_SIZE = 10
|
||||||
|
|
||||||
local _M = {}
|
local _M = {}
|
||||||
|
|
||||||
function _M.get_backends_data()
|
|
||||||
return configuration_data:get("backends")
|
|
||||||
end
|
|
||||||
|
|
||||||
function _M.get_general_data()
|
function _M.get_general_data()
|
||||||
return configuration_data:get("general")
|
return configuration_data:get("general")
|
||||||
end
|
end
|
||||||
|
@ -185,11 +184,130 @@ local function handle_certs()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
local function get_backends_by_name(name)
|
||||||
|
local backends_data = configuration_data:get(name)
|
||||||
|
if not backends_data then
|
||||||
|
ngx.log( ngx.ERR, string.format("could not get backends data by %s", name))
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
local backends, err = cjson.decode(backends_data)
|
||||||
|
if not backends then
|
||||||
|
ngx.log(ngx.ERR, "could not parse backends data: ", err)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
return backends
|
||||||
|
end
|
||||||
|
|
||||||
|
local function get_backend_bucket_names()
|
||||||
|
local names = configuration_data:get("backend_bucket_names")
|
||||||
|
if not names then
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local backend_bucket_names, err = cjson.decode(names)
|
||||||
|
if not backend_bucket_names then
|
||||||
|
ngx.log(ngx.ERR, "could not parse backend bucket names data: ", err)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
return backend_bucket_names
|
||||||
|
end
|
||||||
|
|
||||||
|
local function get_backends(is_init_call)
|
||||||
|
ngx.log(ngx.DEBUG, "start get bucket name")
|
||||||
|
local bucket_names = get_backend_bucket_names()
|
||||||
|
if not bucket_names then
|
||||||
|
ngx.log(ngx.WARN, "bucket name not found")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local all_backends = {}
|
||||||
|
for _, name in ipairs(bucket_names) do
|
||||||
|
local backends = get_backends_by_name(name)
|
||||||
|
for _, v in ipairs(backends) do
|
||||||
|
table.insert(all_backends, v)
|
||||||
|
end
|
||||||
|
|
||||||
|
if not is_init_call then
|
||||||
|
ngx.sleep(0)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
return all_backends
|
||||||
|
end
|
||||||
|
|
||||||
|
local function save_backends(backends)
|
||||||
|
local backend_buckets = {}
|
||||||
|
local backend_bucket_names = {}
|
||||||
|
local backend_bucket_size = BACKEND_BUCKET_SIZE
|
||||||
|
local tmp_backends = {}
|
||||||
|
for _, v in ipairs(backends) do
|
||||||
|
if table.getn(tmp_backends) >= BACKEND_BUCKET_SIZE then
|
||||||
|
local bucket_key = string.format("bucket_%d", backend_bucket_size)
|
||||||
|
local batch_backends = util.deepcopy(tmp_backends)
|
||||||
|
table.insert(backend_bucket_names, bucket_key)
|
||||||
|
table.insert(backend_buckets, batch_backends)
|
||||||
|
tmp_backends = {}
|
||||||
|
backend_bucket_size = backend_bucket_size + BACKEND_BUCKET_SIZE
|
||||||
|
end
|
||||||
|
table.insert(tmp_backends, v)
|
||||||
|
end
|
||||||
|
|
||||||
|
if table.getn(tmp_backends) > 0 then
|
||||||
|
local bucket_key = string.format("bucket_%d", backend_bucket_size)
|
||||||
|
local batch_backends = util.deepcopy(tmp_backends)
|
||||||
|
table.insert(backend_bucket_names, bucket_key)
|
||||||
|
table.insert(backend_buckets, batch_backends)
|
||||||
|
end
|
||||||
|
|
||||||
|
for i, bucket in ipairs(backend_buckets) do
|
||||||
|
local new_backends, encode_err = cjson.encode(bucket)
|
||||||
|
if not new_backends then
|
||||||
|
ngx.log(ngx.ERR, "could not parse backends data: ", encode_err)
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
local backend_bucket_name = backend_bucket_names[i]
|
||||||
|
local success, set_err = configuration_data:set(backend_bucket_name, new_backends)
|
||||||
|
if not success then
|
||||||
|
ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(set_err))
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
local new_backend_names, encode_err = cjson.encode(backend_bucket_names)
|
||||||
|
if not new_backend_names then
|
||||||
|
ngx.log(ngx.ERR, "could not parse backends_names data: ", encode_err)
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
local success, set_err = configuration_data:set("backend_bucket_names", new_backend_names)
|
||||||
|
if not success then
|
||||||
|
ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(set_err))
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
local function handle_backends()
|
local function handle_backends()
|
||||||
if ngx.var.request_method == "GET" then
|
if ngx.var.request_method == "GET" then
|
||||||
|
local backends = get_backends(false)
|
||||||
|
if not backends then
|
||||||
|
ngx.log(ngx.ERR, "could not get backends")
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
local backends_raw, err = cjson.encode(backends)
|
||||||
|
if err then
|
||||||
|
ngx.log(ngx.ERR, "could not decode backends data: ", err)
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
return
|
||||||
|
end
|
||||||
ngx.status = ngx.HTTP_OK
|
ngx.status = ngx.HTTP_OK
|
||||||
ngx.print(_M.get_backends_data())
|
ngx.print(backends_raw)
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -200,19 +318,26 @@ local function handle_backends()
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
local success, err = configuration_data:set("backends", backends)
|
local new_backends, decode_err = cjson.decode(backends)
|
||||||
if not success then
|
if not new_backends then
|
||||||
ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
|
ngx.log(ngx.ERR, "could not parse backends data: ", decode_err)
|
||||||
ngx.status = ngx.HTTP_BAD_REQUEST
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
save_backends(new_backends)
|
||||||
|
|
||||||
ngx.update_time()
|
ngx.update_time()
|
||||||
local raw_backends_last_synced_at = ngx.time()
|
local raw_backends_last_synced_at = ngx.time()
|
||||||
success, err = configuration_data:set("raw_backends_last_synced_at", raw_backends_last_synced_at)
|
local success, set_err = configuration_data:set(
|
||||||
|
"raw_backends_last_synced_at",
|
||||||
|
raw_backends_last_synced_at
|
||||||
|
)
|
||||||
if not success then
|
if not success then
|
||||||
ngx.log(ngx.ERR, "dynamic-configuration: error updating when backends sync, " ..
|
ngx.log(
|
||||||
"new upstream peers waiting for force syncing: " .. tostring(err))
|
ngx.ERR,
|
||||||
|
"dynamic-configuration: error updating when backends sync, " ..
|
||||||
|
"new upstream peers waiting for force syncing: " .. tostring(set_err)
|
||||||
|
)
|
||||||
ngx.status = ngx.HTTP_BAD_REQUEST
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
@ -251,6 +376,10 @@ function _M.call()
|
||||||
ngx.print("Not found!")
|
ngx.print("Not found!")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.get_backends_data(is_init_call)
|
||||||
|
return get_backends(is_init_call)
|
||||||
|
end
|
||||||
|
|
||||||
setmetatable(_M, {__index = { handle_servers = handle_servers }})
|
setmetatable(_M, {__index = { handle_servers = handle_servers }})
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -524,7 +524,8 @@ describe("Balancer", function()
|
||||||
}
|
}
|
||||||
|
|
||||||
mock_ngx({ var = { proxy_upstream_name = "access-router-production-web-80" }, ctx = { } }, function()
|
mock_ngx({ var = { proxy_upstream_name = "access-router-production-web-80" }, ctx = { } }, function()
|
||||||
ngx.shared.configuration_data:set("backends", cjson.encode(backends))
|
ngx.shared.configuration_data:set("backend_bucket_names", cjson.encode({"bucket_1"}))
|
||||||
|
ngx.shared.configuration_data:set("bucket_1", cjson.encode(backends))
|
||||||
end)
|
end)
|
||||||
|
|
||||||
balancer.init_worker()
|
balancer.init_worker()
|
||||||
|
|
|
@ -30,6 +30,56 @@ local function get_backends()
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
-- from
|
||||||
|
-- https://stackoverflow.com/questions/25922437/how-can-i-deep-compare-2-lua-tables-which-may-or-may-not-have-tables-as-keys
|
||||||
|
local function table_eq(table1, table2)
|
||||||
|
local avoid_loops = {}
|
||||||
|
local function recurse(t1, t2)
|
||||||
|
-- compare value types
|
||||||
|
if type(t1) ~= type(t2) then return false end
|
||||||
|
-- Base case: compare simple values
|
||||||
|
if type(t1) ~= "table" then return t1 == t2 end
|
||||||
|
-- Now, on to tables.
|
||||||
|
-- First, let's avoid looping forever.
|
||||||
|
if avoid_loops[t1] then return avoid_loops[t1] == t2 end
|
||||||
|
avoid_loops[t1] = t2
|
||||||
|
-- Copy keys from t2
|
||||||
|
local t2keys = {}
|
||||||
|
local t2tablekeys = {}
|
||||||
|
for k, _ in pairs(t2) do
|
||||||
|
if type(k) == "table" then table.insert(t2tablekeys, k) end
|
||||||
|
t2keys[k] = true
|
||||||
|
end
|
||||||
|
-- Let's iterate keys from t1
|
||||||
|
for k1, v1 in pairs(t1) do
|
||||||
|
local v2 = t2[k1]
|
||||||
|
if type(k1) == "table" then
|
||||||
|
-- if key is a table, we need to find an equivalent one.
|
||||||
|
local ok = false
|
||||||
|
for i, tk in ipairs(t2tablekeys) do
|
||||||
|
if table_eq(k1, tk) and recurse(v1, t2[tk]) then
|
||||||
|
table.remove(t2tablekeys, i)
|
||||||
|
t2keys[tk] = nil
|
||||||
|
ok = true
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if not ok then return false end
|
||||||
|
else
|
||||||
|
-- t1 has a key which t2 doesn't have, fail.
|
||||||
|
if v2 == nil then return false end
|
||||||
|
t2keys[k1] = nil
|
||||||
|
if not recurse(v1, v2) then return false end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
-- if t2 has a key which t1 doesn't have, fail.
|
||||||
|
if next(t2keys) then return false end
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
return recurse(table1, table2)
|
||||||
|
end
|
||||||
|
|
||||||
local function get_mocked_ngx_env()
|
local function get_mocked_ngx_env()
|
||||||
local _ngx = {
|
local _ngx = {
|
||||||
status = ngx.HTTP_OK,
|
status = ngx.HTTP_OK,
|
||||||
|
@ -80,11 +130,25 @@ describe("Configuration", function()
|
||||||
|
|
||||||
it("returns the current configured backends on the response body", function()
|
it("returns the current configured backends on the response body", function()
|
||||||
-- Encoding backends since comparing tables fail due to reference comparison
|
-- Encoding backends since comparing tables fail due to reference comparison
|
||||||
|
local backend_names = {}
|
||||||
|
table.insert(backend_names, "bucket_1")
|
||||||
local encoded_backends = cjson.encode(get_backends())
|
local encoded_backends = cjson.encode(get_backends())
|
||||||
ngx.shared.configuration_data:set("backends", encoded_backends)
|
ngx.shared.configuration_data:set("backend_bucket_names", cjson.encode(backend_names))
|
||||||
|
ngx.shared.configuration_data:set("bucket_1", encoded_backends)
|
||||||
local s = spy.on(ngx, "print")
|
local s = spy.on(ngx, "print")
|
||||||
assert.has_no.errors(configuration.call)
|
assert.has_no.errors(configuration.call)
|
||||||
assert.spy(s).was_called_with(encoded_backends)
|
|
||||||
|
local resp_backends = nil
|
||||||
|
if #s.calls > 0 then
|
||||||
|
resp_backends = s.calls[#s.calls]["vals"]
|
||||||
|
end
|
||||||
|
|
||||||
|
local resp_backends_tab
|
||||||
|
for _, value in ipairs(resp_backends) do
|
||||||
|
resp_backends_tab = cjson.decode(value)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert.equal(table_eq(resp_backends_tab, get_backends()), true)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
it("returns a status of 200", function()
|
it("returns a status of 200", function()
|
||||||
|
@ -102,7 +166,7 @@ describe("Configuration", function()
|
||||||
it("stores the posted backends on the shared dictionary", function()
|
it("stores the posted backends on the shared dictionary", function()
|
||||||
-- Encoding backends since comparing tables fail due to reference comparison
|
-- Encoding backends since comparing tables fail due to reference comparison
|
||||||
assert.has_no.errors(configuration.call)
|
assert.has_no.errors(configuration.call)
|
||||||
assert.equal(ngx.shared.configuration_data:get("backends"), cjson.encode(get_backends()))
|
assert.equal(ngx.shared.configuration_data:get("bucket_1"), cjson.encode(get_backends()))
|
||||||
end)
|
end)
|
||||||
|
|
||||||
context("Failed to read request body", function()
|
context("Failed to read request body", function()
|
||||||
|
|
|
@ -62,7 +62,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() {
|
||||||
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
||||||
output, err := f.ExecIngressPod(curlCmd)
|
output, err := f.ExecIngressPod(curlCmd)
|
||||||
assert.Nil(ginkgo.GinkgoT(), err)
|
assert.Nil(ginkgo.GinkgoT(), err)
|
||||||
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
|
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() {
|
||||||
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
||||||
output, err := f.ExecIngressPod(curlCmd)
|
output, err := f.ExecIngressPod(curlCmd)
|
||||||
assert.Nil(ginkgo.GinkgoT(), err)
|
assert.Nil(ginkgo.GinkgoT(), err)
|
||||||
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
|
assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() {
|
||||||
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort)
|
||||||
output, err := f.ExecIngressPod(curlCmd)
|
output, err := f.ExecIngressPod(curlCmd)
|
||||||
assert.Nil(ginkgo.GinkgoT(), err)
|
assert.Nil(ginkgo.GinkgoT(), err)
|
||||||
assert.NotContains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP))
|
assert.NotContains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue