Merge pull request #3396 from flugel-it/master

New balancer implementation: consistent hash subset
Kubernetes Prow Robot 2019-01-04 10:31:03 -08:00 committed by GitHub
commit 2c3ce07135
17 changed files with 434 additions and 17 deletions

View file

@@ -0,0 +1,70 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginxhello
spec:
  replicas: 10
  selector:
    matchLabels:
      app: nginxhello
  template:
    metadata:
      labels:
        app: nginxhello
    spec:
      containers:
      - name: nginxhello
        image: gcr.io/kubernetes-e2e-test-images/echoserver:2.2
        ports:
        - containerPort: 8080
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
---
kind: Service
apiVersion: v1
metadata:
  name: nginxhello
  labels:
    app: nginxhello
spec:
  selector:
    app: nginxhello
  ports:
  - name: http
    port: 80
    targetPort: 8080
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    nginx.ingress.kubernetes.io/upstream-hash-by: "$arg_predictorid"
    nginx.ingress.kubernetes.io/upstream-hash-by-subset: "true"
    nginx.ingress.kubernetes.io/upstream-hash-by-subset-size: "3"
  name: nginxhello-ingress
  namespace: default
spec:
  backend:
    serviceName: nginxhello
    servicePort: 80

View file

@@ -186,10 +186,16 @@ nginx.ingress.kubernetes.io/auth-realm: "realm string"
NGINX supports load balancing by client-server mapping based on [consistent hashing](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash) for a given key. The key can contain text, variables or any combination thereof. This feature allows for request stickiness other than client IP or cookies. The [ketama](http://www.last.fm/user/RJ/journal/2007/04/10/392555/) consistent hashing method will be used, which ensures that only a few keys are remapped to different servers when the upstream group changes.

+There is a special mode of upstream hashing called subset. In this mode, upstream servers are grouped into subsets, and stickiness works by mapping keys to a subset instead of to individual upstream servers. A specific server is then chosen uniformly at random from the selected sticky subset, providing a balance between stickiness and load distribution.

To enable consistent hashing for a backend:

`nginx.ingress.kubernetes.io/upstream-hash-by`: the nginx variable, text value or any combination thereof to use for consistent hashing. For example `nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"` to consistently hash upstream requests by the current request URI.

+Subset hashing can be enabled by setting `nginx.ingress.kubernetes.io/upstream-hash-by-subset: "true"`. This maps requests to a subset of nodes instead of a single one. `nginx.ingress.kubernetes.io/upstream-hash-by-subset-size` determines the size of each subset (default 3).
+Please check the [chashsubset](../../examples/chashsubset/deployment.yaml) example.

### Custom NGINX load balancing

This is similar to [`load-balance` in ConfigMap](./configmap.md#load-balance), but configures the load balancing algorithm per ingress.
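To make the grouping arithmetic behind the new subset mode concrete, here is a minimal, self-contained Go sketch (illustration only, not code from this change; all names are hypothetical). With the example deployment's 10 replicas and a subset size of 3, the endpoint list is padded to 12 by reusing endpoints from the front, giving 4 subsets:

```go
package main

import (
	"fmt"
	"math"
)

// partition mirrors the grouping arithmetic of the new chashsubset balancer:
// pad the endpoint list by reusing endpoints from the front, then split it
// into fixed-size subsets keyed "set1".."setN".
func partition(endpoints []string, subsetSize int) map[string][]string {
	setCount := int(math.Ceil(float64(len(endpoints)) / float64(subsetSize)))
	nodeCount := setCount * subsetSize

	// Not enough endpoints: reuse ones from the beginning so every subset
	// ends up with exactly subsetSize members.
	for j := 0; len(endpoints) < nodeCount; j++ {
		endpoints = append(endpoints, endpoints[j])
	}

	subsets := make(map[string][]string, setCount)
	for i := 0; i < setCount; i++ {
		subsets[fmt.Sprintf("set%d", i+1)] = endpoints[i*subsetSize : (i+1)*subsetSize]
	}
	return subsets
}

func main() {
	eps := make([]string, 10)
	for i := range eps {
		eps[i] = fmt.Sprintf("10.0.0.%d:8080", i+1)
	}
	// 10 replicas, subset size 3 -> padded to 12 endpoints -> 4 subsets.
	for id, subset := range partition(eps, 3) {
		fmt.Println(id, subset)
	}
}
```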

View file

@@ -92,7 +92,7 @@ type Ingress struct {
	SessionAffinity    sessionaffinity.Config
	SSLPassthrough     bool
	UsePortInRedirects bool
-	UpstreamHashBy     string
+	UpstreamHashBy     upstreamhashby.Config
	LoadBalancing      string
	UpstreamVhost      string
	Whitelist          ipwhitelist.SourceRange

View file

@@ -186,7 +186,7 @@ func TestUpstreamHashBy(t *testing.T) {
	for _, foo := range fooAnns {
		ing.SetAnnotations(foo.annotations)
-		r := ec.Extract(ing).UpstreamHashBy
+		r := ec.Extract(ing).UpstreamHashBy.UpstreamHashBy
		if r != foo.er {
			t.Errorf("Returned %v but expected %v", r, foo.er)
		}
	}

View file

@@ -27,14 +27,27 @@ type upstreamhashby struct {
	r resolver.Resolver
}

-// NewParser creates a new CORS annotation parser
+// Config contains the consistent hash configuration to be used in the Ingress
+type Config struct {
+	UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
+	UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+}
+
+// NewParser creates a new UpstreamHashBy annotation parser
func NewParser(r resolver.Resolver) parser.IngressAnnotation {
	return upstreamhashby{r}
}

// Parse parses the annotations contained in the ingress rule
-// used to indicate if the location/s contains a fragment of
-// configuration to be included inside the paths of the rules
func (a upstreamhashby) Parse(ing *extensions.Ingress) (interface{}, error) {
-	return parser.GetStringAnnotation("upstream-hash-by", ing)
+	upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
+	upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
+	upstreamHashBySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
+
+	if upstreamHashBySubsetSize == 0 {
+		upstreamHashBySubsetSize = 3
+	}
+
+	return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashBySubsetSize}, nil
}
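As a quick illustration of the defaulting above (a minimal sketch, not part of the change; `defaultSubsetSize` is a hypothetical helper): `GetIntAnnotation` reports 0 when the size annotation is absent, so the parsed Config falls back to subsets of 3.

```go
package main

import "fmt"

// defaultSubsetSize mirrors the fallback in Parse: a missing or zero
// upstream-hash-by-subset-size annotation becomes 3.
func defaultSubsetSize(annotated int) int {
	if annotated == 0 {
		return 3
	}
	return annotated
}

func main() {
	fmt.Println(defaultSubsetSize(0)) // annotation absent -> 3
	fmt.Println(defaultSubsetSize(5)) // explicit value wins -> 5
}
```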

View file

@@ -55,7 +55,12 @@ func TestParse(t *testing.T) {
	for _, testCase := range testCases {
		ing.SetAnnotations(testCase.annotations)
		result, _ := ap.Parse(ing)
-		if result != testCase.expected {
+		uc, ok := result.(*Config)
+		if !ok {
+			t.Fatalf("expected a Config type")
+		}
+		if uc.UpstreamHashBy != testCase.expected {
			t.Errorf("expected %v but returned %v, annotations: %s", testCase.expected, result, testCase.annotations)
		}
	}

View file

@@ -661,9 +661,13 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
		if upstreams[defBackend].SecureCACert.Secret == "" {
			upstreams[defBackend].SecureCACert = anns.SecureUpstream.CACert
		}

-		if upstreams[defBackend].UpstreamHashBy == "" {
-			upstreams[defBackend].UpstreamHashBy = anns.UpstreamHashBy
+		if upstreams[defBackend].UpstreamHashBy.UpstreamHashBy == "" {
+			upstreams[defBackend].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
+			upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
+			upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
		}

		if upstreams[defBackend].LoadBalancing == "" {
			upstreams[defBackend].LoadBalancing = anns.LoadBalancing
		}
@@ -725,8 +729,10 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
			upstreams[name].SecureCACert = anns.SecureUpstream.CACert
		}

-		if upstreams[name].UpstreamHashBy == "" {
-			upstreams[name].UpstreamHashBy = anns.UpstreamHashBy
+		if upstreams[name].UpstreamHashBy.UpstreamHashBy == "" {
+			upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
+			upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
+			upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
		}

		if upstreams[name].LoadBalancing == "" {

View file

@@ -113,6 +113,14 @@ type Backend struct {
	// http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash
	UpstreamHashBy string `json:"upstream-hash-by"`

+	// Consistent hashing subset flag.
+	// Default: false
+	UpstreamHashBySubset bool `json:"upstream-hash-by-subset"`
+
+	// Subset consistent hashing, subset size.
+	// Default: 3
+	UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`
+
	// Lets us choose a load balancing algorithm per ingress
	LoadBalancing string `json:"load-balance"`

View file

@@ -93,7 +93,7 @@ type Backend struct {
	// StickySessionAffinitySession contains the StickyConfig object with stickiness configuration
	SessionAffinity SessionAffinityConfig `json:"sessionAffinityConfig"`
	// Consistent hashing by NGINX variable
-	UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBy UpstreamHashByConfig `json:"upstreamHashByConfig,omitempty"`
	// LB algorithm configuration per ingress
	LoadBalancing string `json:"load-balance,omitempty"`
	// Denotes if a backend has no server. The backend instead shares a server with another backend and acts as an
@@ -150,6 +150,13 @@ type CookieSessionAffinity struct {
	Path string `json:"path,omitempty"`
}

+// UpstreamHashByConfig describes the settings from the upstream-hash-by* annotations.
+type UpstreamHashByConfig struct {
+	UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
+	UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+}
+
// Endpoint describes a kubernetes endpoint in a backend
// +k8s:deepcopy-gen=true
type Endpoint struct {
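Since the Lua balancer consumes the JSON-serialized backend, the struct tags above determine the table keys it reads. A small self-contained Go sketch of the resulting shape (illustration only; the backend name here is hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the types above, so the sketch is self-contained.
type UpstreamHashByConfig struct {
	UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
	UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
	UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
}

type backend struct {
	Name           string               `json:"name"`
	UpstreamHashBy UpstreamHashByConfig `json:"upstreamHashByConfig,omitempty"`
}

func main() {
	out, _ := json.Marshal(backend{
		Name: "default-nginxhello-80", // hypothetical backend name
		UpstreamHashBy: UpstreamHashByConfig{
			UpstreamHashBy:           "$arg_predictorid",
			UpstreamHashBySubset:     true,
			UpstreamHashBySubsetSize: 3,
		},
	})
	fmt.Println(string(out))
	// {"name":"default-nginxhello-80","upstreamHashByConfig":{"upstream-hash-by":"$arg_predictorid","upstream-hash-by-subset":true,"upstream-hash-by-subset-size":3}}
	// These are exactly the keys the Lua side reads, e.g.
	// backend["upstreamHashByConfig"]["upstream-hash-by-subset"].
}
```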

View file

@@ -234,6 +234,27 @@ func (csa1 *CookieSessionAffinity) Equal(csa2 *CookieSessionAffinity) bool {
	return true
}

+// Equal checks the equality between UpstreamHashByConfig types
+func (u1 *UpstreamHashByConfig) Equal(u2 *UpstreamHashByConfig) bool {
+	if u1 == u2 {
+		return true
+	}
+	if u1 == nil || u2 == nil {
+		return false
+	}
+	if u1.UpstreamHashBy != u2.UpstreamHashBy {
+		return false
+	}
+	if u1.UpstreamHashBySubset != u2.UpstreamHashBySubset {
+		return false
+	}
+	if u1.UpstreamHashBySubsetSize != u2.UpstreamHashBySubsetSize {
+		return false
+	}
+	return true
+}
+
// Equal checks the equality against an Endpoint
func (e1 *Endpoint) Equal(e2 *Endpoint) bool {
	if e1 == e2 {

View file

@@ -5,6 +5,7 @@ local dns_util = require("util.dns")
local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
+local chashsubset = require("balancer.chashsubset")
local sticky = require("balancer.sticky")
local ewma = require("balancer.ewma")
@@ -17,6 +18,7 @@ local DEFAULT_LB_ALG = "round_robin"
local IMPLEMENTATIONS = {
  round_robin = round_robin,
  chash = chash,
+  chashsubset = chashsubset,
  sticky = sticky,
  ewma = ewma,
}
@@ -29,8 +31,12 @@ local function get_implementation(backend)
  if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
    name = "sticky"
-  elseif backend["upstream-hash-by"] then
-    name = "chash"
+  elseif backend["upstreamHashByConfig"] and backend["upstreamHashByConfig"]["upstream-hash-by"] then
+    if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
+      name = "chashsubset"
+    else
+      name = "chash"
+    end
  end

  local implementation = IMPLEMENTATIONS[name]
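The selection logic above reads naturally as a precedence list. A hypothetical Go rendering (a sketch only; `implementationName` and its parameters are made-up names, and the `load-balance` fallback is taken from the surrounding file rather than this hunk):

```go
package main

import "fmt"

// implementationName mirrors get_implementation's precedence: cookie session
// affinity wins, then subset consistent hashing, then plain consistent
// hashing, otherwise the load-balance setting (default round_robin).
func implementationName(cookieAffinity bool, hashBy string, hashBySubset bool, loadBalance string) string {
	switch {
	case cookieAffinity:
		return "sticky"
	case hashBy != "" && hashBySubset:
		return "chashsubset"
	case hashBy != "":
		return "chash"
	case loadBalance != "":
		return loadBalance
	}
	return "round_robin"
}

func main() {
	fmt.Println(implementationName(false, "$request_uri", true, ""))  // chashsubset
	fmt.Println(implementationName(false, "$request_uri", false, "")) // chash
	fmt.Println(implementationName(true, "$request_uri", true, ""))   // sticky
	fmt.Println(implementationName(false, "", false, "ewma"))         // ewma
}
```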

View file

@@ -8,7 +8,7 @@ function _M.new(self, backend)
  local nodes = util.get_nodes(backend.endpoints)
  local o = {
    instance = self.factory:new(nodes),
-    hash_by = backend["upstream-hash-by"],
+    hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
    traffic_shaping_policy = backend.trafficShapingPolicy,
    alternative_backends = backend.alternativeBackends,
  }

View file

@@ -0,0 +1,84 @@
-- Consistent hashing to a subset of nodes. Instead of returning the same node
-- always, we return the same subset always.

local resty_chash = require("resty.chash")
local util = require("util")

local _M = { name = "chashsubset" }

local function build_subset_map(backend)
  local endpoints = {}
  local subset_map = {}
  local subsets = {}
  local subset_size = backend["upstreamHashByConfig"]["upstream-hash-by-subset-size"]

  for _, endpoint in pairs(backend.endpoints) do
    table.insert(endpoints, endpoint)
  end

  local set_count = math.ceil(#endpoints/subset_size)
  local node_count = set_count * subset_size

  -- if we don't have enough endpoints, we reuse endpoints in the last set to
  -- keep the same number on all of them.
  local j = 1
  for _ = #endpoints+1, node_count do
    table.insert(endpoints, endpoints[j])
    j = j+1
  end

  local k = 1
  for i = 1, set_count do
    local subset = {}
    local subset_id = "set" .. tostring(i)
    for _ = 1, subset_size do
      table.insert(subset, endpoints[k])
      k = k+1
    end
    subsets[subset_id] = subset
    subset_map[subset_id] = 1
  end

  return subset_map, subsets
end

function _M.new(self, backend)
  local subset_map, subsets = build_subset_map(backend)

  local o = {
    instance = resty_chash:new(subset_map),
    hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
    subsets = subsets,
    current_endpoints = backend.endpoints
  }
  setmetatable(o, self)
  self.__index = self
  return o
end

function _M.balance(self)
  local key = util.lua_ngx_var(self.hash_by)
  local subset_id = self.instance:find(key)
  local endpoints = self.subsets[subset_id]
  local endpoint = endpoints[math.random(#endpoints)]
  return endpoint.address .. ":" .. endpoint.port
end

function _M.sync(self, backend)
  local subset_map

  local changed = not util.deep_compare(self.current_endpoints, backend.endpoints)
  if not changed then
    return
  end

  self.current_endpoints = backend.endpoints

  subset_map, self.subsets = build_subset_map(backend)

  self.instance:reinit(subset_map)

  return
end

return _M
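For intuition, here is a self-contained Go sketch of the two-level lookup in `balance()` (a minimal sketch, assuming a plain FNV hash as a stand-in for the ketama ring that `resty.chash` implements, so the key-to-subset mapping differs, but the shape — deterministic subset choice, then a uniformly random member — is the same):

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// pickEndpoint maps the hash key to a subset deterministically, then picks a
// member of that subset uniformly at random, as chashsubset's balance() does.
func pickEndpoint(subsets map[string][]string, key string) string {
	ids := make([]string, 0, len(subsets))
	for id := range subsets {
		ids = append(ids, id)
	}
	sort.Strings(ids) // stable order for the stand-in hash

	h := fnv.New32a()
	h.Write([]byte(key))
	members := subsets[ids[int(h.Sum32())%len(ids)]]

	return members[rand.Intn(len(members))] // random member of the sticky subset
}

func main() {
	subsets := map[string][]string{
		"set1": {"10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080"},
		"set2": {"10.0.0.4:8080", "10.0.0.5:8080", "10.0.0.6:8080"},
	}
	// The same key always lands in the same subset; repeated calls spread
	// load across that subset's members only.
	for i := 0; i < 5; i++ {
		fmt.Println(pickEndpoint(subsets, "predictorid=42"))
	}
}
```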

View file

@@ -16,7 +16,7 @@ describe("Balancer chash", function()
    end

    local backend = {
-      name = "my-dummy-backend", ["upstream-hash-by"] = "$request_uri",
+      name = "my-dummy-backend", upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri" },
      endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
    }

    local instance = balancer_chash:new(backend)

View file

@@ -0,0 +1,82 @@
local function get_test_backend(n_endpoints)
  local backend = {
    name = "my-dummy-backend",
    ["upstreamHashByConfig"] = {
      ["upstream-hash-by"] = "$request_uri",
      ["upstream-hash-by-subset"] = true,
      ["upstream-hash-by-subset-size"] = 3
    },
    endpoints = {}
  }

  for i = 1, n_endpoints do
    backend.endpoints[i] = { address = "10.184.7." .. tostring(i), port = "8080", maxFails = 0, failTimeout = 0 }
  end

  return backend
end

describe("Balancer chash subset", function()
  local balancer_chashsubset = require("balancer.chashsubset")

  describe("balance()", function()
    it("returns peers from the same subset", function()
      _G.ngx = { var = { request_uri = "/alma/armud" }}

      local backend = get_test_backend(9)
      local instance = balancer_chashsubset:new(backend)

      instance:sync(backend)

      local first_node = instance:balance()
      local subset_id
      local endpoint_strings

      local function has_value(tab, val)
        for _, value in ipairs(tab) do
          if value == val then
            return true
          end
        end
        return false
      end

      for id, endpoints in pairs(instance["subsets"]) do
        endpoint_strings = {}
        for _, endpoint in pairs(endpoints) do
          local endpoint_string = endpoint.address .. ":" .. endpoint.port
          table.insert(endpoint_strings, endpoint_string)
          if first_node == endpoint_string then
            -- found the set of first_node
            subset_id = id
          end
        end
        if subset_id then
          break
        end
      end

      -- multiple calls to balance must return nodes from the same subset
      for i = 0, 10 do
        assert.True(has_value(endpoint_strings, instance:balance()))
      end
    end)
  end)

  describe("new(backend)", function()
    it("fills last subset correctly", function()
      _G.ngx = { var = { request_uri = "/alma/armud" }}

      local backend = get_test_backend(7)
      local instance = balancer_chashsubset:new(backend)

      instance:sync(backend)

      for id, endpoints in pairs(instance["subsets"]) do
        assert.are.equal(#endpoints, 3)
      end
    end)
  end)
end)

View file

@@ -32,7 +32,10 @@ local function reset_backends()
      sessionAffinityConfig = { name = "", cookieSessionAffinity = { name = "", hash = "" } },
    },
    { name = "my-dummy-app-1", ["load-balance"] = "round_robin", },
-    { name = "my-dummy-app-2", ["load-balance"] = "round_robin", ["upstream-hash-by"] = "$request_uri", },
+    {
+      name = "my-dummy-app-2", ["load-balance"] = "chash",
+      upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", },
+    },
    {
      name = "my-dummy-app-3", ["load-balance"] = "ewma",
      sessionAffinityConfig = { name = "cookie", cookieSessionAffinity = { name = "route", hash = "sha1" } }

View file

@@ -0,0 +1,106 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package annotations

import (
	"fmt"
	"net/http"
	"regexp"
	"strings"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/parnurzeal/gorequest"
	"k8s.io/apimachinery/pkg/util/wait"

	"k8s.io/ingress-nginx/test/e2e/framework"
)

func startIngress(f *framework.Framework, annotations *map[string]string) map[string]bool {
	host := "upstream-hash-by.foo.com"

	ing := framework.NewSingleIngress(host, "/", host, f.IngressController.Namespace, "http-svc", 80, annotations)
	f.EnsureIngress(ing)
	f.WaitForNginxServer(host,
		func(server string) bool {
			return strings.Contains(server, fmt.Sprintf("server_name %s ;", host))
		})

	err := wait.PollImmediate(framework.Poll, framework.DefaultTimeout, func() (bool, error) {
		resp, _, _ := gorequest.New().
			Get(f.IngressController.HTTPURL).
			Set("Host", host).
			End()
		if resp.StatusCode == http.StatusOK {
			return true, nil
		}
		return false, nil
	})
	Expect(err).Should(BeNil())

	re, _ := regexp.Compile(`Hostname: http-svc.*`)
	podMap := map[string]bool{}

	for i := 0; i < 100; i++ {
		_, body, errs := gorequest.New().
			Get(f.IngressController.HTTPURL).
			Set("Host", host).
			End()
		Expect(errs).Should(BeEmpty())

		podName := re.FindString(body)
		Expect(podName).ShouldNot(Equal(""))
		podMap[podName] = true
	}

	return podMap
}

var _ = framework.IngressNginxDescribe("Annotations - UpstreamHashBy", func() {
	f := framework.NewDefaultFramework("upstream-hash-by")

	BeforeEach(func() {
		f.NewEchoDeploymentWithReplicas(6)
	})

	AfterEach(func() {
	})

	It("should connect to the same pod", func() {
		annotations := map[string]string{
			"nginx.ingress.kubernetes.io/upstream-hash-by": "$request_uri",
		}

		podMap := startIngress(f, &annotations)
		Expect(len(podMap)).Should(Equal(1))
	})

	It("should connect to the same subset of pods", func() {
		annotations := map[string]string{
			"nginx.ingress.kubernetes.io/upstream-hash-by":             "$request_uri",
			"nginx.ingress.kubernetes.io/upstream-hash-by-subset":      "true",
			"nginx.ingress.kubernetes.io/upstream-hash-by-subset-size": "3",
		}

		podMap := startIngress(f, &annotations)
		Expect(len(podMap)).Should(Equal(3))
	})
})