local ngx_balancer = require("ngx.balancer") local json = require("cjson") local util = require("util") local dns_util = require("util.dns") local configuration = require("configuration") local round_robin = require("balancer.round_robin") local chash = require("balancer.chash") local sticky = require("balancer.sticky") local ewma = require("balancer.ewma") -- measured in seconds -- for an Nginx worker to pick up the new list of upstream peers -- it will take + BACKENDS_SYNC_INTERVAL local BACKENDS_SYNC_INTERVAL = 1 local DEFAULT_LB_ALG = "round_robin" local IMPLEMENTATIONS = { round_robin = round_robin, chash = chash, sticky = sticky, ewma = ewma, } local _M = {} local balancers = {} local function get_implementation(backend) local name = backend["load-balance"] or DEFAULT_LB_ALG if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then name = "sticky" elseif backend["upstream-hash-by"] then name = "chash" end local implementation = IMPLEMENTATIONS[name] if not implementation then ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG)) implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG] end return implementation end local function resolve_external_names(original_backend) local backend = util.deepcopy(original_backend) local endpoints = {} for _, endpoint in ipairs(backend.endpoints) do local ips = dns_util.resolve(endpoint.address) for _, ip in ipairs(ips) do table.insert(endpoints, { address = ip, port = endpoint.port }) end end backend.endpoints = endpoints return backend end local function sync_backend(backend) local implementation = get_implementation(backend) local balancer = balancers[backend.name] if not balancer then balancers[backend.name] = implementation:new(backend) return end -- every implementation is the metatable of its instances (see .new(...) functions) -- here we check if `balancer` is the instance of `implementation` -- if it is not then we deduce LB algorithm has changed for the backend if getmetatable(balancer) ~= implementation then ngx.log(ngx.INFO, string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name)) balancers[backend.name] = implementation:new(backend) return end local service_type = backend.service and backend.service.spec and backend.service.spec["type"] if service_type == "ExternalName" then backend = resolve_external_names(backend) end balancer:sync(backend) end local function sync_backends() local backends_data = configuration.get_backends_data() if not backends_data then balancers = {} return end local ok, new_backends = pcall(json.decode, backends_data) if not ok then ngx.log(ngx.ERR, "could not parse backends data: " .. tostring(new_backends)) return end local balancers_to_keep = {} for _, new_backend in ipairs(new_backends) do sync_backend(new_backend) balancers_to_keep[new_backend.name] = balancers[new_backend.name] end for backend_name, _ in pairs(balancers) do if not balancers_to_keep[backend_name] then balancers[backend_name] = nil end end end local function get_balancer() local backend_name = ngx.var.proxy_upstream_name return balancers[backend_name] end function _M.init_worker() sync_backends() -- when worker starts, sync backends without delay local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends) if err then ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err))) end end function _M.rewrite() local balancer = get_balancer() if not balancer then ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE return ngx.exit(ngx.status) end end function _M.balance() local balancer = get_balancer() if not balancer then return end local host, port = balancer:balance() if not (host and port) then ngx.log(ngx.WARN, string.format("host or port is missing, balancer: %s, host: %s, port: %s", balancer.name, host, port)) return end ngx_balancer.set_more_tries(1) local ok, err = ngx_balancer.set_current_peer(host, port) if not ok then ngx.log(ngx.ERR, "error while setting current upstream peer to " .. tostring(err)) end end function _M.log() local balancer = get_balancer() if not balancer then return end if not balancer.after_balance then return end balancer:after_balance() end if _TEST then _M.get_implementation = get_implementation _M.sync_backend = sync_backend end return _M