-- ingress-nginx-helm/rootfs/etc/nginx/lua/tcp_udp_balancer.lua

local ngx_balancer = require("ngx.balancer")
2019-01-18 03:46:14 +00:00
local cjson = require("cjson.safe")
local util = require("util")
local dns_lookup = require("util.dns").lookup
local configuration = require("tcp_udp_configuration")
local round_robin = require("balancer.round_robin")
-- Period, in seconds, of the recurring backend sync.
-- For an Nginx worker to pick up a new list of upstream peers it takes
-- <the delay until controller POSTed the backend object to the Nginx endpoint>
-- + BACKENDS_SYNC_INTERVAL.
local BACKENDS_SYNC_INTERVAL = 1

-- algorithm used when a backend does not name a supported one
local DEFAULT_LB_ALG = "round_robin"

-- maps the value of a backend's "load-balance" field to the module implementing it
local IMPLEMENTATIONS = {
  round_robin = round_robin,
}

-- module table returned to callers of require()
local _M = {}

-- balancer instances, keyed by backend name
local balancers = {}
-- Selects the load-balancing implementation for a backend.
-- The backend's "load-balance" field names the algorithm. When the field is
-- absent DEFAULT_LB_ALG is used; when it names something that is not in
-- IMPLEMENTATIONS a warning is logged and DEFAULT_LB_ALG is used instead.
-- @param backend table describing the backend
-- @return the matching entry of IMPLEMENTATIONS
local function get_implementation(backend)
  local requested = backend["load-balance"]
  local chosen = IMPLEMENTATIONS[requested or DEFAULT_LB_ALG]
  if chosen then
    return chosen
  end

  ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", requested, DEFAULT_LB_ALG))
  return IMPLEMENTATIONS[DEFAULT_LB_ALG]
end
-- Expands every endpoint of a backend into one endpoint per address returned
-- by dns_lookup for the endpoint's address; each new endpoint keeps the port
-- of the endpoint it was derived from.
-- The work is done on the copy produced by util.deepcopy, and that copy is
-- what gets returned.
-- @param original_backend backend table with an `endpoints` list
-- @return the copied backend with its `endpoints` replaced by the expanded list
local function resolve_external_names(original_backend)
  local resolved = util.deepcopy(original_backend)
  local expanded = {}

  for _, source in ipairs(resolved.endpoints) do
    for _, ip in ipairs(dns_lookup(source.address)) do
      expanded[#expanded + 1] = { address = ip, port = source.port }
    end
  end

  resolved.endpoints = expanded
  return resolved
end
-- Returns a new list of endpoints in which every address that is not a
-- dotted-quad IPv4 literal is wrapped in square brackets.
--
-- The input list and its endpoint tables are left untouched: each endpoint is
-- shallow-copied before its address is rewritten, so calling this function
-- again on the same data cannot stack brackets on the caller's tables.
-- Addresses that are already bracketed, and addresses that are not strings,
-- are passed through unchanged.
-- @param endpoints list of endpoint tables carrying an `address` field
-- @return a new list of (copied) endpoint tables
local function format_ipv6_endpoints(endpoints)
  local formatted_endpoints = {}

  for _, endpoint in ipairs(endpoints) do
    local formatted_endpoint = {}
    for key, value in pairs(endpoint) do
      formatted_endpoint[key] = value
    end

    local address = endpoint.address
    if type(address) == "string" then
      -- "." is a magic character in Lua patterns and has to be escaped to
      -- match a literal dot
      local is_ipv4 = address:match("^%d+%.%d+%.%d+%.%d+$") ~= nil
      local is_bracketed = address:match("^%[.*%]$") ~= nil
      if not is_ipv4 and not is_bracketed then
        formatted_endpoint.address = string.format("[%s]", address)
      end
    end

    formatted_endpoints[#formatted_endpoints + 1] = formatted_endpoint
  end

  return formatted_endpoints
end
-- Creates or updates the balancer instance stored in `balancers` for one backend.
--
-- * A backend without endpoints is logged and skipped; an instance that may
--   already exist for it is left as it is.
-- * When no instance exists yet, or the existing one was built by a different
--   implementation, a new instance is created from the backend as received
--   and the function returns. ExternalName resolution and IPv6 formatting
--   are only applied on the sync path below.
-- * Otherwise the endpoints are normalised and handed to the instance's sync().
-- @param backend backend table (name, endpoints, optional service, load-balance)
local function sync_backend(backend)
  if not backend.endpoints or #backend.endpoints == 0 then
    ngx.log(ngx.INFO, string.format("there is no endpoint for backend %s. Skipping...", backend.name))
    return
  end

  -- ngx.log concatenates its arguments, so the name is passed as its own
  -- argument instead of going through string.format
  ngx.log(ngx.INFO, "backend ", backend.name)

  local implementation = get_implementation(backend)
  local balancer = balancers[backend.name]

  if not balancer then
    balancers[backend.name] = implementation:new(backend)
    return
  end

  -- every implementation is the metatable of its instances (see .new(...) functions)
  -- here we check if `balancer` is the instance of `implementation`
  -- if it is not then we deduce LB algorithm has changed for the backend
  if getmetatable(balancer) ~= implementation then
    ngx.log(
      ngx.INFO,
      string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name)
    )
    balancers[backend.name] = implementation:new(backend)
    return
  end

  local service_type = backend.service and backend.service.spec and backend.service.spec["type"]
  if service_type == "ExternalName" then
    backend = resolve_external_names(backend)
  end

  backend.endpoints = format_ipv6_endpoints(backend.endpoints)

  balancer:sync(backend)
end
-- Rebuilds the `balancers` table from the backends data currently held by the
-- configuration module.
--
-- * No data at all: every balancer is dropped.
-- * Data that cannot be decoded, or that does not decode to a table: an error
--   is logged and the current balancers are kept.
-- * Otherwise each decoded backend is passed to sync_backend, and balancers
--   whose backend no longer appears in the data are removed.
local function sync_backends()
  local backends_data = configuration.get_backends_data()
  if not backends_data then
    balancers = {}
    return
  end

  local new_backends, err = cjson.decode(backends_data)
  if not new_backends then
    ngx.log(ngx.ERR, "could not parse backends data: ", err)
    return
  end

  -- a valid JSON document is not necessarily a list (a bare number, string or
  -- null decodes to a non-table value), and ipairs would raise on it
  if type(new_backends) ~= "table" then
    ngx.log(ngx.ERR, "could not parse backends data: expected a list of backends, got ", type(new_backends))
    return
  end

  local balancers_to_keep = {}
  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
    balancers_to_keep[new_backend.name] = balancers[new_backend.name]
  end

  -- assigning nil to existing keys while traversing with pairs is allowed
  for backend_name, _ in pairs(balancers) do
    if not balancers_to_keep[backend_name] then
      balancers[backend_name] = nil
    end
  end
end
-- Looks up the balancer instance registered under the name held in
-- ngx.var.proxy_upstream_name.
-- @return the balancer instance, or nothing when none is registered
local function get_balancer()
  local current = balancers[ngx.var.proxy_upstream_name]
  if current then
    return current
  end
end
-- Worker start-up hook: performs one sync straight away, then schedules
-- sync_backends to run every BACKENDS_SYNC_INTERVAL seconds. A failure to
-- set up the recurring timer is logged.
function _M.init_worker()
  -- when worker starts, sync backends without delay
  sync_backends()

  local _, timer_err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if not timer_err then
    return
  end

  ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(timer_err)))
end
-- Picks a peer for the current connection and hands it to ngx.balancer.
--
-- Does nothing when no balancer is registered for the current upstream name.
-- When the balancer returns no peer a warning is logged and no peer is set.
-- A failure reported by set_current_peer is logged.
function _M.balance()
  local balancer = get_balancer()
  if not balancer then
    return
  end

  local peer = balancer:balance()
  if not peer then
    -- the name goes in as a separate ngx.log argument so that a missing
    -- name cannot make the logging itself raise
    ngx.log(ngx.WARN, "no peer was returned, balancer: ", balancer.name)
    return
  end

  ngx_balancer.set_more_tries(1)

  local ok, err = ngx_balancer.set_current_peer(peer)
  if not ok then
    -- tostring keeps %s from raising on a non-string peer or error value
    ngx.log(
      ngx.ERR,
      string.format("error while setting current upstream peer %s: %s", tostring(peer), tostring(err))
    )
  end
end
-- Log-phase hook: invokes the current balancer's after_balance method when
-- a balancer is registered for the upstream and it defines that method.
function _M.log()
  local balancer = get_balancer()
  if balancer and balancer.after_balance then
    balancer:after_balance()
  end
end
-- when the global _TEST is set, expose otherwise-private helpers on the module
if _TEST then
  _M.sync_backend = sync_backend
  _M.get_implementation = get_implementation
end

return _M