Add session affinity to custom load balancing

This commit is contained in:
Zenara Daley 2018-04-12 14:21:42 -04:00
parent 353c63153e
commit 6ed256dde6
6 changed files with 444 additions and 104 deletions

File diff suppressed because one or more lines are too long

View file

@ -193,6 +193,7 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
"lua_shared_dict locks 512k",
"lua_shared_dict balancer_ewma 1M",
"lua_shared_dict balancer_ewma_last_touched_at 1M",
"lua_shared_dict sticky_sessions 1M",
)
}

View file

@ -5,6 +5,7 @@ local util = require("util")
local lrucache = require("resty.lrucache")
local resty_lock = require("resty.lock")
local ewma = require("balancer.ewma")
local sticky = require("sticky")
-- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers
@ -48,6 +49,15 @@ end
local function balance()
local backend = get_current_backend()
local lb_alg = get_current_lb_alg()
local is_sticky = sticky.is_sticky(backend)
if is_sticky then
local endpoint = sticky.get_endpoint(backend)
if endpoint ~= nil then
return endpoint.address, endpoint.port
end
lb_alg = "round_robin"
end
if lb_alg == "ip_hash" then
-- TODO(elvinefendi) implement me
@ -77,6 +87,9 @@ local function balance()
if forcible then
ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
end
if is_sticky then
sticky.set_endpoint(endpoint, backend)
end
round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)
return endpoint.address, endpoint.port

View file

@ -0,0 +1,133 @@
local json = require('cjson')
local str = require("resty.string")
local sha1_crypto = require("resty.sha1")
local md5_crypto = require("resty.md5")
local sticky_sessions = ngx.shared.sticky_sessions
local DEFAULT_SESSION_COOKIE_NAME = "route"
local DEFAULT_SESSION_COOKIE_HASH = "md5"
-- Currently STICKY_TIMEOUT never expires
local STICKY_TIMEOUT = 0
local _M = {}
local function md5_digest(raw)
local md5 = md5_crypto:new()
if not md5 then
return nil, "md5: failed to create object"
end
local ok = md5:update(raw)
if not ok then
return nil, "md5: failed to add data"
end
local digest = md5:final()
if digest == nil then
return nil, "md5: failed to create digest"
end
return str.to_hex(digest), nil
end
local function sha1_digest(raw)
local sha1 = sha1_crypto:new()
if not sha1 then
return nil, "sha1: failed to create object"
end
local ok = sha1:update(raw)
if not ok then
return nil, "sha1: failed to add data"
end
local digest = sha1:final()
if digest == nil then
return nil, "sha1: failed to create digest"
end
return str.to_hex(digest), nil
end
local function get_cookie_name(backend)
local name = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["name"]
return name or DEFAULT_SESSION_COOKIE_NAME
end
local function is_valid_endpoint(backend, address, port)
for _, ep in ipairs(backend.endpoints) do
if ep.address == address and ep.port == port then
return true
end
end
return false
end
function _M.is_sticky(backend)
return backend["sessionAffinityConfig"]["name"] == "cookie"
end
function _M.get_endpoint(backend)
local cookie_name = get_cookie_name(backend)
local cookie_key = "cookie_" .. cookie_name
local endpoint_key = ngx.var[cookie_key]
if endpoint_key == nil then
ngx.log(ngx.INFO, string.format(
"[backend=%s, affinity=cookie] cookie \"%s\" is not set for this request",
backend.name,
cookie_name
))
return nil
end
local endpoint_string = sticky_sessions:get(endpoint_key)
if endpoint_string == nil then
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] no endpoint assigned", backend.name))
return nil
end
local endpoint = json.decode(endpoint_string)
local valid = is_valid_endpoint(backend, endpoint.address, endpoint.port)
if not valid then
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigned endpoint is no longer valid", backend.name))
sticky_sessions:delete(endpoint_key)
return nil
end
return endpoint
end
function _M.set_endpoint(endpoint, backend)
local cookie_name = get_cookie_name(backend)
local encrypted, err
local endpoint_string = json.encode(endpoint)
local hash = backend["sessionAffinityConfig"]["cookieSessionAffinity"]["hash"]
if hash == "sha1" then
encrypted, err = sha1_digest(endpoint_string)
else
if hash ~= DEFAULT_SESSION_COOKIE_HASH then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] session-cookie-hash \"%s\" is not valid, defaulting to %s",
backend.name,
hash,
DEFAULT_SESSION_COOKIE_HASH
))
end
encrypted, err = md5_digest(endpoint_string)
end
if err ~= nil then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
return
end
ngx.log(ngx.INFO, string.format("[backend=%s, affinity=cookie] assigning a new endpoint", backend.name))
ngx.header["Set-Cookie"] = cookie_name .. "=" .. encrypted .. ";"
local success, forcible
success, err, forcible = sticky_sessions:set(encrypted, endpoint_string, STICKY_TIMEOUT)
if not success then
ngx.log(ngx.WARN, string.format("[backend=%s, affinity=cookie] failed to assign endpoint: %s", backend.name, err))
end
if forcible then
ngx.log(ngx.WARN, string.format(
"[backend=%s, affinity=cookie] sticky_sessions shared dict is full; endpoint forcibly overwritten",
backend.name
))
end
end
return _M

View file

@ -30,11 +30,11 @@ import (
"k8s.io/ingress-nginx/test/e2e/framework"
// tests to run
_ "k8s.io/ingress-nginx/test/e2e/annotations"
_ "k8s.io/ingress-nginx/test/e2e/defaultbackend"
// _ "k8s.io/ingress-nginx/test/e2e/annotations"
// _ "k8s.io/ingress-nginx/test/e2e/defaultbackend"
_ "k8s.io/ingress-nginx/test/e2e/lua"
_ "k8s.io/ingress-nginx/test/e2e/settings"
_ "k8s.io/ingress-nginx/test/e2e/ssl"
// _ "k8s.io/ingress-nginx/test/e2e/settings"
// _ "k8s.io/ingress-nginx/test/e2e/ssl"
)
// RunE2ETests checks configuration parameters (specified through flags) and then runs

View file

@ -19,6 +19,7 @@ package lua
import (
"fmt"
"net/http"
"regexp"
"strings"
"time"
@ -43,7 +44,7 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
err := enableDynamicConfiguration(f.KubeClientSet)
Expect(err).NotTo(HaveOccurred())
err = f.NewEchoDeploymentWithReplicas(1)
err = f.NewEchoDeploymentWithReplicas(3)
Expect(err).NotTo(HaveOccurred())
host := "foo.com"
@ -75,115 +76,275 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
Expect(err).NotTo(HaveOccurred())
})
Context("when only backends change", func() {
It("should handle endpoints only changes", func() {
// Context("when only backends change", func() {
// It("should handle endpoints only changes", func() {
// resp, _, errs := gorequest.New().
// Get(fmt.Sprintf("%s?id=endpoints_only_changes", f.NginxHTTPURL)).
// Set("Host", "foo.com").
// End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// replicas := 2
// err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace.Name, "http-svc", replicas,
// func(deployment *appsv1beta1.Deployment) error {
// deployment.Spec.Replicas = framework.NewInt32(int32(replicas))
// _, err := f.KubeClientSet.AppsV1beta1().Deployments(f.Namespace.Name).Update(deployment)
// return err
// })
// Expect(err).NotTo(HaveOccurred())
// time.Sleep(5 * time.Second)
// log, err := f.NginxLogs()
// Expect(err).ToNot(HaveOccurred())
// Expect(log).ToNot(BeEmpty())
// index := strings.Index(log, "id=endpoints_only_changes")
// restOfLogs := log[index:]
// By("POSTing new backends to Lua endpoint")
// Expect(restOfLogs).To(ContainSubstring("dynamic reconfiguration succeeded"))
// Expect(restOfLogs).ToNot(ContainSubstring("could not dynamically reconfigure"))
// By("skipping Nginx reload")
// Expect(restOfLogs).ToNot(ContainSubstring("backend reload required"))
// Expect(restOfLogs).ToNot(ContainSubstring("ingress backend successfully reloaded"))
// Expect(restOfLogs).To(ContainSubstring("skipping reload"))
// Expect(restOfLogs).ToNot(ContainSubstring("first sync of Nginx configuration"))
// })
// It("should handle annotation changes", func() {
// ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
// Expect(err).ToNot(HaveOccurred())
// ingress.ObjectMeta.Annotations["nginx.ingress.kubernetes.io/load-balance"] = "round_robin"
// _, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
// Expect(err).ToNot(HaveOccurred())
// time.Sleep(5 * time.Second)
// log, err := f.NginxLogs()
// Expect(err).ToNot(HaveOccurred())
// Expect(log).ToNot(BeEmpty())
// index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.Namespace.Name))
// restOfLogs := log[index:]
// By("POSTing new backends to Lua endpoint")
// Expect(restOfLogs).To(ContainSubstring("dynamic reconfiguration succeeded"))
// Expect(restOfLogs).ToNot(ContainSubstring("could not dynamically reconfigure"))
// By("skipping Nginx reload")
// Expect(restOfLogs).ToNot(ContainSubstring("backend reload required"))
// Expect(restOfLogs).ToNot(ContainSubstring("ingress backend successfully reloaded"))
// Expect(restOfLogs).To(ContainSubstring("skipping reload"))
// Expect(restOfLogs).ToNot(ContainSubstring("first sync of Nginx configuration"))
// })
// })
// It("should handle a non backend update", func() {
// ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
// Expect(err).ToNot(HaveOccurred())
// ingress.Spec.TLS = []v1beta1.IngressTLS{
// {
// Hosts: []string{"foo.com"},
// SecretName: "foo.com",
// },
// }
// _, _, _, err = framework.CreateIngressTLSSecret(f.KubeClientSet,
// ingress.Spec.TLS[0].Hosts,
// ingress.Spec.TLS[0].SecretName,
// ingress.Namespace)
// Expect(err).ToNot(HaveOccurred())
// _, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
// Expect(err).ToNot(HaveOccurred())
// time.Sleep(5 * time.Second)
// log, err := f.NginxLogs()
// Expect(err).ToNot(HaveOccurred())
// Expect(log).ToNot(BeEmpty())
// By("reloading Nginx")
// Expect(log).To(ContainSubstring("ingress backend successfully reloaded"))
// By("POSTing new backends to Lua endpoint")
// Expect(log).To(ContainSubstring("dynamic reconfiguration succeeded"))
// By("still be proxying requests through Lua balancer")
// err = f.WaitForNginxServer("foo.com",
// func(server string) bool {
// return strings.Contains(server, "proxy_pass http://upstream_balancer;")
// })
// Expect(err).NotTo(HaveOccurred())
// By("generating the respective ssl listen directive")
// err = f.WaitForNginxServer("foo.com",
// func(server string) bool {
// return strings.Contains(server, "server_name foo.com") &&
// strings.Contains(server, "listen 443")
// })
// Expect(err).ToNot(HaveOccurred())
// })
Context("when session affinity annotation is present", func() {
// It("should use sticky sessions when ingress rules are configured", func() {
// cookieName := "STICKYSESSION"
// By("Updating affinity annotation on ingress")
// ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
// Expect(err).ToNot(HaveOccurred())
// ingress.ObjectMeta.Annotations = map[string]string{
// "nginx.ingress.kubernetes.io/affinity": "cookie",
// "nginx.ingress.kubernetes.io/session-cookie-name": cookieName,
// }
// _, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
// Expect(err).ToNot(HaveOccurred())
// time.Sleep(5 * time.Second)
// By("Making a first request")
// host := "foo.com"
// resp, _, errs := gorequest.New().
// Get(f.NginxHTTPURL).
// Set("Host", host).
// End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// cookies := (*http.Response)(resp).Cookies()
// sessionCookie, err := getCookie(cookieName, cookies)
// Expect(err).ToNot(HaveOccurred())
// By("Making a second request with the previous session cookie")
// resp, _, errs = gorequest.New().
// Get(f.NginxHTTPURL).
// AddCookie(sessionCookie).
// Set("Host", host).
// End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// By("Making a third request with no cookie")
// resp, _, errs = gorequest.New().
// Get(f.NginxHTTPURL).
// Set("Host", host).
// End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// log, err := f.NginxLogs()
// Expect(err).ToNot(HaveOccurred())
// Expect(log).ToNot(BeEmpty())
// By("Checking that upstreams are sticky when session cookie is used")
// index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.Namespace.Name))
// reqLogs := log[index:]
// re := regexp.MustCompile(`\d{1,3}(?:\.\d{1,3}){3}(?::\d{1,5})`)
// upstreams := re.FindAllString(reqLogs, -1)
// Expect(len(upstreams)).Should(BeNumerically("==", 3))
// Expect(upstreams[0]).To(Equal(upstreams[1]))
// Expect(upstreams[1]).ToNot(Equal(upstreams[2]))
// })
// It("should NOT use sticky sessions when a default backend and no ingress rules configured", func() {
// By("Updating affinity annotation and rules on ingress")
// ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
// Expect(err).ToNot(HaveOccurred())
// ingress.Spec = v1beta1.IngressSpec{
// Backend: &v1beta1.IngressBackend{
// ServiceName: "http-svc",
// ServicePort: intstr.FromInt(80),
// },
// }
// ingress.ObjectMeta.Annotations = map[string]string{
// "nginx.ingress.kubernetes.io/affinity": "cookie",
// }
// _, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
// Expect(err).ToNot(HaveOccurred())
// time.Sleep(5 * time.Second)
// By("Making a request")
// host := "foo.com"
// resp, _, errs := gorequest.New().
// Get(f.NginxHTTPURL).
// Set("Host", host).
// End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// By("Ensuring no cookies are set")
// cookies := (*http.Response)(resp).Cookies()
// Expect(len(cookies)).Should(BeNumerically("==", 0))
// })
It("should use sticky sessions when dynamic configuration is disabled", func() {
cookieName := "STICKYSESSION"
By("Disabling dynamic configuration")
err := disableDynamicConfiguration(f.KubeClientSet)
Expect(err).NotTo(HaveOccurred())
By("Updating affinity annotation on ingress")
ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
ingress.ObjectMeta.Annotations = map[string]string{
"nginx.ingress.kubernetes.io/affinity": "cookie",
"nginx.ingress.kubernetes.io/session-cookie-name": cookieName,
}
_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
By("Making a first request")
host := "foo.com"
resp, _, errs := gorequest.New().
Get(fmt.Sprintf("%s?id=endpoints_only_changes", f.NginxHTTPURL)).
Set("Host", "foo.com").
Get(f.NginxHTTPURL).
Set("Host", host).
End()
// Expect(len(errs)).Should(BeNumerically("==", 0))
// Expect(resp.StatusCode).Should(Equal(http.StatusOK))
// Expect(resp).To(BeNil())
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).To(BeEmpty())
cookies := (*http.Response)(resp).Cookies()
sessionCookie, err := getCookie(cookieName, cookies)
Expect(err).ToNot(HaveOccurred())
By("Making a second request with the previous session cookie")
resp, _, errs = gorequest.New().
Get(f.NginxHTTPURL).
AddCookie(sessionCookie).
Set("Host", host).
End()
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
replicas := 2
err := framework.UpdateDeployment(f.KubeClientSet, f.Namespace.Name, "http-svc", replicas,
func(deployment *appsv1beta1.Deployment) error {
deployment.Spec.Replicas = framework.NewInt32(int32(replicas))
_, err := f.KubeClientSet.AppsV1beta1().Deployments(f.Namespace.Name).Update(deployment)
return err
})
Expect(err).NotTo(HaveOccurred())
By("Making a third request with no cookie")
resp, _, errs = gorequest.New().
Get(f.NginxHTTPURL).
Set("Host", host).
End()
time.Sleep(5 * time.Second)
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).ToNot(BeEmpty())
index := strings.Index(log, "id=endpoints_only_changes")
restOfLogs := log[index:]
Expect(len(errs)).Should(BeNumerically("==", 0))
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
By("POSTing new backends to Lua endpoint")
Expect(restOfLogs).To(ContainSubstring("dynamic reconfiguration succeeded"))
Expect(restOfLogs).ToNot(ContainSubstring("could not dynamically reconfigure"))
// log, err := f.NginxLogs()
// Expect(err).ToNot(HaveOccurred())
// Expect(log).ToNot(BeEmpty())
By("skipping Nginx reload")
Expect(restOfLogs).ToNot(ContainSubstring("backend reload required"))
Expect(restOfLogs).ToNot(ContainSubstring("ingress backend successfully reloaded"))
Expect(restOfLogs).To(ContainSubstring("skipping reload"))
Expect(restOfLogs).ToNot(ContainSubstring("first sync of Nginx configuration"))
})
It("should handle annotation changes", func() {
ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
ingress.ObjectMeta.Annotations["nginx.ingress.kubernetes.io/load-balance"] = "round_robin"
_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).ToNot(BeEmpty())
By("Checking that upstreams are sticky when session cookie is used")
index := strings.Index(log, fmt.Sprintf("reason: 'UPDATE' Ingress %s/foo.com", f.Namespace.Name))
restOfLogs := log[index:]
By("POSTing new backends to Lua endpoint")
Expect(restOfLogs).To(ContainSubstring("dynamic reconfiguration succeeded"))
Expect(restOfLogs).ToNot(ContainSubstring("could not dynamically reconfigure"))
By("skipping Nginx reload")
Expect(restOfLogs).ToNot(ContainSubstring("backend reload required"))
Expect(restOfLogs).ToNot(ContainSubstring("ingress backend successfully reloaded"))
Expect(restOfLogs).To(ContainSubstring("skipping reload"))
Expect(restOfLogs).ToNot(ContainSubstring("first sync of Nginx configuration"))
reqLogs := log[index:]
re := regexp.MustCompile(`\d{1,3}(?:\.\d{1,3}){3}(?::\d{1,5})`)
upstreams := re.FindAllString(reqLogs, -1)
Expect(len(upstreams)).Should(BeNumerically("==", 3))
Expect(upstreams[0]).To(Equal(upstreams[1]))
Expect(upstreams[1]).ToNot(Equal(upstreams[2]))
})
})
It("should handle a non backend update", func() {
ingress, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Get("foo.com", metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
ingress.Spec.TLS = []v1beta1.IngressTLS{
{
Hosts: []string{"foo.com"},
SecretName: "foo.com",
},
}
_, _, _, err = framework.CreateIngressTLSSecret(f.KubeClientSet,
ingress.Spec.TLS[0].Hosts,
ingress.Spec.TLS[0].SecretName,
ingress.Namespace)
Expect(err).ToNot(HaveOccurred())
_, err = f.KubeClientSet.ExtensionsV1beta1().Ingresses(f.Namespace.Name).Update(ingress)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
log, err := f.NginxLogs()
Expect(err).ToNot(HaveOccurred())
Expect(log).ToNot(BeEmpty())
By("reloading Nginx")
Expect(log).To(ContainSubstring("ingress backend successfully reloaded"))
By("POSTing new backends to Lua endpoint")
Expect(log).To(ContainSubstring("dynamic reconfiguration succeeded"))
By("still be proxying requests through Lua balancer")
err = f.WaitForNginxServer("foo.com",
func(server string) bool {
return strings.Contains(server, "proxy_pass http://upstream_balancer;")
})
Expect(err).NotTo(HaveOccurred())
By("generating the respective ssl listen directive")
err = f.WaitForNginxServer("foo.com",
func(server string) bool {
return strings.Contains(server, "server_name foo.com") &&
strings.Contains(server, "listen 443")
})
Expect(err).ToNot(HaveOccurred())
})
})
func enableDynamicConfiguration(kubeClientSet kubernetes.Interface) error {
@ -252,3 +413,12 @@ func ensureIngress(f *framework.Framework, host string) (*extensions.Ingress, er
},
})
}
func getCookie(name string, cookies []*http.Cookie) (*http.Cookie, error) {
for _, cookie := range cookies {
if cookie.Name == name {
return cookie, nil
}
}
return &http.Cookie{}, fmt.Errorf("Cookie does not exist")
}