ingress-nginx-helm/rootfs/etc/nginx/lua/balancer.lua

316 lines
8.7 KiB
Lua
Raw Normal View History

local ngx_balancer = require("ngx.balancer")
2019-01-18 03:46:14 +00:00
local cjson = require("cjson.safe")
local util = require("util")
local dns_lookup = require("util.dns").lookup
local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local chashsubset = require("balancer.chashsubset")
local sticky_balanced = require("balancer.sticky_balanced")
local sticky_persistent = require("balancer.sticky_persistent")
local ewma = require("balancer.ewma")
local string = string
local ipairs = ipairs
local table = table
local getmetatable = getmetatable
local tostring = tostring
local pairs = pairs
local math = math
local ngx = ngx
-- measured in seconds
2018-03-18 16:31:49 +00:00
-- for an Nginx worker to pick up the new list of upstream peers
-- it will take <the delay until controller POSTed the backend object to the
-- Nginx endpoint> + BACKENDS_SYNC_INTERVAL
local BACKENDS_SYNC_INTERVAL = 1
local DEFAULT_LB_ALG = "round_robin"
local IMPLEMENTATIONS = {
round_robin = round_robin,
chash = chash,
chashsubset = chashsubset,
sticky_balanced = sticky_balanced,
sticky_persistent = sticky_persistent,
ewma = ewma,
}
local _M = {}
local balancers = {}
local function get_implementation(backend)
local name = backend["load-balance"] or DEFAULT_LB_ALG
if backend["sessionAffinityConfig"] and
backend["sessionAffinityConfig"]["name"] == "cookie" then
if backend["sessionAffinityConfig"]["mode"] == 'persistent' then
name = "sticky_persistent"
else
name = "sticky_balanced"
end
elseif backend["upstreamHashByConfig"] and
backend["upstreamHashByConfig"]["upstream-hash-by"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
name = "chashsubset"
else
name = "chash"
end
2018-04-08 20:37:13 +00:00
end
local implementation = IMPLEMENTATIONS[name]
if not implementation then
ngx.log(ngx.WARN, backend["load-balance"], "is not supported, ",
"falling back to ", DEFAULT_LB_ALG)
implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG]
end
return implementation
end
local function resolve_external_names(original_backend)
local backend = util.deepcopy(original_backend)
local endpoints = {}
for _, endpoint in ipairs(backend.endpoints) do
local ips = dns_lookup(endpoint.address)
for _, ip in ipairs(ips) do
table.insert(endpoints, { address = ip, port = endpoint.port })
end
end
backend.endpoints = endpoints
return backend
end
local function format_ipv6_endpoints(endpoints)
local formatted_endpoints = {}
for _, endpoint in ipairs(endpoints) do
local formatted_endpoint = endpoint
if not endpoint.address:match("^%d+.%d+.%d+.%d+$") then
formatted_endpoint.address = string.format("[%s]", endpoint.address)
end
table.insert(formatted_endpoints, formatted_endpoint)
end
return formatted_endpoints
end
local function sync_backend(backend)
if not backend.endpoints or #backend.endpoints == 0 then
balancers[backend.name] = nil
return
end
local implementation = get_implementation(backend)
local balancer = balancers[backend.name]
if not balancer then
balancers[backend.name] = implementation:new(backend)
return
end
-- every implementation is the metatable of its instances (see .new(...) functions)
-- here we check if `balancer` is the instance of `implementation`
-- if it is not then we deduce LB algorithm has changed for the backend
if getmetatable(balancer) ~= implementation then
ngx.log(ngx.INFO,
string.format("LB algorithm changed from %s to %s, resetting the instance",
balancer.name, implementation.name))
balancers[backend.name] = implementation:new(backend)
return
end
local service_type = backend.service and backend.service.spec
and backend.service.spec["type"]
if service_type == "ExternalName" then
backend = resolve_external_names(backend)
end
backend.endpoints = format_ipv6_endpoints(backend.endpoints)
balancer:sync(backend)
end
local function sync_backends()
local backends_data = configuration.get_backends_data()
if not backends_data then
balancers = {}
return
end
2019-01-18 03:46:14 +00:00
local new_backends, err = cjson.decode(backends_data)
if not new_backends then
ngx.log(ngx.ERR, "could not parse backends data: ", err)
return
end
local balancers_to_keep = {}
for _, new_backend in ipairs(new_backends) do
sync_backend(new_backend)
balancers_to_keep[new_backend.name] = balancers[new_backend.name]
end
for backend_name, _ in pairs(balancers) do
if not balancers_to_keep[backend_name] then
balancers[backend_name] = nil
end
end
end
local function route_to_alternative_balancer(balancer)
if not balancer.alternative_backends then
return false
end
-- TODO: support traffic shaping for n > 1 alternative backends
local backend_name = balancer.alternative_backends[1]
if not backend_name then
ngx.log(ngx.ERR, "empty alternative backend")
return false
end
local alternative_balancer = balancers[backend_name]
if not alternative_balancer then
ngx.log(ngx.ERR, "no alternative balancer for backend: ",
tostring(backend_name))
return false
end
local traffic_shaping_policy = alternative_balancer.traffic_shaping_policy
if not traffic_shaping_policy then
ngx.log(ngx.ERR, "traffic shaping policy is not set for balanacer ",
"of backend: ", tostring(backend_name))
return false
end
local target_header = util.replace_special_char(traffic_shaping_policy.header,
"-", "_")
local header = ngx.var["http_" .. target_header]
if header then
if traffic_shaping_policy.headerValue
and #traffic_shaping_policy.headerValue > 0 then
if traffic_shaping_policy.headerValue == header then
return true
end
elseif traffic_shaping_policy.headerPattern
and #traffic_shaping_policy.headerPattern > 0 then
local m, err = ngx.re.match(header, traffic_shaping_policy.headerPattern)
if m then
return true
elseif err then
ngx.log(ngx.ERR, "error when matching canary-by-header-pattern: '",
traffic_shaping_policy.headerPattern, "', error: ", err)
return false
end
elseif header == "always" then
return true
elseif header == "never" then
return false
end
end
local target_cookie = traffic_shaping_policy.cookie
local cookie = ngx.var["cookie_" .. target_cookie]
if cookie then
if cookie == "always" then
return true
elseif cookie == "never" then
return false
end
end
if math.random(100) <= traffic_shaping_policy.weight then
return true
end
return false
end
local function get_balancer()
2019-07-26 13:35:58 +00:00
if ngx.ctx.balancer then
return ngx.ctx.balancer
end
local backend_name = ngx.var.proxy_upstream_name
local balancer = balancers[backend_name]
if not balancer then
return
end
if route_to_alternative_balancer(balancer) then
local alternative_backend_name = balancer.alternative_backends[1]
ngx.var.proxy_alternative_upstream_name = alternative_backend_name
2019-07-26 13:35:58 +00:00
balancer = balancers[alternative_backend_name]
end
2019-07-26 14:18:31 +00:00
2019-07-26 13:35:58 +00:00
ngx.ctx.balancer = balancer
return balancer
end
function _M.init_worker()
-- when worker starts, sync backends without delay
-- we call it in timer because for endpoints that require
-- DNS resolution it needs to use socket which is not avalable in
-- init_worker phase
local ok, err = ngx.timer.at(0, sync_backends)
if not ok then
ngx.log(ngx.ERR, "failed to create timer: ", err)
end
ok, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if not ok then
ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ", err)
end
end
function _M.rewrite()
local balancer = get_balancer()
if not balancer then
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
return ngx.exit(ngx.status)
end
end
function _M.balance()
local balancer = get_balancer()
if not balancer then
return
end
local peer = balancer:balance()
if not peer then
ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
return
2018-04-24 16:10:57 +00:00
end
2018-05-14 21:02:09 +00:00
ngx_balancer.set_more_tries(1)
local ok, err = ngx_balancer.set_current_peer(peer)
if not ok then
ngx.log(ngx.ERR, "error while setting current upstream peer ", peer,
": ", err)
end
end
function _M.log()
local balancer = get_balancer()
if not balancer then
return
end
if not balancer.after_balance then
return
end
balancer:after_balance()
end
setmetatable(_M, {__index = {
get_implementation = get_implementation,
sync_backend = sync_backend,
route_to_alternative_balancer = route_to_alternative_balancer,
get_balancer = get_balancer,
}})
return _M