Merge pull request #6673 from ElvinEfendi/global-rate-limit

Add Global/Distributed Rate Limiting support
This commit is contained in:
Kubernetes Prow Robot 2021-01-05 06:17:58 -08:00 committed by GitHub
commit efdd63f967
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1179 additions and 38 deletions

View file

@ -34,4 +34,5 @@ resty \
--shdict "balancer_ewma 1M" \
--shdict "balancer_ewma_last_touched_at 1M" \
--shdict "balancer_ewma_locks 512k" \
--shdict "global_throttle_cache 5M" \
./rootfs/etc/nginx/lua/test/run.lua ${BUSTED_ARGS} ./rootfs/etc/nginx/lua/test/ ./rootfs/etc/nginx/lua/plugins/**/test

View file

@ -56,6 +56,10 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz
|[nginx.ingress.kubernetes.io/http2-push-preload](#http2-push-preload)|"true" or "false"|
|[nginx.ingress.kubernetes.io/limit-connections](#rate-limiting)|number|
|[nginx.ingress.kubernetes.io/limit-rps](#rate-limiting)|number|
|[nginx.ingress.kubernetes.io/global-rate-limit](#global-rate-limiting)|number|
|[nginx.ingress.kubernetes.io/global-rate-limit-window](#global-rate-limiting)|duration|
|[nginx.ingress.kubernetes.io/global-rate-limit-key](#global-rate-limiting)|string|
|[nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs](#global-rate-limiting)|string|
|[nginx.ingress.kubernetes.io/permanent-redirect](#permanent-redirect)|string|
|[nginx.ingress.kubernetes.io/permanent-redirect-code](#permanent-redirect-code)|number|
|[nginx.ingress.kubernetes.io/temporal-redirect](#temporal-redirect)|string|
@ -474,7 +478,7 @@ By default the controller redirects all requests to an existing service that pro
!!! note
For more information please see [global-auth-url](./configmap.md#global-auth-url).
### Rate limiting
### Rate Limiting
These annotations define limits on connections and transmission rates. These can be used to mitigate [DDoS Attacks](https://www.nginx.com/blog/mitigating-ddos-attacks-with-nginx-and-nginx-plus).
@ -492,6 +496,46 @@ To configure settings globally for all Ingress rules, the `limit-rate-after` and
The client IP address will be set based on the use of [PROXY protocol](./configmap.md#use-proxy-protocol) or from the `X-Forwarded-For` header value when [use-forwarded-headers](./configmap.md#use-forwarded-headers) is enabled.
### Global Rate Limiting
**Note:** Be careful when configuring both (Local) Rate Limiting and Global Rate Limiting at the same time.
They are two completely different rate limiting implementations. Whichever limit is exceeded first will reject the
requests. It might be a good idea to configure both of them to ease load on Global Rate Limiting backend
in cases of spike in traffic.
The stock NGINX rate limiting does not share its counters among different NGINX instances.
Given that most ingress-nginx deployments are elastic and number of replicas can change any day
it is impossible to configure a proper rate limit using stock NGINX functionalities.
Global Rate Limiting overcomes this by using [lua-resty-global-throttle](https://github.com/ElvinEfendi/lua-resty-global-throttle). `lua-resty-global-throttle` shares its counters via a central store such as `memcached`.
The obvious shortcoming of this is users have to deploy and operate a `memcached` instance
in order to benefit from this functionality. Configure the `memcached`
using [these configmap settings](./configmap.md#memcached).
**Here are a few remarks for ingress-nginx integration of `lua-resty-global-throttle`:**
1. We minimize `memcached` access by caching exceeding limit decisions. The expiry of
cache entry is the desired delay `lua-resty-global-throttle` calculates for us.
The Lua Shared Dictionary used for that is `global_throttle_cache`. Currently its size defaults to 10M.
Customize it as per your needs using [lua-shared-dicts](./configmap.md#lua-shared-dicts).
When we fail to cache the exceeding limit decision then we log an NGINX error. You can monitor
for that error to decide if you need to bump the cache size. Without cache the cost of processing a
request is two memcached commands: `GET`, and `INCR`. With the cache it is only `INCR`.
1. Log NGINX variable `$global_rate_limit_exceeding`'s value to have some visibility into
what portion of requests are rejected (value `y`), whether they are rejected using cached decision (value `c`),
or if they are not rejected (default value `n`). You can use [log-format-upstream](./configmap.md#log-format-upstream)
to include that in access logs.
1. In case of an error it will log the error message and **fail open**.
1. The annotations below create a Global Rate Limiting instance per ingress.
That means if there are multiple paths configured under the same ingress,
the Global Rate Limiting will count requests to all the paths under the same counter.
Extract a path out into its own ingress if you need to isolate a certain path.
* `nginx.ingress.kubernetes.io/global-rate-limit`: Configures maximum allowed number of requests per window. Required.
* `nginx.ingress.kubernetes.io/global-rate-limit-window`: Configures a time window (i.e `1m`) that the limit is applied. Required.
* `nginx.ingress.kubernetes.io/global-rate-limit-key`: Configures a key for counting the samples. Defaults to `$remote_addr`. You can also combine multiple NGINX variables here, like `${remote_addr}-${http_x_api_client}` which would mean the limit will be applied to requests coming from the same API client (indicated by `X-API-Client` HTTP request header) with the same source IP address.
* `nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs`: comma separated list of IPs and CIDRs to match client IP against. When there's a match request is not considered for rate limiting.
### Permanent Redirect
This annotation allows to return a permanent redirect (Return Code 301) instead of sending data to the upstream. For example `nginx.ingress.kubernetes.io/permanent-redirect: https://www.google.com` would redirect everything to Google.

View file

@ -192,6 +192,12 @@ The following table shows a configuration option's name, type, and the default v
|[block-referers](#block-referers)|[]string|""|
|[proxy-ssl-location-only](#proxy-ssl-location-only)|bool|"false"|
|[default-type](#default-type)|string|"text/html"|
|[global-rate-limit-memcached-host](#global-rate-limit)|string|""|
|[global-rate-limit-memcached-port](#global-rate-limit)|int|11211|
|[global-rate-limit-memcached-connect-timeout](#global-rate-limit)|int|50|
|[global-rate-limit-memcached-max-idle-timeout](#global-rate-limit)|int|10000|
|[global-rate-limit-memcached-pool-size](#global-rate-limit)|int|50|
|[global-rate-limit-status-code](#global-rate-limit)|int|429|
## add-headers
@ -1152,3 +1158,19 @@ _**default:**_ text/html
_References:_
[http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type](http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type)
## global-rate-limit
* `global-rate-limit-status-code`: configure HTTP status code to return when rejecting requests. Defaults to 429.
Configure `memcached` client for [Global Rate Limiting](https://github.com/kubernetes/ingress-nginx/blob/master/docs/user-guide/nginx-configuration/annotations.md#global-rate-limiting).
* `global-rate-limit-memcached-host`: IP/FQDN of memcached server to use. Required to enable Global Rate Limiting.
* `global-rate-limit-memcached-port`: port of memcached server to use. Defaults to the standard memcached port of `11211`.
* `global-rate-limit-memcached-connect-timeout`: configure timeout for connect, send and receive operations. Unit is millisecond. Defaults to 50ms.
* `global-rate-limit-memcached-max-idle-timeout`: configure timeout for cleaning idle connections. Unit is millisecond. Defaults to 10000ms (10 seconds).
* `global-rate-limit-memcached-pool-size`: configure number of max connections to keep alive. Make sure your `memcached` server can handle
`global-rate-limit-memcached-pool-size * worker-processes * <number of ingress-nginx replicas>` simultaneous connections.
These settings get used by [lua-resty-global-throttle](https://github.com/ElvinEfendi/lua-resty-global-throttle)
that ingress-nginx includes. Refer to the link to learn more about `lua-resty-global-throttle`.

View file

@ -40,6 +40,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/customhttperrors"
"k8s.io/ingress-nginx/internal/ingress/annotations/defaultbackend"
"k8s.io/ingress-nginx/internal/ingress/annotations/fastcgi"
"k8s.io/ingress-nginx/internal/ingress/annotations/globalratelimit"
"k8s.io/ingress-nginx/internal/ingress/annotations/http2pushpreload"
"k8s.io/ingress-nginx/internal/ingress/annotations/influxdb"
"k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist"
@ -94,6 +95,7 @@ type Ingress struct {
Proxy proxy.Config
ProxySSL proxyssl.Config
RateLimit ratelimit.Config
GlobalRateLimit globalratelimit.Config
Redirect redirect.Config
Rewrite rewrite.Config
Satisfy string
@ -142,6 +144,7 @@ func NewAnnotationExtractor(cfg resolver.Resolver) Extractor {
"Proxy": proxy.NewParser(cfg),
"ProxySSL": proxyssl.NewParser(cfg),
"RateLimit": ratelimit.NewParser(cfg),
"GlobalRateLimit": globalratelimit.NewParser(cfg),
"Redirect": redirect.NewParser(cfg),
"Rewrite": rewrite.NewParser(cfg),
"Satisfy": satisfy.NewParser(cfg),

View file

@ -0,0 +1,111 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package globalratelimit
import (
"strings"
"time"
"github.com/pkg/errors"
networking "k8s.io/api/networking/v1beta1"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
ing_errors "k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
"k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/sets"
)
const defaultKey = "$remote_addr"

// Config encapsulates all global rate limit attributes
type Config struct {
	// Namespace scopes the counters; it is derived from the ingress UID so
	// every ingress gets its own rate limiting bucket in the shared store.
	Namespace string `json:"namespace"`
	// Limit is the maximum allowed number of requests per window.
	Limit int `json:"limit"`
	// WindowSize is the length of the rate limiting window, in seconds.
	WindowSize int `json:"window-size"`
	// Key is the NGINX variable expression used to group requests,
	// e.g. "$remote_addr" (the default) or "${remote_addr}-${http_x_api_client}".
	Key string `json:"key"`
	// IgnoredCIDRs lists client IPs/CIDRs that are exempt from rate limiting.
	IgnoredCIDRs []string `json:"ignored-cidrs"`
}
// Equal tests for equality between two Config types.
// It is nil-safe: two nil pointers are equal, and a nil pointer is never
// equal to a non-nil one (the original dereferenced unconditionally and
// would panic on a nil receiver or argument).
func (l *Config) Equal(r *Config) bool {
	if l == r {
		return true
	}
	if l == nil || r == nil {
		return false
	}
	if l.Namespace != r.Namespace {
		return false
	}
	if l.Limit != r.Limit {
		return false
	}
	if l.WindowSize != r.WindowSize {
		return false
	}
	if l.Key != r.Key {
		return false
	}
	// Length is compared first as a cheap pre-check before the
	// order-insensitive element match.
	if len(l.IgnoredCIDRs) != len(r.IgnoredCIDRs) || !sets.StringElementsMatch(l.IgnoredCIDRs, r.IgnoredCIDRs) {
		return false
	}
	return true
}
// globalratelimit implements parser.IngressAnnotation for the
// global rate limiting annotation group.
type globalratelimit struct {
	r resolver.Resolver
}

// NewParser creates a new globalratelimit annotation parser
func NewParser(r resolver.Resolver) parser.IngressAnnotation {
	return globalratelimit{r: r}
}
// Parse extracts globalratelimit annotations from the given ingress
// and returns them structured as Config type
func (a globalratelimit) Parse(ing *networking.Ingress) (interface{}, error) {
	emptyConfig := &Config{}

	requestLimit, _ := parser.GetIntAnnotation("global-rate-limit", ing)
	window, _ := parser.GetStringAnnotation("global-rate-limit-window", ing)

	// Both the limit and the window annotations are required; when either is
	// missing, Global Rate Limiting stays disabled for this ingress.
	if requestLimit == 0 || len(window) == 0 {
		return emptyConfig, nil
	}

	parsedWindow, err := time.ParseDuration(window)
	if err != nil {
		return emptyConfig, ing_errors.LocationDenied{
			Reason: errors.Wrap(err, "failed to parse 'global-rate-limit-window' value"),
		}
	}

	keyExpr, _ := parser.GetStringAnnotation("global-rate-limit-key", ing)
	if len(keyExpr) == 0 {
		keyExpr = defaultKey
	}

	cidrAnnotation, _ := parser.GetStringAnnotation("global-rate-limit-ignored-cidrs", ing)
	parsedCIDRs, err := net.ParseCIDRs(cidrAnnotation)
	if err != nil {
		return nil, err
	}

	// The ingress UID (minus dashes) namespaces the counters so different
	// ingresses never share a rate limiting bucket.
	return &Config{
		Namespace:    strings.Replace(string(ing.UID), "-", "", -1),
		Limit:        requestLimit,
		WindowSize:   int(parsedWindow.Seconds()),
		Key:          keyExpr,
		IgnoredCIDRs: parsedCIDRs,
	}, nil
}

View file

@ -0,0 +1,179 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package globalratelimit
import (
"encoding/json"
"fmt"
"testing"
"github.com/pkg/errors"
api "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
ing_errors "k8s.io/ingress-nginx/internal/ingress/errors"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
const UID = "31285d47-b150-4dcf-bd6f-12c46d769f6e"
const expectedUID = "31285d47b1504dcfbd6f12c46d769f6e"
// buildIngress returns a minimal Ingress fixture with a fixed UID, one host
// rule ("foo.bar.com") and a single "/foo" path, used by the parser tests.
func buildIngress() *networking.Ingress {
	pathBackend := networking.IngressBackend{
		ServiceName: "default-backend",
		ServicePort: intstr.FromInt(80),
	}

	rule := networking.IngressRule{
		Host: "foo.bar.com",
		IngressRuleValue: networking.IngressRuleValue{
			HTTP: &networking.HTTPIngressRuleValue{
				Paths: []networking.HTTPIngressPath{
					{
						Path:    "/foo",
						Backend: pathBackend,
					},
				},
			},
		},
	}

	return &networking.Ingress{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      "foo",
			Namespace: api.NamespaceDefault,
			UID:       UID,
		},
		Spec: networking.IngressSpec{
			Backend: &networking.IngressBackend{
				ServiceName: "default-backend",
				ServicePort: intstr.FromInt(80),
			},
			Rules: []networking.IngressRule{rule},
		},
	}
}
// mockBackend embeds resolver.Mock to satisfy the resolver.Resolver
// interface that NewParser requires in tests.
type mockBackend struct {
	resolver.Mock
}
// TestGlobalRateLimiting runs the globalratelimit parser against a table of
// annotation combinations: no annotations, the minimum required pair, the
// optional key and ignored-cidrs annotations, and an invalid window duration.
func TestGlobalRateLimiting(t *testing.T) {
	ing := buildIngress()

	annRateLimit := parser.GetAnnotationWithPrefix("global-rate-limit")
	annRateLimitWindow := parser.GetAnnotationWithPrefix("global-rate-limit-window")
	annRateLimitKey := parser.GetAnnotationWithPrefix("global-rate-limit-key")
	annRateLimitIgnoredCIDRs := parser.GetAnnotationWithPrefix("global-rate-limit-ignored-cidrs")

	testCases := []struct {
		title          string
		annotations    map[string]string
		expectedConfig *Config
		expectedErr    error
	}{
		{
			"no annotation",
			nil,
			&Config{},
			nil,
		},
		{
			"minimum required annotations",
			map[string]string{
				annRateLimit:       "100",
				annRateLimitWindow: "2m",
			},
			&Config{
				Namespace:    expectedUID,
				Limit:        100,
				WindowSize:   120,
				Key:          "$remote_addr",
				IgnoredCIDRs: make([]string, 0),
			},
			nil,
		},
		{
			"global-rate-limit-key annotation",
			map[string]string{
				annRateLimit:       "100",
				annRateLimitWindow: "2m",
				annRateLimitKey:    "$http_x_api_user",
			},
			&Config{
				Namespace:    expectedUID,
				Limit:        100,
				WindowSize:   120,
				Key:          "$http_x_api_user",
				IgnoredCIDRs: make([]string, 0),
			},
			nil,
		},
		{
			"global-rate-limit-ignored-cidrs annotation",
			map[string]string{
				annRateLimit:             "100",
				annRateLimitWindow:       "2m",
				annRateLimitKey:          "$http_x_api_user",
				annRateLimitIgnoredCIDRs: "127.0.0.1, 200.200.24.0/24",
			},
			&Config{
				Namespace:    expectedUID,
				Limit:        100,
				WindowSize:   120,
				Key:          "$http_x_api_user",
				IgnoredCIDRs: []string{"127.0.0.1", "200.200.24.0/24"},
			},
			nil,
		},
		{
			"incorrect duration for window",
			map[string]string{
				annRateLimit:       "100",
				annRateLimitWindow: "2mb",
				annRateLimitKey:    "$http_x_api_user",
			},
			&Config{},
			ing_errors.LocationDenied{
				Reason: errors.Wrap(fmt.Errorf(`time: unknown unit "mb" in duration "2mb"`),
					"failed to parse 'global-rate-limit-window' value"),
			},
		},
	}

	for _, testCase := range testCases {
		ing.SetAnnotations(testCase.annotations)

		i, actualErr := NewParser(mockBackend{}).Parse(ing)

		// Compare errors by presence first, then by message, so a mismatch in
		// either direction reports both expected and actual values (the
		// original printed "expected error 'nil'" even when a non-nil error
		// was expected).
		if (testCase.expectedErr == nil) != (actualErr == nil) ||
			(testCase.expectedErr != nil && actualErr != nil &&
				testCase.expectedErr.Error() != actualErr.Error()) {
			t.Errorf("%v: expected error '%v' but got '%v'", testCase.title, testCase.expectedErr, actualErr)
		}

		// Guard the type assertion so a nil/unexpected return fails the test
		// instead of panicking.
		actualConfig, ok := i.(*Config)
		if !ok {
			t.Errorf("%v: expected Parse to return a *Config but got %T", testCase.title, i)
			continue
		}
		if !testCase.expectedConfig.Equal(actualConfig) {
			expectedJSON, _ := json.Marshal(testCase.expectedConfig)
			actualJSON, _ := json.Marshal(actualConfig)
			t.Errorf("%v: expected config '%s' but got '%s'", testCase.title, expectedJSON, actualJSON)
		}
	}
}

View file

@ -708,6 +708,31 @@ type Configuration struct {
// http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type
// Default: text/html
DefaultType string `json:"default-type"`
// GlobalRateLimitMemcachedHost configures memcached host.
GlobalRateLimitMemcachedHost string `json:"global-rate-limit-memcached-host"`
// GlobalRateLimitMemcachedPort configures memcached port.
GlobalRateLimitMemcachedPort int `json:"global-rate-limit-memcached-port"`
// GlobalRateLimitMemcachedConnectTimeout configures timeout when connecting to memcached.
// The unit is millisecond.
GlobalRateLimitMemcachedConnectTimeout int `json:"global-rate-limit-memcached-connect-timeout"`
// GlobalRateLimitMemcachedMaxIdleTimeout configured how long connections
// should be kept alive in idle state. The unit is millisecond.
GlobalRateLimitMemcachedMaxIdleTimeout int `json:"global-rate-limit-memcached-max-idle-timeout"`
// GlobalRateLimitMemcachedPoolSize configures how many connections
// should be kept alive in the pool.
// Note that this is per NGINX worker. Make sure your memcached server can
// handle `MemcachedPoolSize * <nginx worker count> * <nginx replica count>`
// simultaneous connections.
GlobalRateLimitMemcachedPoolSize int `json:"global-rate-limit-memcached-pool-size"`
// GlobalRateLimitStatucCode determines the HTTP status code to return
// when limit is exceeding during global rate limiting.
GlobalRateLimitStatucCode int `json:"global-rate-limit-status-code"`
}
// NewDefault returns the default nginx configuration
@ -829,35 +854,40 @@ func NewDefault() Configuration {
ProxyHTTPVersion: "1.1",
ProxyMaxTempFileSize: "1024m",
},
UpstreamKeepaliveConnections: 320,
UpstreamKeepaliveTimeout: 60,
UpstreamKeepaliveRequests: 10000,
LimitConnZoneVariable: defaultLimitConnZoneVariable,
BindAddressIpv4: defBindAddress,
BindAddressIpv6: defBindAddress,
ZipkinCollectorPort: 9411,
ZipkinServiceName: "nginx",
ZipkinSampleRate: 1.0,
JaegerCollectorPort: 6831,
JaegerServiceName: "nginx",
JaegerSamplerType: "const",
JaegerSamplerParam: "1",
JaegerSamplerPort: 5778,
JaegerSamplerHost: "http://127.0.0.1",
DatadogServiceName: "nginx",
DatadogEnvironment: "prod",
DatadogCollectorPort: 8126,
DatadogOperationNameOverride: "nginx.handle",
DatadogSampleRate: 1.0,
DatadogPrioritySampling: true,
LimitReqStatusCode: 503,
LimitConnStatusCode: 503,
SyslogPort: 514,
NoTLSRedirectLocations: "/.well-known/acme-challenge",
NoAuthLocations: "/.well-known/acme-challenge",
GlobalExternalAuth: defGlobalExternalAuth,
ProxySSLLocationOnly: false,
DefaultType: "text/html",
UpstreamKeepaliveConnections: 320,
UpstreamKeepaliveTimeout: 60,
UpstreamKeepaliveRequests: 10000,
LimitConnZoneVariable: defaultLimitConnZoneVariable,
BindAddressIpv4: defBindAddress,
BindAddressIpv6: defBindAddress,
ZipkinCollectorPort: 9411,
ZipkinServiceName: "nginx",
ZipkinSampleRate: 1.0,
JaegerCollectorPort: 6831,
JaegerServiceName: "nginx",
JaegerSamplerType: "const",
JaegerSamplerParam: "1",
JaegerSamplerPort: 5778,
JaegerSamplerHost: "http://127.0.0.1",
DatadogServiceName: "nginx",
DatadogEnvironment: "prod",
DatadogCollectorPort: 8126,
DatadogOperationNameOverride: "nginx.handle",
DatadogSampleRate: 1.0,
DatadogPrioritySampling: true,
LimitReqStatusCode: 503,
LimitConnStatusCode: 503,
SyslogPort: 514,
NoTLSRedirectLocations: "/.well-known/acme-challenge",
NoAuthLocations: "/.well-known/acme-challenge",
GlobalExternalAuth: defGlobalExternalAuth,
ProxySSLLocationOnly: false,
DefaultType: "text/html",
GlobalRateLimitMemcachedPort: 11211,
GlobalRateLimitMemcachedConnectTimeout: 50,
GlobalRateLimitMemcachedMaxIdleTimeout: 10000,
GlobalRateLimitMemcachedPoolSize: 50,
GlobalRateLimitStatucCode: 429,
}
if klog.V(5).Enabled() {

View file

@ -232,6 +232,17 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error {
k8s.SetDefaultNGINXPathType(ing)
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
if len(cfg.GlobalRateLimitMemcachedHost) == 0 {
for key := range ing.ObjectMeta.GetAnnotations() {
if strings.HasPrefix(key, fmt.Sprintf("%s/%s", parser.AnnotationsPrefix, "global-rate-limit")) {
return fmt.Errorf("'global-rate-limit*' annotations require 'global-rate-limit-memcached-host' settings configured in the global configmap")
}
}
}
allIngresses := n.store.ListIngresses()
filter := func(toCheck *ingress.Ingress) bool {
@ -244,9 +255,6 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error {
ParsedAnnotations: annotations.NewAnnotationExtractor(n.store).Extract(ing),
})
cfg := n.store.GetBackendConfiguration()
cfg.Resolver = n.resolver
_, servers, pcfg := n.getConfiguration(ings)
err := checkOverlap(ing, allIngresses, servers)
@ -1253,6 +1261,7 @@ func locationApplyAnnotations(loc *ingress.Location, anns *annotations.Ingress)
loc.Proxy = anns.Proxy
loc.ProxySSL = anns.ProxySSL
loc.RateLimit = anns.RateLimit
loc.GlobalRateLimit = anns.GlobalRateLimit
loc.Redirect = anns.Redirect
loc.Rewrite = anns.Rewrite
loc.UpstreamVhost = anns.UpstreamVhost

View file

@ -75,6 +75,7 @@ var (
"balancer_ewma_locks": 1,
"certificate_servers": 5,
"ocsp_response_cache": 5, // keep this same as certificate_servers
"global_throttle_cache": 10,
}
defaultGlobalAuthRedirectParam = "rd"
)

View file

@ -289,6 +289,13 @@ func configForLua(input interface{}) string {
hsts_max_age = %v,
hsts_include_subdomains = %t,
hsts_preload = %t,
global_throttle = {
memcached = {
host = "%v", port = %d, connect_timeout = %d, max_idle_timeout = %d, pool_size = %d,
},
status_code = %d,
}
}`,
all.Cfg.UseForwardedHeaders,
all.Cfg.UseProxyProtocol,
@ -301,6 +308,13 @@ func configForLua(input interface{}) string {
all.Cfg.HSTSMaxAge,
all.Cfg.HSTSIncludeSubdomains,
all.Cfg.HSTSPreload,
all.Cfg.GlobalRateLimitMemcachedHost,
all.Cfg.GlobalRateLimitMemcachedPort,
all.Cfg.GlobalRateLimitMemcachedConnectTimeout,
all.Cfg.GlobalRateLimitMemcachedMaxIdleTimeout,
all.Cfg.GlobalRateLimitMemcachedPoolSize,
all.Cfg.GlobalRateLimitStatucCode,
)
}
@ -318,16 +332,28 @@ func locationConfigForLua(l interface{}, a interface{}) string {
return "{}"
}
ignoredCIDRs, err := convertGoSliceIntoLuaTable(location.GlobalRateLimit.IgnoredCIDRs, false)
if err != nil {
klog.Errorf("failed to convert %v into Lua table: %q", location.GlobalRateLimit.IgnoredCIDRs, err)
ignoredCIDRs = "{}"
}
return fmt.Sprintf(`{
force_ssl_redirect = %t,
ssl_redirect = %t,
force_no_ssl_redirect = %t,
use_port_in_redirects = %t,
global_throttle = { namespace = "%v", limit = %d, window_size = %d, key = %v, ignored_cidrs = %v },
}`,
location.Rewrite.ForceSSLRedirect,
location.Rewrite.SSLRedirect,
isLocationInLocationList(l, all.Cfg.NoTLSRedirectLocations),
location.UsePortInRedirects,
location.GlobalRateLimit.Namespace,
location.GlobalRateLimit.Limit,
location.GlobalRateLimit.WindowSize,
parseComplexNginxVarIntoLuaTable(location.GlobalRateLimit.Key),
ignoredCIDRs,
)
}
@ -1501,3 +1527,51 @@ func buildServerName(hostname string) string {
return `~^(?<subdomain>[\w-]+)\.` + strings.Join(parts, "\\.") + `$`
}
// complexNginxVarRegexp matches, in order of precedence: an escaped variable
// ("\$foo"), a braced variable ("${foo}"), a plain variable ("$foo"), or a
// run of literal text (including a lone "$"). Compiled once at package scope
// so template rendering does not recompile it on every call.
var complexNginxVarRegexp = regexp.MustCompile(`(\\\$[0-9a-zA-Z_]+)|\$\{([0-9a-zA-Z_]+)\}|\$([0-9a-zA-Z_]+)|(\$|[^$\\]+)`)

// parseComplexNginxVarIntoLuaTable parses things like "$my${complex}ngx\$var"
// into a Lua table literal. Each match becomes a 4-tuple where exactly one
// slot is non-nil: slot 1 holds an escaped-variable literal, slots 2 and 3
// hold actual NGINX variable names (braced and plain form respectively), and
// slot 4 holds a plain string literal.
func parseComplexNginxVarIntoLuaTable(ngxVar string) string {
	matches := complexNginxVarRegexp.FindAllStringSubmatch(ngxVar, -1)
	components := make([][]string, len(matches))
	for i, match := range matches {
		// match[0] is the full match; only the capture groups are kept.
		components[i] = match[1:]
	}

	luaTable, err := convertGoSliceIntoLuaTable(components, true)
	if err != nil {
		klog.Errorf("unexpected error: %v", err)
		luaTable = "{}"
	}
	return luaTable
}
// convertGoSliceIntoLuaTable serializes a Go value into a Lua literal.
// Strings become quoted strings (or "nil" when empty and emptyStringAsNil is
// set), ints and bools are emitted verbatim, and slices/arrays recurse into
// a "{ el, el, }" table. Any other kind returns an error. A strings.Builder
// accumulates nested tables instead of repeated string concatenation, which
// was quadratic in the number of elements.
func convertGoSliceIntoLuaTable(goSliceInterface interface{}, emptyStringAsNil bool) (string, error) {
	goSlice := reflect.ValueOf(goSliceInterface)
	switch kind := goSlice.Kind(); kind {
	case reflect.String:
		if emptyStringAsNil && goSlice.Len() == 0 {
			return "nil", nil
		}
		return fmt.Sprintf(`"%v"`, goSlice.Interface()), nil
	case reflect.Int, reflect.Bool:
		return fmt.Sprintf(`%v`, goSlice.Interface()), nil
	case reflect.Slice, reflect.Array:
		var b strings.Builder
		b.WriteString("{ ")
		for i := 0; i < goSlice.Len(); i++ {
			luaEl, err := convertGoSliceIntoLuaTable(goSlice.Index(i).Interface(), emptyStringAsNil)
			if err != nil {
				return "", err
			}
			b.WriteString(luaEl)
			b.WriteString(", ")
		}
		b.WriteString("}")
		return b.String(), nil
	default:
		return "", fmt.Errorf("could not process type: %s", kind)
	}
}

View file

@ -1472,3 +1472,86 @@ func TestBuildServerName(t *testing.T) {
}
}
}
// TestParseComplexNginxVarIntoLuaTable checks literal, braced, plain and
// escaped variable forms, alone and combined in a single expression.
func TestParseComplexNginxVarIntoLuaTable(t *testing.T) {
	testCases := []struct {
		ngxVar   string
		expected string
	}{
		{ngxVar: "foo", expected: `{ { nil, nil, nil, "foo", }, }`},
		{ngxVar: "$foo", expected: `{ { nil, nil, "foo", nil, }, }`},
		{ngxVar: "${foo}", expected: `{ { nil, "foo", nil, nil, }, }`},
		{ngxVar: "\\$foo", expected: `{ { "\$foo", nil, nil, nil, }, }`},
		{
			ngxVar: "foo\\$bar$baz${daz}xiyar$pomidor",
			expected: `{ { nil, nil, nil, "foo", }, { "\$bar", nil, nil, nil, }, { nil, nil, "baz", nil, }, ` +
				`{ nil, "daz", nil, nil, }, { nil, nil, nil, "xiyar", }, { nil, nil, "pomidor", nil, }, }`,
		},
	}

	for _, tc := range testCases {
		if got := parseComplexNginxVarIntoLuaTable(tc.ngxVar); got != tc.expected {
			t.Errorf("expected %v but returned %v", tc.expected, got)
		}
	}
}
// TestConvertGoSliceIntoLuaTable verifies scalar, flat and nested slice
// conversions, including the emptyStringAsNil option. (The function name
// previously carried a "Tablet" typo; error messages now always include the
// case title so failures identify their test case.)
func TestConvertGoSliceIntoLuaTable(t *testing.T) {
	testCases := []struct {
		title            string
		goSlice          interface{}
		emptyStringAsNil bool
		expectedLuaTable string
		expectedErr      error
	}{
		{
			"flat string slice",
			[]string{"one", "two", "three"},
			false,
			`{ "one", "two", "three", }`,
			nil,
		},
		{
			"nested string slice",
			[][]string{{"one", "", "three"}, {"foo", "bar"}},
			false,
			`{ { "one", "", "three", }, { "foo", "bar", }, }`,
			nil,
		},
		{
			"converts empty string to nil when enabled",
			[][]string{{"one", "", "three"}, {"foo", "bar"}},
			true,
			`{ { "one", nil, "three", }, { "foo", "bar", }, }`,
			nil,
		},
		{
			"boolean slice",
			[]bool{true, true, false},
			false,
			`{ true, true, false, }`,
			nil,
		},
		{
			"integer slice",
			[]int{4, 3, 6},
			false,
			`{ 4, 3, 6, }`,
			nil,
		},
	}

	for _, testCase := range testCases {
		actualLuaTable, err := convertGoSliceIntoLuaTable(testCase.goSlice, testCase.emptyStringAsNil)
		if testCase.expectedErr != nil && err != nil && testCase.expectedErr.Error() != err.Error() {
			t.Errorf("%v: expected error '%v' but returned '%v'", testCase.title, testCase.expectedErr, err)
		}
		if testCase.expectedErr == nil && err != nil {
			t.Errorf("%v: expected error to be nil but returned '%v'", testCase.title, err)
		}
		if testCase.expectedLuaTable != actualLuaTable {
			t.Errorf("%v: expected '%v' but returned '%v'", testCase.title, testCase.expectedLuaTable, actualLuaTable)
		}
	}
}

View file

@ -28,6 +28,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/connection"
"k8s.io/ingress-nginx/internal/ingress/annotations/cors"
"k8s.io/ingress-nginx/internal/ingress/annotations/fastcgi"
"k8s.io/ingress-nginx/internal/ingress/annotations/globalratelimit"
"k8s.io/ingress-nginx/internal/ingress/annotations/influxdb"
"k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist"
"k8s.io/ingress-nginx/internal/ingress/annotations/log"
@ -270,6 +271,10 @@ type Location struct {
// The Redirect annotation precedes RateLimit
// +optional
RateLimit ratelimit.Config `json:"rateLimit,omitempty"`
// GlobalRateLimit similar to RateLimit
// but this is applied globally across multiple replicas.
// +optional
GlobalRateLimit globalratelimit.Config `json:"globalRateLimit,omitempty"`
// Redirect describes a temporal o permanent redirection this location.
// +optional
Redirect redirect.Config `json:"redirect,omitempty"`

View file

@ -379,6 +379,9 @@ func (l1 *Location) Equal(l2 *Location) bool {
if !(&l1.RateLimit).Equal(&l2.RateLimit) {
return false
}
if !(&l1.GlobalRateLimit).Equal(&l2.GlobalRateLimit) {
return false
}
if !(&l1.Redirect).Equal(&l2.Redirect) {
return false
}

View file

@ -0,0 +1,131 @@
local resty_global_throttle = require("resty.global_throttle")
local resty_ipmatcher = require("resty.ipmatcher")
local util = require("util")
local ngx = ngx
local ngx_exit = ngx.exit
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local _M = {}

-- Shared dict used to cache "limit exceeded" decisions so that an already
-- throttled client does not cost a memcached round trip on every request.
local DECISION_CACHE = ngx.shared.global_throttle_cache

-- it does not make sense to cache decision for too little time
-- the benefit of caching likely is negated if we cache for too little time
-- Lua Shared Dict's time resolution for expiry is 0.001.
local CACHE_THRESHOLD = 0.001

-- Fallback NGINX variable name used as the throttle key when the configured
-- key expression evaluates to an empty value.
local DEFAULT_RAW_KEY = "remote_addr"
-- Returns truthy when the client IP matches one of the configured ignored
-- IPs/CIDRs; such requests bypass global rate limiting entirely.
-- Fails open (returns false) when the matcher cannot be built or errors.
local function should_ignore_request(ignored_cidrs)
  if not ignored_cidrs or #ignored_cidrs == 0 then
    return false
  end

  local matcher, new_err = resty_ipmatcher.new(ignored_cidrs)
  if not matcher then
    ngx_log(ngx_ERR, "failed to initialize resty-ipmatcher: ", new_err)
    return false
  end

  local matched, match_err = matcher:match(ngx.var.remote_addr)
  if match_err then
    ngx_log(ngx_ERR, "failed to match ip: '",
      ngx.var.remote_addr, "': ", match_err)
    return false
  end

  return matched
end
-- Decides whether global throttling applies to the current request: the
-- memcached backend must be configured, the location must define a non-zero
-- limit and window, and the client must not be on the ignore list.
local function is_enabled(config, location_config)
  local memcached = config.memcached
  if memcached.host == "" or memcached.port == 0 then
    return false
  end

  if location_config.limit == 0 or location_config.window_size == 0 then
    return false
  end

  return not should_ignore_request(location_config.ignored_cidrs)
end
-- Prefixes the key value with the per-ingress namespace so that entries from
-- different ingresses never collide in the shared decision cache.
local function get_namespaced_key_value(namespace, key_value)
  return namespace .. key_value
end
-- Applies global (cross-replica) rate limiting to the current request.
-- config carries the controller-wide settings (memcached connection details
-- and the rejection status_code); location_config carries the per-location
-- settings (namespace, limit, window_size, key, ignored_cidrs).
-- On any backend error the function logs and fails open.
-- Fixes: "faled" typo in the init error message; uses the module-local
-- ngx_log/ngx_ERR aliases consistently instead of mixing in ngx.log/ngx.ERR.
function _M.throttle(config, location_config)
  if not is_enabled(config, location_config) then
    return
  end

  local key_value = util.generate_var_value(location_config.key)
  if not key_value or key_value == "" then
    key_value = ngx.var[DEFAULT_RAW_KEY]
  end

  local namespaced_key_value =
    get_namespaced_key_value(location_config.namespace, key_value)

  -- Serve a cached "limit exceeded" decision without touching memcached.
  local is_limit_exceeding = DECISION_CACHE:get(namespaced_key_value)
  if is_limit_exceeding then
    ngx.var.global_rate_limit_exceeding = "c"
    return ngx_exit(config.status_code)
  end

  local my_throttle, err = resty_global_throttle.new(
    location_config.namespace,
    location_config.limit,
    location_config.window_size,
    {
      provider = "memcached",
      host = config.memcached.host,
      port = config.memcached.port,
      connect_timeout = config.memcached.connect_timeout,
      max_idle_timeout = config.memcached.max_idle_timeout,
      pool_size = config.memcached.pool_size,
    }
  )
  if err then
    ngx_log(ngx_ERR, "failed to initialize resty_global_throttle: ", err)
    -- fail open
    return
  end

  local desired_delay, estimated_final_count
  estimated_final_count, desired_delay, err = my_throttle:process(key_value)
  if err then
    ngx_log(ngx_ERR, "error while processing key: ", err)
    -- fail open
    return
  end

  if desired_delay then
    -- Cache the rejection for the delay the throttle computed, but only when
    -- the delay exceeds the shared dict's expiry resolution.
    if desired_delay > CACHE_THRESHOLD then
      local ok
      ok, err =
        DECISION_CACHE:safe_add(namespaced_key_value, true, desired_delay)
      if not ok then
        -- "exists" just means another worker cached it first.
        if err ~= "exists" then
          ngx_log(ngx_ERR, "failed to cache decision: ", err)
        end
      end
    end

    ngx.var.global_rate_limit_exceeding = "y"
    ngx_log(ngx_INFO, "limit is exceeding for ",
      location_config.namespace, "/", key_value,
      " with estimated_final_count: ", estimated_final_count)

    return ngx_exit(config.status_code)
  end
end
return _M

View file

@ -2,6 +2,7 @@ local ngx_re_split = require("ngx.re").split
local certificate_configured_for_current_request =
require("certificate").configured_for_current_request
local global_throttle = require("global_throttle")
local ngx = ngx
local io = io
@ -160,6 +161,8 @@ function _M.rewrite(location_config)
return ngx_redirect(uri, config.http_redirect_code)
end
global_throttle.throttle(config.global_throttle, location_config.global_throttle)
end
function _M.header()

View file

@ -0,0 +1,258 @@
local util = require("util")
-- Runs the throttle and verifies the request was rejected with the
-- configured status code. opts.with_cache selects the expected marker
-- in ngx.var.global_rate_limit_exceeding: "c" when the rejection came
-- from the local decision cache, "y" when it was freshly computed.
local function assert_request_rejected(config, location_config, opts)
  stub(ngx, "exit")

  local global_throttle = require_without_cache("global_throttle")
  assert.has_no.errors(function()
    global_throttle.throttle(config, location_config)
  end)

  assert.stub(ngx.exit).was_called_with(config.status_code)

  local expected_marker = opts.with_cache and "c" or "y"
  assert.are.same(expected_marker, ngx.var.global_rate_limit_exceeding)
end
-- Verifies the request passes through untouched: ngx.exit is never
-- called, no marker variable is set, and nothing is written to the
-- shared decision cache.
local function assert_request_not_rejected(config, location_config)
  stub(ngx, "exit")
  local safe_add = spy.on(ngx.shared.global_throttle_cache, "safe_add")

  local global_throttle = require_without_cache("global_throttle")
  assert.has_no.errors(function()
    global_throttle.throttle(config, location_config)
  end)

  assert.stub(ngx.exit).was_not_called()
  assert.spy(safe_add).was_not_called()
  assert.is_nil(ngx.var.global_rate_limit_exceeding)
end
-- Asserts that f(global_throttle) returns before any throttling work
-- starts: neither the shared decision cache is read nor a
-- resty.global_throttle instance is created.
local function assert_short_circuits(f)
  local decision_cache_get = spy.on(ngx.shared.global_throttle_cache, "get")
  -- the spy must be installed before global_throttle is (re)loaded,
  -- since the module captures resty.global_throttle at require time
  local resty_global_throttle = require_without_cache("resty.global_throttle")
  local throttle_new = spy.on(resty_global_throttle, "new")

  local global_throttle = require_without_cache("global_throttle")
  f(global_throttle)

  assert.spy(throttle_new).was_not_called()
  assert.spy(decision_cache_get).was_not_called()
end
-- Asserts fail-open behavior: the given message parts are logged at
-- ngx.ERR, the request is not terminated, and no rejection marker is
-- set. Varargs are the exact arguments expected in the ngx.log call.
local function assert_fails_open(config, location_config, ...)
  stub(ngx, "exit")
  stub(ngx, "log")

  local global_throttle = require_without_cache("global_throttle")
  assert.has_no.errors(function()
    global_throttle.throttle(config, location_config)
  end)

  assert.stub(ngx.log).was_called_with(ngx.ERR, ...)
  assert.stub(ngx.exit).was_not_called()
  assert.is_nil(ngx.var.global_rate_limit_exceeding)
end
-- Replaces resty.global_throttle.new with a stub object whose
-- process() always returns the triple (ret1, ret2, ret3) =
-- (estimated_final_count, desired_delay, err), runs f, and verifies
-- that new() was actually invoked.
local function stub_resty_global_throttle_process(ret1, ret2, ret3, f)
  local resty_global_throttle = require_without_cache("resty.global_throttle")
  stub(resty_global_throttle, "new", {
    process = function(self, key) return ret1, ret2, ret3 end
  })

  f()

  assert.stub(resty_global_throttle.new).was_called()
end
-- Seeds the shared decision cache the same way the module does after a
-- rejection: the cache key is the namespace concatenated with the raw
-- key value, the value is `true`, expiring after desired_delay seconds.
local function cache_rejection_decision(namespace, key_value, desired_delay)
  local cache = ngx.shared.global_throttle_cache
  local cache_key = namespace .. key_value

  local ok, err = cache:safe_add(cache_key, true, desired_delay)
  assert.is_nil(err)
  assert.is_true(ok)
  assert.is_true(cache:get(cache_key))
end
-- Unit tests for lua/global_throttle.lua, run under the busted framework
-- with the mocked ngx environment provided by the test runner.
describe("global_throttle", function()
  -- luassert snapshot: reverts any stubs/spies installed during a test.
  local snapshot

  local NAMESPACE = "31285d47b1504dcfbd6f12c46d769f6e"
  -- Per-location throttle settings as the controller would render them.
  local LOCATION_CONFIG = {
    namespace = NAMESPACE,
    limit = 10,
    window_size = 60,
    key = {},
    ignored_cidrs = {},
  }
  -- Controller-wide settings: memcached store options and the HTTP
  -- status code returned on rejection.
  local CONFIG = {
    memcached = {
      host = "memc.default.svc.cluster.local", port = 11211,
      connect_timeout = 50, max_idle_timeout = 10000, pool_size = 50,
    },
    status_code = 429,
  }

  before_each(function()
    snapshot = assert:snapshot()
    -- fresh request context for every spec
    ngx.var = { remote_addr = "127.0.0.1", global_rate_limit_exceeding = nil }
  end)

  after_each(function()
    snapshot:revert()
    -- drop any cached rejection decisions between specs
    ngx.shared.global_throttle_cache:flush_all()
    reset_ngx()
  end)

  it("short circuits when memcached is not configured", function()
    assert_short_circuits(function(global_throttle)
      assert.has_no.errors(function()
        global_throttle.throttle({ memcached = { host = "", port = 0 } }, LOCATION_CONFIG)
      end)
    end)
  end)

  it("short circuits when limit or window_size is not configured", function()
    -- limit = 0 disables throttling for the location
    assert_short_circuits(function(global_throttle)
      local location_config_copy = util.deepcopy(LOCATION_CONFIG)
      location_config_copy.limit = 0
      assert.has_no.errors(function()
        global_throttle.throttle(CONFIG, location_config_copy)
      end)
    end)
    -- window_size = 0 disables throttling as well
    assert_short_circuits(function(global_throttle)
      local location_config_copy = util.deepcopy(LOCATION_CONFIG)
      location_config_copy.window_size = 0
      assert.has_no.errors(function()
        global_throttle.throttle(CONFIG, location_config_copy)
      end)
    end)
  end)

  it("short circuits when remote_addr is in ignored_cidrs", function()
    -- NOTE(review): this outer require's result is shadowed by the
    -- callback parameter below and never used directly.
    local global_throttle = require_without_cache("global_throttle")
    local location_config = util.deepcopy(LOCATION_CONFIG)
    location_config.ignored_cidrs = { ngx.var.remote_addr }
    assert_short_circuits(function(global_throttle)
      assert.has_no.errors(function()
        global_throttle.throttle(CONFIG, location_config)
      end)
    end)
  end)

  it("rejects when exceeding limit has already been cached", function()
    local key_value = "foo"
    local location_config = util.deepcopy(LOCATION_CONFIG)
    -- key tuple in ngx_re.split form; the 4th slot is the raw value
    location_config.key = { { nil, nil, nil, key_value } }

    cache_rejection_decision(NAMESPACE, key_value, 0.5)

    assert_request_rejected(CONFIG, location_config, { with_cache = true })
  end)

  describe("when resty_global_throttle fails", function()
    it("fails open in case of initialization error", function()
      -- 36 chars exceeds the library's 35-character namespace limit
      local too_long_namespace = ""
      for i=1,36,1 do
        too_long_namespace = too_long_namespace .. "a"
      end
      local location_config = util.deepcopy(LOCATION_CONFIG)
      location_config.namespace = too_long_namespace

      -- NOTE(review): "faled" mirrors the typo in the module's log
      -- message; fix both together if correcting the spelling.
      assert_fails_open(CONFIG, location_config, "faled to initialize resty_global_throttle: ", "'namespace' can be at most 35 characters")
    end)

    it("fails open in case of key processing error", function()
      stub_resty_global_throttle_process(nil, nil, "failed to process", function()
        assert_fails_open(CONFIG, LOCATION_CONFIG, "error while processing key: ", "failed to process")
      end)
    end)
  end)

  it("initializes resty_global_throttle with the right parameters", function()
    -- Wrap the real constructor so the arguments can be asserted while
    -- still returning a working (but stubbed) throttle instance.
    local resty_global_throttle = require_without_cache("resty.global_throttle")
    local resty_global_throttle_original_new = resty_global_throttle.new
    resty_global_throttle.new = function(namespace, limit, window_size, store_opts)
      local o, err = resty_global_throttle_original_new(namespace, limit, window_size, store_opts)
      if not o then
        return nil, err
      end
      -- avoid hitting memcached: pretend one request was counted
      o.process = function(self, key) return 1, nil, nil end

      local expected = LOCATION_CONFIG
      assert.are.same(expected.namespace, namespace)
      assert.are.same(expected.limit, limit)
      assert.are.same(expected.window_size, window_size)

      assert.are.same("memcached", store_opts.provider)
      assert.are.same(CONFIG.memcached.host, store_opts.host)
      assert.are.same(CONFIG.memcached.port, store_opts.port)
      assert.are.same(CONFIG.memcached.connect_timeout, store_opts.connect_timeout)
      assert.are.same(CONFIG.memcached.max_idle_timeout, store_opts.max_idle_timeout)
      assert.are.same(CONFIG.memcached.pool_size, store_opts.pool_size)

      return o, nil
    end
    local resty_global_throttle_new_spy = spy.on(resty_global_throttle, "new")

    local global_throttle = require_without_cache("global_throttle")

    assert.has_no.errors(function()
      global_throttle.throttle(CONFIG, LOCATION_CONFIG)
    end)
    assert.spy(resty_global_throttle_new_spy).was_called()
  end)

  it("rejects request and caches decision when limit is exceeding after processing a key", function()
    -- above the module's caching threshold, so the decision is cached
    local desired_delay = 0.015
    stub_resty_global_throttle_process(LOCATION_CONFIG.limit + 1, desired_delay, nil, function()
      assert_request_rejected(CONFIG, LOCATION_CONFIG, { with_cache = false })

      -- decision is cached under namespace .. key (default key: remote_addr)
      local cache_key = LOCATION_CONFIG.namespace .. ngx.var.remote_addr
      assert.is_true(ngx.shared.global_throttle_cache:get(cache_key))

      -- we assume it won't take more than this after caching
      -- until we execute the assertion below
      local delta = 0.001
      local ttl = ngx.shared.global_throttle_cache:ttl(cache_key)
      assert.is_true(ttl > desired_delay - delta)
      assert.is_true(ttl <= desired_delay)
    end)
  end)

  it("rejects request and skip caching of decision when limit is exceeding after processing a key but desired delay is lower than the threshold", function()
    -- below the caching threshold: reject but do not cache
    local desired_delay = 0.0009
    stub_resty_global_throttle_process(LOCATION_CONFIG.limit, desired_delay, nil, function()
      assert_request_rejected(CONFIG, LOCATION_CONFIG, { with_cache = false })

      local cache_key = LOCATION_CONFIG.namespace .. ngx.var.remote_addr
      assert.is_nil(ngx.shared.global_throttle_cache:get(cache_key))
    end)
  end)

  it("allows the request when limit is not exceeding after processing a key", function()
    -- nil desired_delay means the window still has capacity
    stub_resty_global_throttle_process(LOCATION_CONFIG.limit - 3, nil, nil,
      function()
        assert_request_not_rejected(CONFIG, LOCATION_CONFIG)
      end
    )
  end)

  it("rejects with custom status code", function()
    cache_rejection_decision(NAMESPACE, ngx.var.remote_addr, 0.3)

    local config = util.deepcopy(CONFIG)
    config.status_code = 503

    assert_request_rejected(config, LOCATION_CONFIG, { with_cache = true })
  end)
end)

View file

@ -1085,6 +1085,7 @@ stream {
set $service_name {{ $ing.Service | quote }};
set $service_port {{ $ing.ServicePort | quote }};
set $location_path {{ $ing.Path | escapeLiteralDollar | quote }};
set $global_rate_limit_exceeding n;
{{ buildOpentracingForLocation $all.Cfg.EnableOpentracing $location }}

View file

@ -0,0 +1,83 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package annotations
import (
"fmt"
"net/http"
"strings"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"
"k8s.io/ingress-nginx/test/e2e/framework"
)
// E2E spec: the global-rate-limit* annotations must be rendered into the
// per-location `global_throttle` Lua table, both on Ingress creation and
// after an annotation update.
var _ = framework.DescribeAnnotation("annotation-global-rate-limit", func() {
	f := framework.NewDefaultFramework("global-rate-limit")
	host := "global-rate-limit-annotation"

	ginkgo.BeforeEach(func() {
		f.NewEchoDeployment()
	})

	ginkgo.It("generates correct configuration", func() {
		annotations := map[string]string{
			"nginx.ingress.kubernetes.io/global-rate-limit":        "5",
			"nginx.ingress.kubernetes.io/global-rate-limit-window": "2m",
		}
		ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations)
		ing = f.EnsureIngress(ing)

		// The Lua throttle namespace is the Ingress UID with dashes stripped.
		namespace := strings.ReplaceAll(string(ing.UID), "-", "")

		// captureServer returns the currently rendered server block for host.
		captureServer := func() string {
			out := ""
			f.WaitForNginxServer(host, func(server string) bool {
				out = server
				return true
			})
			return out
		}

		serverConfig := captureServer()
		// default key is remote_addr; 2m window is normalized to 120 seconds
		assert.Contains(ginkgo.GinkgoT(), serverConfig,
			fmt.Sprintf(`global_throttle = { namespace = "%v", `+
				`limit = 5, window_size = 120, key = { { nil, nil, "remote_addr", nil, }, }, `+
				`ignored_cidrs = { } }`,
				namespace))

		f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK)

		ginkgo.By("regenerating the correct configuration after update")
		annotations["nginx.ingress.kubernetes.io/global-rate-limit-key"] = "${remote_addr}${http_x_api_client}"
		annotations["nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs"] = "192.168.1.1, 234.234.234.0/24"
		ing.SetAnnotations(annotations)
		f.WaitForReload(func() {
			ing = f.UpdateIngress(ing)
		})

		serverConfig = captureServer()
		// custom key becomes two variable lookups; CIDR list is split and quoted
		assert.Contains(ginkgo.GinkgoT(), serverConfig,
			fmt.Sprintf(`global_throttle = { namespace = "%v", `+
				`limit = 5, window_size = 120, `+
				`key = { { nil, "remote_addr", nil, nil, }, { nil, "http_x_api_client", nil, nil, }, }, `+
				`ignored_cidrs = { "192.168.1.1", "234.234.234.0/24", } }`,
				namespace))

		f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK)
	})
})

View file

@ -215,7 +215,8 @@ func (f *Framework) updateIngressNGINXPod() error {
return err
}
// WaitForNginxServer waits until the nginx configuration contains a particular server section
// WaitForNginxServer waits until the nginx configuration contains a particular server section.
// `cfg` passed to matcher is normalized by replacing all tabs and spaces with single space.
func (f *Framework) WaitForNginxServer(name string, matcher func(cfg string) bool) {
err := wait.Poll(Poll, DefaultTimeout, f.matchNginxConditions(name, matcher))
assert.Nil(ginkgo.GinkgoT(), err, "waiting for nginx server condition/s")
@ -223,6 +224,7 @@ func (f *Framework) WaitForNginxServer(name string, matcher func(cfg string) boo
}
// WaitForNginxConfiguration waits until the nginx configuration contains a particular configuration
// `cfg` passed to matcher is normalized by replacing all tabs and spaces with single space.
func (f *Framework) WaitForNginxConfiguration(matcher func(cfg string) bool) {
err := wait.Poll(Poll, DefaultTimeout, f.matchNginxConditions("", matcher))
assert.Nil(ginkgo.GinkgoT(), err, "waiting for nginx server condition/s")
@ -325,7 +327,7 @@ func (f *Framework) SetNginxConfigMapData(cmData map[string]string) {
assert.Nil(ginkgo.GinkgoT(), err, "updating configuration configmap")
}
f.waitForReload(fn)
f.WaitForReload(fn)
}
// CreateConfigMap creates a new configmap in the current namespace
@ -356,10 +358,12 @@ func (f *Framework) UpdateNginxConfigMapData(key string, value string) {
assert.Nil(ginkgo.GinkgoT(), err, "updating configuration configmap")
}
f.waitForReload(fn)
f.WaitForReload(fn)
}
func (f *Framework) waitForReload(fn func()) {
// WaitForReload calls the passed function and
// asserts that it caused at least one NGINX reload.
func (f *Framework) WaitForReload(fn func()) {
initialReloadCount := getReloadCount(f.pod, f.Namespace, f.KubeClientSet)
fn()

View file

@ -76,7 +76,7 @@ func (f *Framework) EnsureIngress(ingress *networking.Ingress) *networking.Ingre
assert.Nil(ginkgo.GinkgoT(), err, "creating ingress")
}
f.waitForReload(fn)
f.WaitForReload(fn)
ing := f.GetIngress(f.Namespace, ingress.Name)
if ing.Annotations == nil {

View file

@ -0,0 +1,96 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package settings
import (
"fmt"
"net/http"
"strconv"
"strings"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"
"k8s.io/ingress-nginx/test/e2e/framework"
)
// E2E spec: the global-rate-limit-* ConfigMap keys must be rendered into
// the http-level `global_throttle` Lua table — defaults first, then the
// customized memcached/status-code settings.
var _ = framework.DescribeSetting("settings-global-rate-limit", func() {
	f := framework.NewDefaultFramework("global-rate-limit")
	host := "global-rate-limit"

	ginkgo.BeforeEach(func() {
		f.NewEchoDeployment()
	})

	ginkgo.It("generates correct NGINX configuration", func() {
		annotations := map[string]string{}
		ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations)
		f.EnsureIngress(ing)

		// expectedCfg renders the Lua table the template should emit for the
		// given memcached settings and rejection status code.
		expectedCfg := func(memcHost string, memcPort, connectTimeout, maxIdleTimeout, poolSize, statusCode int) string {
			return fmt.Sprintf(`global_throttle = { `+
				`memcached = { host = "%v", port = %d, connect_timeout = %d, max_idle_timeout = %d, `+
				`pool_size = %d, }, status_code = %d, }`,
				memcHost, memcPort, connectTimeout, maxIdleTimeout, poolSize, statusCode)
		}

		// captureGlobalThrottle waits until the rendered configuration
		// mentions global_throttle and returns it for inspection.
		captureGlobalThrottle := func() string {
			out := ""
			f.WaitForNginxConfiguration(func(cfg string) bool {
				if strings.Contains(cfg, "global_throttle") {
					out = cfg
					return true
				}
				return false
			})
			return out
		}

		ginkgo.By("generating correct defaults")
		ngxCfg := captureGlobalThrottle()
		assert.Contains(ginkgo.GinkgoT(), ngxCfg, expectedCfg("", 11211, 50, 10000, 50, 429))

		f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK)

		ginkgo.By("applying customizations")
		memcachedHost := "memc.default.svc.cluster.local"
		memcachedPort := 11211
		memcachedConnectTimeout := 100
		memcachedMaxIdleTimeout := 5000
		memcachedPoolSize := 100
		statusCode := 503
		f.SetNginxConfigMapData(map[string]string{
			"global-rate-limit-memcached-host":             memcachedHost,
			"global-rate-limit-memcached-port":             strconv.Itoa(memcachedPort),
			"global-rate-limit-memcached-connect-timeout":  strconv.Itoa(memcachedConnectTimeout),
			"global-rate-limit-memcached-max-idle-timeout": strconv.Itoa(memcachedMaxIdleTimeout),
			"global-rate-limit-memcached-pool-size":        strconv.Itoa(memcachedPoolSize),
			"global-rate-limit-status-code":                strconv.Itoa(statusCode),
		})

		ngxCfg = captureGlobalThrottle()
		assert.Contains(ginkgo.GinkgoT(), ngxCfg,
			expectedCfg(memcachedHost, memcachedPort, memcachedConnectTimeout, memcachedMaxIdleTimeout,
				memcachedPoolSize, statusCode))

		f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK)
	})
})