Merge pull request #2341 from Shopify/custom-sticky

Add session affinity to custom load balancing
This commit is contained in:
k8s-ci-robot 2018-04-12 17:22:59 -07:00 committed by GitHub
commit 8855460817
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 283 additions and 1 deletions

File diff suppressed because one or more lines are too long

View file

@ -193,6 +193,7 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
"lua_shared_dict locks 512k", "lua_shared_dict locks 512k",
"lua_shared_dict balancer_ewma 1M", "lua_shared_dict balancer_ewma 1M",
"lua_shared_dict balancer_ewma_last_touched_at 1M", "lua_shared_dict balancer_ewma_last_touched_at 1M",
"lua_shared_dict sticky_sessions 1M",
) )
} }

View file

@ -5,6 +5,7 @@ local util = require("util")
local lrucache = require("resty.lrucache") local lrucache = require("resty.lrucache")
local resty_lock = require("resty.lock") local resty_lock = require("resty.lock")
local ewma = require("balancer.ewma") local ewma = require("balancer.ewma")
local sticky = require("sticky")
-- measured in seconds -- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers -- for an Nginx worker to pick up the new list of upstream peers
@ -48,6 +49,15 @@ end
local function balance() local function balance()
local backend = get_current_backend() local backend = get_current_backend()
local lb_alg = get_current_lb_alg() local lb_alg = get_current_lb_alg()
local is_sticky = sticky.is_sticky(backend)
if is_sticky then
local endpoint = sticky.get_endpoint(backend)
if endpoint ~= nil then
return endpoint.address, endpoint.port
end
lb_alg = "round_robin"
end
if lb_alg == "ip_hash" then if lb_alg == "ip_hash" then
-- TODO(elvinefendi) implement me -- TODO(elvinefendi) implement me
@ -77,6 +87,9 @@ local function balance()
if forcible then if forcible then
ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten") ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
end end
if is_sticky then
sticky.set_endpoint(endpoint, backend)
end
round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY) round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)
return endpoint.address, endpoint.port return endpoint.address, endpoint.port

View file

@ -0,0 +1,133 @@
local json = require('cjson')
local str = require("resty.string")
local sha1_crypto = require("resty.sha1")
local md5_crypto = require("resty.md5")
local sticky_sessions = ngx.shared.sticky_sessions
local DEFAULT_SESSION_COOKIE_NAME = "route"
local DEFAULT_SESSION_COOKIE_HASH = "md5"
-- Currently STICKY_TIMEOUT never expires
local STICKY_TIMEOUT = 0
local _M = {}
local function md5_digest(raw)
local md5 = md5_crypto:new()
if not md5 then
return nil, "md5: failed to create object"
end
local ok = md5:update(raw)
if not ok then
return nil, "md5: failed to add data"
end
local digest = md5:final()
if digest == nil then
return nil, "md5: failed to create digest"
end
return str.to_hex(digest), nil
end
local function sha1_digest(raw)
local sha1 = sha1_crypto:new()
if not sha1 then
return nil, "sha1: failed to create object"
end
local ok = sha1:update(raw)
if not ok then
return nil, "sha1: failed to add data"
end
local digest = sha1:final()
if digest == nil then
return nil, "sha1: failed to create digest"
end
return str.to_hex(digest), nil
end
local function get_cookie_name(backend)
local name = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["name"]
return name or DEFAULT_SESSION_COOKIE_NAME
end
local function is_valid_endpoint(backend, address, port)
for _, ep in ipairs(backend.endpoints) do
if ep.address == address and ep.port == port then
return true
end
end
return false
end
function _M.is_sticky(backend)
return backend["sessionAffinityConfig"]["name"] == "cookie"
end
function _M.get_endpoint(backend)
local cookie_name = get_cookie_name(backend)
local cookie_key = "cookie_" .. cookie_name
local endpoint_key = ngx.var[cookie_key]
if endpoint_key == nil then
ngx.log(ngx.INFO, string.format(
"[backend=%s, affinity=cookie] cookie \"%s\" is not set for this request",
backend.name,
cookie_name
))
return nil
end
local endpoint_string = sticky_sessions:get(endpoint_key)
if endpoint_string == nil then
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] no endpoint assigned", backend.name))
return nil
end
local endpoint = json.decode(endpoint_string)
local valid = is_valid_endpoint(backend, endpoint.address, endpoint.port)
if not valid then
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigned endpoint is no longer valid", backend.name))
sticky_sessions:delete(endpoint_key)
return nil
end
return endpoint
end
function _M.set_endpoint(endpoint, backend)
local cookie_name = get_cookie_name(backend)
local encrypted, err
local endpoint_string = json.encode(endpoint)
local hash = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"]
if hash == "sha1" then
encrypted, err = sha1_digest(endpoint_string)
else
if hash ~= DEFAULT_SESSION_COOKIE_HASH then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] session-cookie-hash \"%s\" is not valid, defaulting to %s",
backend.name,
hash,
DEFAULT_SESSION_COOKIE_HASH
))
end
encrypted, err = md5_digest(endpoint_string)
end
if err ~= nil then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
return
end
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigning a new endpoint", backend.name))
ngx.header["Set-Cookie"] = cookie_name .. "=" .. encrypted .. ";"
local success, forcible
success, err, forcible = sticky_sessions:set(encrypted, endpoint_string, STICKY_TIMEOUT)
if not success then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
end
if forcible then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] sticky_sessions shared dict is full; endpoint forcibly overwritten",
backend.name
))
end
end
return _M

View file

@ -19,6 +19,7 @@ package lua
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"regexp"
"strings" "strings"
"time" "time"
@ -184,6 +185,108 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
}) })
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
Context("when session affinity annotation is present", func() {
It("should use sticky sessions when ingress rules are configured", func() {
cookieName := "STICKYSESSION"
By("Updating affinity annotation on ingress")
ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
ingress.ObjectMeta.Annotations = map[string]string{
"nginx.ingress.kubernetes.io/affinity": "cookie",
"nginx.ingress.kubernetes.io/session-cookie-name": cookieName,
}
_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
By("Increasing the number of service replicas")
replicas := 2
err = framework.UpdateDeployment(f.KubeClientSet, f.Namespace.Name, "http-svc", replicas,
func(deployment *appsv1beta1.Deployment) error {
deployment.Spec.Replicas = framework.NewInt32(int32(replicas))
_, err := f.KubeClientSet.AppsV1beta1().Deployments(f.Namespace.Name).Update(deployment)
return err
})
Expect(err).NotTo(HaveOccurred())
By("Making a first request")
host := "foo.com"
resp, _, errs := gorequest.New().
Get(f.NginxHTTPURL).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
cookies := (*http.Response)(resp).Cookies()
sessionCookie, err := getCookie(cookieName, cookies)
Expect(err).ToNot(HaveOccurred())
By("Making a second request with the previous session cookie")
resp, _, errs = gorequest.New().
Get(f.NginxHTTPURL).
AddCookie(sessionCookie).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
By("Making a third request with no cookie")
resp, _, errs = gorequest.New().
Get(f.NginxHTTPURL).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).ToNot(BeEmpty())
By("Checking that upstreams are sticky when session cookie is used")
index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.Namespace.Name))
reqLogs := log[index:]
re := regexp.MustCompile(`\d{1,3}(?:\.\d{1,3}){3}(?::\d{1,5})`)
upstreams := re.FindAllString(reqLogs, -1)
Expect(len(upstreams)).Should(BeNumerically("==", 3))
Expect(upstreams[0]).To(Equal(upstreams[1]))
Expect(upstreams[1]).ToNot(Equal(upstreams[2]))
})
It("should NOT use sticky sessions when a default backend and no ingress rules configured", func() {
By("Updating affinity annotation and rules on ingress")
ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
ingress.Spec = v1beta1.IngressSpec{
Backend: &v1beta1.IngressBackend{
ServiceName: "http-svc",
ServicePort: intstr.FromInt(80),
},
}
ingress.ObjectMeta.Annotations = map[string]string{
"nginx.ingress.kubernetes.io/affinity": "cookie",
}
_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
By("Making a request")
host := "foo.com"
resp, _, errs := gorequest.New().
Get(f.NginxHTTPURL).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
By("Ensuring no cookies are set")
cookies := (*http.Response)(resp).Cookies()
Expect(len(cookies)).Should(BeNumerically("==", 0))
})
})
}) })
func enableDynamicConfiguration(kubeClientSet kubernetes.Interface) error { func enableDynamicConfiguration(kubeClientSet kubernetes.Interface) error {
@ -252,3 +355,12 @@ func ensureIngress(f *framework.Framework, host string) (*extensions.Ingress, er
}, },
}) })
} }
func getCookie(name string, cookies []*http.Cookie) (*http.Cookie, error) {
for _, cookie := range cookies {
if cookie.Name == name {
return cookie, nil
}
}
return &http.Cookie{}, fmt.Errorf("Cookie does not exist")
}