Merge 6e6d331bd9
into de1a4c463c
This commit is contained in:
commit
208fea9626
2 changed files with 48 additions and 0 deletions
|
@ -4,18 +4,35 @@
|
||||||
-- be rebalanced.
|
-- be rebalanced.
|
||||||
--
|
--
|
||||||
local balancer_sticky = require("balancer.sticky")
|
local balancer_sticky = require("balancer.sticky")
|
||||||
|
local balancer_round_robin = require("balancer.round_robin")
|
||||||
|
local balancer_chash = require("balancer.chash")
|
||||||
|
local balancer_chashsubset = require("balancer.chashsubset")
|
||||||
|
local balancer_ewma = require("balancer.ewma")
|
||||||
local util_get_nodes = require("util").get_nodes
|
local util_get_nodes = require("util").get_nodes
|
||||||
local util_nodemap = require("util.nodemap")
|
local util_nodemap = require("util.nodemap")
|
||||||
local setmetatable = setmetatable
|
local setmetatable = setmetatable
|
||||||
|
|
||||||
|
local function get_secondary_balancer(backend)
|
||||||
|
local name = backend["load-balance"]
|
||||||
|
|
||||||
|
if not name then return nil
|
||||||
|
elseif name == "chash" then return balancer_chash:new(backend)
|
||||||
|
elseif name == "chashsubset" then return balancer_chashsubset:new(backend)
|
||||||
|
elseif name == "round_robin" then return balancer_round_robin:new(backend)
|
||||||
|
elseif name == "ewma" then return balancer_ewma:new(backend)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
local _M = balancer_sticky:new()
|
local _M = balancer_sticky:new()
|
||||||
|
|
||||||
function _M.new(self, backend)
|
function _M.new(self, backend)
|
||||||
local nodes = util_get_nodes(backend.endpoints)
|
local nodes = util_get_nodes(backend.endpoints)
|
||||||
local hash_salt = backend["name"]
|
local hash_salt = backend["name"]
|
||||||
|
local secondary_balancer = get_secondary_balancer(backend)
|
||||||
|
|
||||||
local o = {
|
local o = {
|
||||||
name = "sticky_persistent",
|
name = "sticky_persistent",
|
||||||
|
secondary_balancer = secondary_balancer,
|
||||||
instance = util_nodemap:new(nodes, hash_salt)
|
instance = util_nodemap:new(nodes, hash_salt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,7 +45,29 @@ function _M.new(self, backend)
|
||||||
end
|
end
|
||||||
|
|
||||||
function _M.pick_new_upstream(self, failed_upstreams)
|
function _M.pick_new_upstream(self, failed_upstreams)
|
||||||
|
if self.secondary_balancer then
|
||||||
|
local endpoint = self.secondary_balancer:balance()
|
||||||
|
local key = self.instance:key_from_endpoint(endpoint)
|
||||||
|
if endpoint and key then
|
||||||
|
return endpoint, key
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
return self.instance:random_except(failed_upstreams)
|
return self.instance:random_except(failed_upstreams)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.sync(self, backend)
|
||||||
|
-- sync inherited balancer
|
||||||
|
balancer_sticky.sync(self, backend)
|
||||||
|
|
||||||
|
-- note this may be inefficient
|
||||||
|
-- perhaps better to only update if name changes?
|
||||||
|
self.secondary_balancer = get_secondary_balancer(backend)
|
||||||
|
|
||||||
|
-- sync secondary_balancer as well
|
||||||
|
if self.secondary_balancer then
|
||||||
|
self.secondary_balancer:sync(backend)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
|
@ -122,4 +122,13 @@ function _M.random_except(self, ignore_nodes)
|
||||||
return get_random_node(valid_nodes)
|
return get_random_node(valid_nodes)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
--- find the key of a given endpoint
|
||||||
|
-- @tparam string endpoint The endpoint.
|
||||||
|
-- @treturn string The key for endpoint or nil.
|
||||||
|
function _M.key_from_endpoint(self, endpoint)
|
||||||
|
for k, v in pairs(self.map) do
|
||||||
|
if v==endpoint then return k end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
return _M
|
return _M
|
||||||
|
|
Loading…
Reference in a new issue