Kir Shatrov 2023-07-20 23:57:14 -07:00 committed by GitHub
commit 16334619a7
11 changed files with 963 additions and 8 deletions

@@ -29,9 +29,10 @@ type upstreamhashby struct {
// Config contains the Consistent hash configuration to be used in the Ingress
type Config struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
}
// NewParser creates a new UpstreamHashBy annotation parser
@@ -44,10 +45,11 @@ func (a upstreamhashby) Parse(ing *networking.Ingress) (interface{}, error) {
upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
upstreamHashbySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
upstreamHashByBalanceFactor, _ := parser.GetFloatAnnotation("upstream-hash-by-balance-factor", ing)
if upstreamHashbySubsetSize == 0 {
upstreamHashbySubsetSize = 3
}
return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize}, nil
return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize, upstreamHashByBalanceFactor}, nil
}
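
For orientation, here is a minimal hypothetical sketch (not part of this diff) of the Config the Parse method above returns when only the hash key and the new balance-factor annotation are set; the subset-size default of 3 comes from the zero-value branch above, and the struct definition is copied from the first hunk.

package main

import "fmt"

// Config mirrors the annotation package's struct shown above.
type Config struct {
	UpstreamHashBy              string
	UpstreamHashBySubset        bool
	UpstreamHashBySubsetSize    int
	UpstreamHashByBalanceFactor float32
}

func main() {
	// Annotations on the Ingress (nginx.ingress.kubernetes.io/ prefix omitted):
	//   upstream-hash-by:                "$request_uri"
	//   upstream-hash-by-balance-factor: "1.5"
	// upstream-hash-by-subset-size is unset, so Parse falls back to 3.
	cfg := Config{
		UpstreamHashBy:              "$request_uri",
		UpstreamHashBySubsetSize:    3,
		UpstreamHashByBalanceFactor: 1.5,
	}
	fmt.Printf("%+v\n", cfg)
}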

@@ -1069,6 +1069,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
upstreams[name].UpstreamHashBy.UpstreamHashByBalanceFactor = anns.UpstreamHashBy.UpstreamHashByBalanceFactor
upstreams[name].LoadBalancing = anns.LoadBalancing
if upstreams[name].LoadBalancing == "" {

@@ -132,6 +132,18 @@ type Backend struct {
// Default 3
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`
// Bounds the load of each upstream host to a multiple of the average load across the cluster.
// For example, with a value of 1.5 no upstream host will receive more than 1.5 times
// the average load of all the hosts in the cluster.
//
// This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350
// This is an O(N) algorithm, unlike other load balancers.
// Using a lower hash_balance_factor results in more hosts being probed,
// so use a higher value if you require better performance.
//
// Defaults to 2 (meaning a host may receive at most 2x the average load)
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor"`
// Lets us choose a load-balancing algorithm per Ingress
LoadBalancing string `json:"load-balance"`
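
To make the bound concrete, here is a short illustrative Go sketch (not part of the diff) of the per-host cap implied by UpstreamHashByBalanceFactor; it mirrors the allowed/current check in endpoint_eligible() from the Lua balancer added later in this commit.

package main

import (
	"fmt"
	"math"
)

// allowedRequests is the cap on in-flight requests for a single endpoint:
// ceil((totalRequests + 1) * balanceFactor / totalEndpoints).
func allowedRequests(totalRequests int, balanceFactor float64, totalEndpoints int) int {
	return int(math.Ceil(float64(totalRequests+1) * balanceFactor / float64(totalEndpoints)))
}

func main() {
	// With 100 requests in flight, 10 endpoints and a balance factor of 1.5,
	// no single endpoint may hold more than ceil(101 * 1.5 / 10) = 16 of them.
	fmt.Println(allowedRequests(100, 1.5, 10)) // 16
}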

@@ -168,9 +168,10 @@ type CookieSessionAffinity struct {
// UpstreamHashByConfig described setting from the upstream-hash-by* annotations.
type UpstreamHashByConfig struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
}
// Endpoint describes a kubernetes endpoint in a backend

@@ -6,6 +6,7 @@ local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local chashsubset = require("balancer.chashsubset")
local chashboundedloads = require("balancer.chashboundedloads")
local sticky_balanced = require("balancer.sticky_balanced")
local sticky_persistent = require("balancer.sticky_persistent")
local ewma = require("balancer.ewma")
@@ -29,6 +30,7 @@ local IMPLEMENTATIONS = {
round_robin = round_robin,
chash = chash,
chashsubset = chashsubset,
chashboundedloads = chashboundedloads,
sticky_balanced = sticky_balanced,
sticky_persistent = sticky_persistent,
ewma = ewma,
@@ -55,7 +57,9 @@ local function get_implementation(backend)
elseif backend["upstreamHashByConfig"] and
backend["upstreamHashByConfig"]["upstream-hash-by"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"] then
name = "chashboundedloads"
elseif backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
name = "chashsubset"
else
name = "chash"
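
Note that when both the subset and the balance-factor settings are present, the balance factor wins and chashboundedloads is selected. Below is a hypothetical Go rendering of that precedence (the struct fields are illustrative stand-ins for the backend["upstreamHashByConfig"] keys; the real function has additional branches for session affinity and the load-balance annotation).

package main

import "fmt"

// upstreamHashByConfig is an illustrative stand-in for backend["upstreamHashByConfig"].
type upstreamHashByConfig struct {
	HashBy        string  // upstream-hash-by
	Subset        bool    // upstream-hash-by-subset
	BalanceFactor float32 // upstream-hash-by-balance-factor
}

// implementationFor mirrors the hash-based part of get_implementation above.
func implementationFor(c upstreamHashByConfig) string {
	if c.HashBy == "" {
		return "round_robin" // not hash-based; other settings decide in the real code
	}
	if c.BalanceFactor != 0 {
		return "chashboundedloads" // bounded loads takes precedence over subsetting
	}
	if c.Subset {
		return "chashsubset"
	}
	return "chash"
}

func main() {
	fmt.Println(implementationFor(upstreamHashByConfig{HashBy: "$request_uri", Subset: true, BalanceFactor: 1.5}))
	// Output: chashboundedloads
}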

@@ -0,0 +1,257 @@
-- Implements Consistent Hashing with Bounded Loads based on the paper [1].
-- For the specified hash-balance-factor, requests to any upstream host are capped
-- at hash_balance_factor times the average number of requests across the cluster.
-- When a request arrives for an upstream host that is currently serving at its max capacity,
-- linear probing is used to identify the next eligible host.
--
-- This is an O(N) algorithm, unlike other load balancers. Using a lower hash-balance-factor
-- results in more hosts being probed, so use a higher value if you require better performance.
--
-- [1]: https://arxiv.org/abs/1608.01350
local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local setmetatable = setmetatable
local lrucache = require("resty.lrucache")
local util = require("util")
local split = require("util.split")
local reverse_table = util.reverse_table
local string_format = string.format
local INFO = ngx.INFO
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_log = ngx.log
local math_ceil = math.ceil
local ipairs = ipairs
local ngx = ngx
local DEFAULT_HASH_BALANCE_FACTOR = 2
local HOST_SEED = util.get_host_seed()
-- Controls how many "tenants" we'll keep track of
-- to avoid routing them to alternative_backends
-- as they were already consistently routed to some endpoint.
-- Lowering this value will increase the chances of more
-- tenants being routed to alternative_backends.
-- Similarly, increasing this value will keep more tenants
-- consistently routed to the same endpoint in the main backend.
local SEEN_LRU_SIZE = 1000
local _M = {}
local function incr_req_stats(self, endpoint)
if not self.requests_by_endpoint[endpoint] then
self.requests_by_endpoint[endpoint] = 1
else
self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] + 1
end
self.total_requests = self.total_requests + 1
end
local function decr_req_stats(self, endpoint)
if self.requests_by_endpoint[endpoint] then
self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] - 1
if self.requests_by_endpoint[endpoint] == 0 then
self.requests_by_endpoint[endpoint] = nil
end
end
self.total_requests = self.total_requests - 1
end
local function get_hash_by_value(self)
if not ngx.ctx.chash_hash_by_value then
ngx.ctx.chash_hash_by_value = util.generate_var_value(self.hash_by)
end
local v = ngx.ctx.chash_hash_by_value
if v == "" then
return nil
end
return v
end
local function endpoint_eligible(self, endpoint)
-- (num_requests * hash-balance-factor / num_servers)
local allowed = math_ceil(
(self.total_requests + 1) * self.balance_factor / self.total_endpoints)
local current = self.requests_by_endpoint[endpoint]
if current == nil then
return true, 0, allowed
else
return current < allowed, current, allowed
end
end
local function update_balance_factor(self, backend)
local balance_factor = backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"]
if balance_factor and balance_factor <= 1 then
ngx_log(ngx_WARN,
"upstream-hash-by-balance-factor must be > 1. Forcing it to the default value of ",
DEFAULT_HASH_BALANCE_FACTOR)
balance_factor = DEFAULT_HASH_BALANCE_FACTOR
end
self.balance_factor = balance_factor or DEFAULT_HASH_BALANCE_FACTOR
end
local function normalize_endpoints(endpoints)
local b = {}
for i, endpoint in ipairs(endpoints) do
b[i] = string_format("%s:%s", endpoint.address, endpoint.port)
end
return b
end
local function update_endpoints(self, endpoints)
self.endpoints = endpoints
self.endpoints_reverse = reverse_table(endpoints)
self.total_endpoints = #endpoints
self.ring_seed = util.array_mod(HOST_SEED, self.total_endpoints)
end
function _M.is_affinitized(self)
-- alternative_backends might contain a canary backend that gets a percentage of traffic.
-- If a tenant has already been consistently routed to an endpoint, we want to stick to it
-- to keep a higher cache ratio, rather than routing it to an alternative backend.
-- This would mean that alternative backends (== canary) would mostly be seeing "new" tenants.
if not self.alternative_backends or not self.alternative_backends[1] then
return false
end
local hash_by_value = get_hash_by_value(self)
if not hash_by_value then
return false
end
return self.seen_hash_by_values:get(hash_by_value) ~= nil
end
function _M.new(self, backend)
local nodes = util.get_nodes(backend.endpoints)
local complex_val, err =
util.parse_complex_value(backend["upstreamHashByConfig"]["upstream-hash-by"])
if err ~= nil then
ngx_log(ngx_ERR, "could not parse the value of the upstream-hash-by: ", err)
end
local o = {
name = "chashboundedloads",
chash = resty_chash:new(nodes),
roundrobin = resty_roundrobin:new(nodes),
alternative_backends = backend.alternativeBackends,
hash_by = complex_val,
requests_by_endpoint = {},
total_requests = 0,
seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
}
update_endpoints(o, normalize_endpoints(backend.endpoints))
update_balance_factor(o, backend)
setmetatable(o, self)
self.__index = self
return o
end
function _M.sync(self, backend)
self.alternative_backends = backend.alternativeBackends
update_balance_factor(self, backend)
local new_endpoints = normalize_endpoints(backend.endpoints)
if util.deep_compare(self.endpoints, new_endpoints) then
ngx_log(INFO, "endpoints did not change for backend", backend.name)
return
end
ngx_log(INFO, string_format("[%s] endpoints have changed for backend %s",
self.name, backend.name))
update_endpoints(self, new_endpoints)
local nodes = util.get_nodes(backend.endpoints)
self.chash:reinit(nodes)
self.roundrobin:reinit(nodes)
self.seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
ngx_log(INFO, string_format("[%s] nodes have changed for backend %s", self.name, backend.name))
end
function _M.balance(self)
local hash_by_value = get_hash_by_value(self)
-- Tenant key not available, falling back to round-robin
if not hash_by_value then
local endpoint = self.roundrobin:find()
ngx.var.chashbl_debug = "fallback_round_robin"
return endpoint
end
self.seen_hash_by_values:set(hash_by_value, true)
local tried_endpoints
if not ngx.ctx.balancer_chashbl_tried_endpoints then
tried_endpoints = {}
ngx.ctx.balancer_chashbl_tried_endpoints = tried_endpoints
else
tried_endpoints = ngx.ctx.balancer_chashbl_tried_endpoints
end
local first_endpoint = self.chash:find(hash_by_value)
local index = self.endpoints_reverse[first_endpoint]
-- By design, resty.chash always points to the same element of the ring,
-- regardless of the environment. In this algorithm, we want the consistency
-- to be "seeded" based on the host where it's running.
-- That's how both Envoy and Haproxy implement this.
-- For convenience, we keep resty.chash but manually introduce the seed.
index = util.array_mod(index + self.ring_seed, self.total_endpoints)
for i=0, self.total_endpoints-1 do
local j = util.array_mod(index + i, self.total_endpoints)
local endpoint = self.endpoints[j]
if not tried_endpoints[endpoint] then
local eligible, current, allowed = endpoint_eligible(self, endpoint)
if eligible then
ngx.var.chashbl_debug = string_format(
"attempt=%d score=%d allowed=%d total_requests=%d hash_by_value=%s",
i, current, allowed, self.total_requests, hash_by_value)
incr_req_stats(self, endpoint)
tried_endpoints[endpoint] = true
return endpoint
end
end
end
-- Normally this case should never be reached, because with balance_factor > 1
-- there should always be an eligible endpoint.
-- It can only be reached when the number of endpoints is less than or equal to
-- the maximum number of Nginx retries and tried_endpoints already contains all of them.
incr_req_stats(self, first_endpoint)
ngx.var.chashbl_debug = "fallback_first_endpoint"
return first_endpoint
end
function _M.after_balance(self)
local tried_upstreams = split.split_upstream_var(ngx.var.upstream_addr)
if (not tried_upstreams) or (not get_hash_by_value(self)) then
return
end
for _, addr in ipairs(tried_upstreams) do
decr_req_stats(self, addr)
end
end
return _M
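
As a standalone illustration of the algorithm implemented above, the following Go sketch (simplified assumptions: a plain FNV hash instead of resty.chash, no host seed, and counters that are never decremented) shows the linear probing and spill-over behaviour of bounded-loads hashing.

package main

import (
	"fmt"
	"hash/fnv"
	"math"
)

type balancer struct {
	endpoints     []string
	inFlight      map[string]int
	totalRequests int
	balanceFactor float64
}

// allowed is the per-endpoint cap, as in endpoint_eligible() above.
func (b *balancer) allowed() int {
	return int(math.Ceil(float64(b.totalRequests+1) * b.balanceFactor / float64(len(b.endpoints))))
}

// pick hashes the key to a starting endpoint and probes linearly until it finds
// one below the cap; this probing is what makes the algorithm O(N) in the worst case.
func (b *balancer) pick(key string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	start := int(h.Sum32() % uint32(len(b.endpoints)))
	for i := 0; i < len(b.endpoints); i++ {
		ep := b.endpoints[(start+i)%len(b.endpoints)]
		if b.inFlight[ep] < b.allowed() {
			b.inFlight[ep]++
			b.totalRequests++
			return ep
		}
	}
	// With balanceFactor > 1 this fallback should not be needed.
	ep := b.endpoints[start]
	b.inFlight[ep]++
	b.totalRequests++
	return ep
}

func main() {
	b := &balancer{
		endpoints:     []string{"10.10.10.1:8080", "10.10.10.2:8080", "10.10.10.3:8080"},
		inFlight:      map[string]int{},
		balanceFactor: 2,
	}
	// The same key prefers the endpoint its hash points to; once that endpoint
	// reaches the cap, the next request spills over to its neighbour on the ring.
	for i := 0; i < 3; i++ {
		fmt.Println(b.pick("/alma/armud"))
	}
}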

@@ -0,0 +1,294 @@
local original_ngx = ngx
local function reset_ngx()
_G.ngx = original_ngx
end
describe("Balancer chashboundedloads", function()
local balancer_chashboundedloads, backend, instance
local function endpoint_for_hash(instance, hash_by_value, offset)
if offset == nil then offset = 0 end
local first_endpoint = instance.chash:find(hash_by_value)
local index = instance.endpoints_reverse[first_endpoint]
index = util.array_mod(index + instance.ring_seed + offset, instance.total_endpoints)
return instance.endpoints[index]
end
before_each(function()
util = require_without_cache("util")
util.get_hostname = function()
return "test-host"
end
balancer_chashboundedloads = require_without_cache("balancer.chashboundedloads")
backend = {
name = "namespace-service-port", ["load-balance"] = "ewma",
upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", ["upstream-hash-by-balance-factor"] = 2 },
endpoints = {
{ address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
ngx.header = {}
ngx.req = {
get_uri_args = function()
return {}
end
}
instance = balancer_chashboundedloads:new(backend)
end)
after_each(function()
reset_ngx()
ngx.var = {}
ngx.ctx = {}
end)
it("sets balance_factor", function()
backend = {
name = "namespace-service-port", ["load-balance"] = "ewma",
upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", ["upstream-hash-by-balance-factor"] = 2.5 },
endpoints = {
{ address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
instance = balancer_chashboundedloads:new(backend)
assert.are.equals(2.5, instance.balance_factor)
end)
it("does not allow balance_factor <= 1", function()
local new_backend = util.deepcopy(backend)
new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 1
instance = balancer_chashboundedloads:new(new_backend)
assert.are.equals(2, instance.balance_factor)
new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 0.1
instance = balancer_chashboundedloads:new(new_backend)
assert.are.equals(2, instance.balance_factor)
end)
it("sets default balance factor", function()
backend = {
name = "namespace-service-port", ["load-balance"] = "ewma",
upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri" },
endpoints = {
{ address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 },
{ address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 },
}
}
instance = balancer_chashboundedloads:new(backend)
assert.are.equals(2, instance.balance_factor)
end)
it("uses round-robin and does not touch counters when hash_by value is missing", function()
ngx.var = { request_uri = nil }
instance.roundrobin = {
find = function(self)
return "some-round-robin-endpoint"
end
}
local endpoint = instance:balance()
assert.are.equals("some-round-robin-endpoint", endpoint)
assert.are.same({}, instance.requests_by_endpoint)
assert.are.equals(0, instance.total_requests)
instance:after_balance()
assert.are.same({}, instance.requests_by_endpoint)
assert.are.equals(0, instance.total_requests)
end)
it("skips tried endpoint", function()
ngx.var = { request_uri = "/alma/armud" }
local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud")
local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1)
ngx.ctx.balancer_chashbl_tried_endpoints = {[expected_first_endpoint]=true}
local endpoint = instance:balance()
assert.are.equals(expected_second_endpoint, endpoint)
assert.are.same({[expected_second_endpoint] = 1}, instance.requests_by_endpoint)
assert.are.equals(1, instance.total_requests)
end)
it("after_balance decrements all tried endpoints", function()
local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud")
local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1)
instance.requests_by_endpoint[expected_first_endpoint] = 1
instance.requests_by_endpoint[expected_second_endpoint] = 1
instance.total_requests = 2
ngx.var = { request_uri = "/alma/armud", upstream_addr = expected_first_endpoint .. " : " .. expected_second_endpoint }
instance:after_balance()
assert.are.same({}, instance.requests_by_endpoint)
assert.are.equals(0, instance.total_requests)
end)
it("spills over", function()
ngx.var = { request_uri = "/alma/armud" }
local endpoint = instance:balance()
local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud")
local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1)
assert.are.equals(expected_first_endpoint, endpoint)
assert.are.same({[expected_first_endpoint] = 1}, instance.requests_by_endpoint)
assert.are.equals(1, instance.total_requests)
ngx.ctx.balancer_chashbl_tried_endpoints = nil
local endpoint = instance:balance()
assert.are.equals(expected_first_endpoint, endpoint)
assert.are.same({[expected_first_endpoint] = 2}, instance.requests_by_endpoint)
assert.are.equals(2, instance.total_requests)
ngx.ctx.balancer_chashbl_tried_endpoints = nil
local endpoint = instance:balance()
assert.are.equals(expected_second_endpoint, endpoint)
assert.are.same({[expected_first_endpoint] = 2, [expected_second_endpoint] = 1}, instance.requests_by_endpoint)
assert.are.equals(3, instance.total_requests)
end)
it("balances and keeps track of requests", function()
local expected_endpoint = endpoint_for_hash(instance, "/alma/armud")
ngx.var = { request_uri = "/alma/armud" }
local endpoint = instance:balance()
assert.are.equals(expected_endpoint, endpoint)
assert.are.same({[expected_endpoint] = 1}, instance.requests_by_endpoint)
assert.are.equals(1, instance.total_requests)
ngx.var = { upstream_addr = endpoint }
instance:after_balance()
assert.are.same({}, instance.requests_by_endpoint)
assert.are.equals(0, instance.total_requests)
end)
it("starts from the beginning of the ring if first_endpoints points to the end of ring", function()
instance.chash = {
find = function(self, key)
return "10.10.10.3:8080"
end
}
instance.requests_by_endpoint["10.10.10.3:8080"] = 2
instance.total_requests = 2
instance.ring_seed = 0
ngx.var = { request_uri = "/alma/armud" }
local endpoint = instance:balance()
assert.are.equals("10.10.10.1:8080", endpoint)
end)
it("balances to the first when all endpoints have identical load", function()
instance.requests_by_endpoint["10.10.10.1:8080"] = 2
instance.requests_by_endpoint["10.10.10.2:8080"] = 2
instance.requests_by_endpoint["10.10.10.3:8080"] = 2
instance.total_requests = 6
local expected_endpoint = endpoint_for_hash(instance, "/alma/armud")
ngx.var = { request_uri = "/alma/armud" }
local endpoint = instance:balance()
assert.are.equals(expected_endpoint, endpoint)
end)
describe("is_affinitized()", function()
it("returns false is alternative_backends is empty", function()
instance.alternative_backends = nil
assert.is_false(instance:is_affinitized())
instance.alternative_backends = {}
assert.is_false(instance:is_affinitized())
end)
it("returns false if tenant was not seen", function()
ngx.var = { request_uri = "/alma/armud" }
instance.alternative_backends = {"omglol"}
assert.is_false(instance:is_affinitized())
end)
it("returns true if tenant was seen", function()
ngx.var = { request_uri = "/alma/armud" }
instance.alternative_backends = {"omglol"}
instance.seen_hash_by_values:set("/alma/armud", true)
assert.is_true(instance:is_affinitized())
end)
end)
describe("sync()", function()
it("updates endpoints and total_endpoints", function()
local new_backend = util.deepcopy(backend)
new_backend.endpoints[4] = { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }
assert.are.same({"10.10.10.1:8080", "10.10.10.2:8080", "10.10.10.3:8080"}, instance.endpoints)
assert.are.equal(3, instance.total_endpoints)
assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 2, ["10.10.10.3:8080"] = 3}, instance.endpoints_reverse)
instance:sync(new_backend)
assert.are.same({"10.10.10.1:8080", "10.10.10.2:8080", "10.10.10.3:8080", "10.10.10.4:8080"}, instance.endpoints)
assert.are.equal(4, instance.total_endpoints)
assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 2, ["10.10.10.3:8080"] = 3, ["10.10.10.4:8080"] = 4}, instance.endpoints_reverse)
local expected_seed = util.array_mod(util.hash_string(util.get_hostname()), instance.total_endpoints)
assert.are.equal(expected_seed, instance.ring_seed)
end)
it("updates chash and roundrobin", function()
instance.roundrobin = {
reinit = function(self, nodes)
self.nodes = nodes
end
}
instance.chash = {
reinit = function(self, nodes)
self.nodes = nodes
end
}
local new_backend = util.deepcopy(backend)
new_backend.endpoints[4] = { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }
assert.are.equal(3, instance.total_endpoints)
instance:sync(new_backend)
assert.are.equal(4, instance.total_endpoints)
assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 1,["10.10.10.4:8080"] = 1,["10.10.10.3:8080"] = 1}, instance.roundrobin.nodes)
assert.are.same(instance.roundrobin.nodes, instance.chash.nodes)
end)
it("updates balance-factor", function()
local new_backend = util.deepcopy(backend)
new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 4
instance:sync(new_backend)
assert.are.equal(4, instance.balance_factor)
end)
end)
end)

@@ -102,4 +102,34 @@ describe("utility", function()
assert.are.same({ "10.10.10.2:8080" }, removed)
end)
end)
describe("hash_string", function()
it("returns same seed for same string", function()
local seed1 = util.hash_string("test")
local seed2 = util.hash_string("test")
assert.are.same(seed1, seed2)
end)
it("does not overflow on a typical hostname", function()
local seed = util.hash_string("nginx-ingress-controller-6f7f95d6c-kvlvj")
assert.is_true(seed < 2^31)
end)
it("returns different seed for different string", function()
local seed1 = util.hash_string("test")
local seed2 = util.hash_string("test2")
assert.are_not.same(seed1, seed2)
end)
end)
describe("get_host_seed", function()
it("returns same seed for subsequent calls", function()
local seed1 = util.get_host_seed()
local seed2 = util.get_host_seed()
assert.are.same(seed1, seed2)
end)
end)
end)

@@ -10,6 +10,7 @@ local type = type
local next = next
local table = table
local re_gmatch = ngx.re.gmatch
local io = io
local _M = {}
@@ -92,6 +93,14 @@ local function normalize_endpoints(endpoints)
return normalized_endpoints
end
function _M.reverse_table(t)
local b = {}
for k, v in ipairs(t) do
b[v] = k
end
return b
end
-- diff_endpoints compares old and new
-- and as a first argument returns what endpoints are in new
-- but are not in old, and as a second argument it returns
@@ -180,4 +189,45 @@ local function replace_special_char(str, a, b)
end
_M.replace_special_char = replace_special_char
local function get_hostname()
local f = io.popen("/bin/hostname")
if f ~= nil then
local h = f:read("*a") or ""
h = string.gsub(h, "[\n]", "")
f:close()
return h
else
return "unknown"
end
end
_M.get_hostname = get_hostname
local MAX_HASH_NUM = 2^31-1
local function hash_string(str)
local hash = 0
for i = 1, string.len(str) do
hash = 31 * hash + string.byte(str, i)
if hash > MAX_HASH_NUM then
hash = hash % MAX_HASH_NUM
end
end
return hash
end
_M.hash_string = hash_string
local function get_host_seed()
return hash_string(_M.get_hostname())
end
_M.get_host_seed = get_host_seed
local function array_mod(i, max)
return (i - 1) % max + 1
end
_M.array_mod = array_mod
return _M
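
For illustration, here is a hedged Go port of hash_string() and array_mod() above, showing how the per-host seed is derived and how it rotates the starting index on the ring; the Lua versions are what the balancer actually uses.

package main

import "fmt"

const maxHashNum = 1<<31 - 1

// hashString mirrors hash_string() above: a base-31 polynomial hash kept below 2^31 - 1.
func hashString(s string) int64 {
	var hash int64
	for i := 0; i < len(s); i++ {
		hash = 31*hash + int64(s[i])
		if hash > maxHashNum {
			hash = hash % maxHashNum
		}
	}
	return hash
}

// arrayMod mirrors array_mod() above: wraps an index onto Lua's 1-based range [1, max].
func arrayMod(i, max int64) int64 {
	return (i-1)%max + 1
}

func main() {
	totalEndpoints := int64(3)
	// ring_seed = array_mod(hash_string(hostname), total_endpoints), as in update_endpoints().
	seed := arrayMod(hashString("nginx-ingress-controller-6f7f95d6c-kvlvj"), totalEndpoints)
	// The chash index for a key is then rotated: index = array_mod(index + seed, total_endpoints).
	fmt.Println(seed)
}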

@@ -99,4 +99,14 @@ var _ = framework.DescribeAnnotation("upstream-hash-by-*", func() {
podMap := startIngress(f, annotations)
assert.Equal(ginkgo.GinkgoT(), len(podMap), 3)
})
ginkgo.It("should connect to the same pod with bounded loads", func() {
annotations := map[string]string{
"nginx.ingress.kubernetes.io/upstream-hash-by": "$request_uri",
"nginx.ingress.kubernetes.io/upstream-hash-by-balance-factor": "1.5",
}
podMap := startIngress(f, annotations)
assert.Equal(ginkgo.GinkgoT(), len(podMap), 1)
})
})