diff --git a/rootfs/etc/nginx/lua/balancer.lua b/rootfs/etc/nginx/lua/balancer.lua index 00104c89d..be56f2207 100644 --- a/rootfs/etc/nginx/lua/balancer.lua +++ b/rootfs/etc/nginx/lua/balancer.lua @@ -1,5 +1,4 @@ local ngx_balancer = require("ngx.balancer") -local cjson = require("cjson.safe") local util = require("util") local dns_lookup = require("util.dns").lookup local configuration = require("configuration") @@ -143,21 +142,15 @@ local function sync_backends_with_external_name() end end -local function sync_backends() +local function sync_backends(is_init_call) local raw_backends_last_synced_at = configuration.get_raw_backends_last_synced_at() if raw_backends_last_synced_at <= backends_last_synced_at then return end - local backends_data = configuration.get_backends_data() - if not backends_data then - balancers = {} - return - end - - local new_backends, err = cjson.decode(backends_data) + local new_backends = configuration.get_backends_data(is_init_call) if not new_backends then - ngx.log(ngx.ERR, "could not parse backends data: ", err) + balancers = {} return end @@ -296,7 +289,7 @@ end function _M.init_worker() -- when worker starts, sync non ExternalName backends without delay - sync_backends() + sync_backends(true) -- we call sync_backends_with_external_name in timer because for endpoints that require -- DNS resolution it needs to use socket which is not available in -- init_worker phase diff --git a/rootfs/etc/nginx/lua/configuration.lua b/rootfs/etc/nginx/lua/configuration.lua index 50de662bc..98703129f 100644 --- a/rootfs/etc/nginx/lua/configuration.lua +++ b/rootfs/etc/nginx/lua/configuration.lua @@ -1,4 +1,5 @@ local cjson = require("cjson.safe") +local util = require("util") local io = io local ngx = ngx @@ -6,6 +7,7 @@ local tostring = tostring local string = string local table = table local pairs = pairs +local ipairs = ipairs -- this is the Lua representation of Configuration struct in internal/ingress/types.go local configuration_data = ngx.shared.configuration_data @@ -14,13 +16,10 @@ local certificate_servers = ngx.shared.certificate_servers local ocsp_response_cache = ngx.shared.ocsp_response_cache local EMPTY_UID = "-1" +local BACKEND_BUCKET_SIZE = 10 local _M = {} -function _M.get_backends_data() - return configuration_data:get("backends") -end - function _M.get_general_data() return configuration_data:get("general") end @@ -185,11 +184,130 @@ local function handle_certs() end end +local function get_backends_by_name(name) + local backends_data = configuration_data:get(name) + if not backends_data then + ngx.log( ngx.ERR, string.format("could not get backends data by %s", name)) + return + end + + local backends, err = cjson.decode(backends_data) + if not backends then + ngx.log(ngx.ERR, "could not parse backends data: ", err) + return + end + return backends +end + +local function get_backend_bucket_names() + local names = configuration_data:get("backend_bucket_names") + if not names then + return + end + local backend_bucket_names, err = cjson.decode(names) + if not backend_bucket_names then + ngx.log(ngx.ERR, "could not parse backend bucket names data: ", err) + return + end + + return backend_bucket_names +end + +local function get_backends(is_init_call) + ngx.log(ngx.DEBUG, "start get bucket name") + local bucket_names = get_backend_bucket_names() + if not bucket_names then + ngx.log(ngx.WARN, "bucket name not found") + return + end + local all_backends = {} + for _, name in ipairs(bucket_names) do + local backends = get_backends_by_name(name) + for _, v in ipairs(backends) do + table.insert(all_backends, v) + end + + if not is_init_call then + ngx.sleep(0) + end + end + + return all_backends +end + +local function save_backends(backends) + local backend_buckets = {} + local backend_bucket_names = {} + local backend_bucket_size = BACKEND_BUCKET_SIZE + local tmp_backends = {} + for _, v in ipairs(backends) do + if table.getn(tmp_backends) >= BACKEND_BUCKET_SIZE then + local bucket_key = string.format("bucket_%d", backend_bucket_size) + local batch_backends = util.deepcopy(tmp_backends) + table.insert(backend_bucket_names, bucket_key) + table.insert(backend_buckets, batch_backends) + tmp_backends = {} + backend_bucket_size = backend_bucket_size + BACKEND_BUCKET_SIZE + end + table.insert(tmp_backends, v) + end + + if table.getn(tmp_backends) > 0 then + local bucket_key = string.format("bucket_%d", backend_bucket_size) + local batch_backends = util.deepcopy(tmp_backends) + table.insert(backend_bucket_names, bucket_key) + table.insert(backend_buckets, batch_backends) + end + + for i, bucket in ipairs(backend_buckets) do + local new_backends, encode_err = cjson.encode(bucket) + if not new_backends then + ngx.log(ngx.ERR, "could not parse backends data: ", encode_err) + ngx.status = ngx.HTTP_BAD_REQUEST + return + end + + local backend_bucket_name = backend_bucket_names[i] + local success, set_err = configuration_data:set(backend_bucket_name, new_backends) + if not success then + ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(set_err)) + ngx.status = ngx.HTTP_BAD_REQUEST + return + end + end + + local new_backend_names, encode_err = cjson.encode(backend_bucket_names) + if not new_backend_names then + ngx.log(ngx.ERR, "could not parse backends_names data: ", encode_err) + ngx.status = ngx.HTTP_BAD_REQUEST + return + end + + local success, set_err = configuration_data:set("backend_bucket_names", new_backend_names) + if not success then + ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(set_err)) + ngx.status = ngx.HTTP_BAD_REQUEST + return + end +end local function handle_backends() if ngx.var.request_method == "GET" then + local backends = get_backends(false) + if not backends then + ngx.log(ngx.ERR, "could not get backends") + ngx.status = ngx.HTTP_BAD_REQUEST + return + end + + local backends_raw, err = cjson.encode(backends) + if err then + ngx.log(ngx.ERR, "could not decode backends data: ", err) + ngx.status = ngx.HTTP_BAD_REQUEST + return + end ngx.status = ngx.HTTP_OK - ngx.print(_M.get_backends_data()) + ngx.print(backends_raw) return end @@ -200,19 +318,26 @@ local function handle_backends() return end - local success, err = configuration_data:set("backends", backends) - if not success then - ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err)) + local new_backends, decode_err = cjson.decode(backends) + if not new_backends then + ngx.log(ngx.ERR, "could not parse backends data: ", decode_err) ngx.status = ngx.HTTP_BAD_REQUEST return end + save_backends(new_backends) ngx.update_time() local raw_backends_last_synced_at = ngx.time() - success, err = configuration_data:set("raw_backends_last_synced_at", raw_backends_last_synced_at) + local success, set_err = configuration_data:set( + "raw_backends_last_synced_at", + raw_backends_last_synced_at + ) if not success then - ngx.log(ngx.ERR, "dynamic-configuration: error updating when backends sync, " .. - "new upstream peers waiting for force syncing: " .. tostring(err)) + ngx.log( + ngx.ERR, + "dynamic-configuration: error updating when backends sync, " .. + "new upstream peers waiting for force syncing: " .. tostring(set_err) + ) ngx.status = ngx.HTTP_BAD_REQUEST return end @@ -251,6 +376,10 @@ function _M.call() ngx.print("Not found!") end +function _M.get_backends_data(is_init_call) + return get_backends(is_init_call) +end + setmetatable(_M, {__index = { handle_servers = handle_servers }}) return _M diff --git a/rootfs/etc/nginx/lua/test/balancer_test.lua b/rootfs/etc/nginx/lua/test/balancer_test.lua index 2d42ad330..46ff8d38a 100644 --- a/rootfs/etc/nginx/lua/test/balancer_test.lua +++ b/rootfs/etc/nginx/lua/test/balancer_test.lua @@ -524,7 +524,8 @@ describe("Balancer", function() } mock_ngx({ var = { proxy_upstream_name = "access-router-production-web-80" }, ctx = { } }, function() - ngx.shared.configuration_data:set("backends", cjson.encode(backends)) + ngx.shared.configuration_data:set("backend_bucket_names", cjson.encode({"bucket_1"})) + ngx.shared.configuration_data:set("bucket_1", cjson.encode(backends)) end) balancer.init_worker() diff --git a/rootfs/etc/nginx/lua/test/configuration_test.lua b/rootfs/etc/nginx/lua/test/configuration_test.lua index 64a048929..6de17f0a7 100644 --- a/rootfs/etc/nginx/lua/test/configuration_test.lua +++ b/rootfs/etc/nginx/lua/test/configuration_test.lua @@ -30,6 +30,56 @@ local function get_backends() } end + +-- from +-- https://stackoverflow.com/questions/25922437/how-can-i-deep-compare-2-lua-tables-which-may-or-may-not-have-tables-as-keys +local function table_eq(table1, table2) + local avoid_loops = {} + local function recurse(t1, t2) + -- compare value types + if type(t1) ~= type(t2) then return false end + -- Base case: compare simple values + if type(t1) ~= "table" then return t1 == t2 end + -- Now, on to tables. + -- First, let's avoid looping forever. + if avoid_loops[t1] then return avoid_loops[t1] == t2 end + avoid_loops[t1] = t2 + -- Copy keys from t2 + local t2keys = {} + local t2tablekeys = {} + for k, _ in pairs(t2) do + if type(k) == "table" then table.insert(t2tablekeys, k) end + t2keys[k] = true + end + -- Let's iterate keys from t1 + for k1, v1 in pairs(t1) do + local v2 = t2[k1] + if type(k1) == "table" then + -- if key is a table, we need to find an equivalent one. + local ok = false + for i, tk in ipairs(t2tablekeys) do + if table_eq(k1, tk) and recurse(v1, t2[tk]) then + table.remove(t2tablekeys, i) + t2keys[tk] = nil + ok = true + break + end + end + if not ok then return false end + else + -- t1 has a key which t2 doesn't have, fail. + if v2 == nil then return false end + t2keys[k1] = nil + if not recurse(v1, v2) then return false end + end + end + -- if t2 has a key which t1 doesn't have, fail. + if next(t2keys) then return false end + return true + end + return recurse(table1, table2) +end + local function get_mocked_ngx_env() local _ngx = { status = ngx.HTTP_OK, @@ -80,11 +130,25 @@ describe("Configuration", function() it("returns the current configured backends on the response body", function() -- Encoding backends since comparing tables fail due to reference comparison + local backend_names = {} + table.insert(backend_names, "bucket_1") local encoded_backends = cjson.encode(get_backends()) - ngx.shared.configuration_data:set("backends", encoded_backends) + ngx.shared.configuration_data:set("backend_bucket_names", cjson.encode(backend_names)) + ngx.shared.configuration_data:set("bucket_1", encoded_backends) local s = spy.on(ngx, "print") assert.has_no.errors(configuration.call) - assert.spy(s).was_called_with(encoded_backends) + + local resp_backends = nil + if #s.calls > 0 then + resp_backends = s.calls[#s.calls]["vals"] + end + + local resp_backends_tab + for _, value in ipairs(resp_backends) do + resp_backends_tab = cjson.decode(value) + end + + assert.equal(table_eq(resp_backends_tab, get_backends()), true) end) it("returns a status of 200", function() @@ -102,7 +166,7 @@ describe("Configuration", function() it("stores the posted backends on the shared dictionary", function() -- Encoding backends since comparing tables fail due to reference comparison assert.has_no.errors(configuration.call) - assert.equal(ngx.shared.configuration_data:get("backends"), cjson.encode(get_backends())) + assert.equal(ngx.shared.configuration_data:get("bucket_1"), cjson.encode(get_backends())) end) context("Failed to read request body", function() diff --git a/test/e2e/annotations/serviceupstream.go b/test/e2e/annotations/serviceupstream.go index 94ad6d346..6d1647941 100644 --- a/test/e2e/annotations/serviceupstream.go +++ b/test/e2e/annotations/serviceupstream.go @@ -62,7 +62,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() { curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort) output, err := f.ExecIngressPod(curlCmd) assert.Nil(ginkgo.GinkgoT(), err) - assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP)) + assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP)) }) }) @@ -91,7 +91,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() { curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort) output, err := f.ExecIngressPod(curlCmd) assert.Nil(ginkgo.GinkgoT(), err) - assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP)) + assert.Contains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP)) }) }) @@ -122,7 +122,7 @@ var _ = framework.DescribeAnnotation("service-upstream", func() { curlCmd := fmt.Sprintf("curl --fail --silent http://localhost:%v/configuration/backends", nginx.StatusPort) output, err := f.ExecIngressPod(curlCmd) assert.Nil(ginkgo.GinkgoT(), err) - assert.NotContains(ginkgo.GinkgoT(), output, fmt.Sprintf(`{"address":"%s"`, s.Spec.ClusterIP)) + assert.NotContains(ginkgo.GinkgoT(), output, fmt.Sprintf(`"address":"%s"`, s.Spec.ClusterIP)) }) }) })