Merge pull request #2804 from ElvinEfendi/external-nama-in-dynamic-mode

add support for ExternalName service type in dynamic mode
This commit is contained in:
k8s-ci-robot 2018-07-25 07:25:57 -07:00 committed by GitHub
commit e89c5697bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 327 additions and 18 deletions

View file

@ -1,16 +0,0 @@
#!/usr/bin/env resty
local ffi = require("ffi")
-- without this we get errors such as "attempt to redefine XXX"
local old_cdef = ffi.cdef
local exists = {}
ffi.cdef = function(def)
if exists[def] then
return
end
exists[def] = true
return old_cdef(def)
end
require "busted.runner"({ standalone = false })

View file

@ -25,4 +25,4 @@ resty \
--shdict "configuration_data 5M" \ --shdict "configuration_data 5M" \
--shdict "balancer_ewma 1M" \ --shdict "balancer_ewma 1M" \
--shdict "balancer_ewma_last_touched_at 1M" \ --shdict "balancer_ewma_last_touched_at 1M" \
./build/busted ${BUSTED_ARGS} ./rootfs/etc/nginx/lua/test/ ./rootfs/etc/nginx/lua/test/run.lua ${BUSTED_ARGS} ./rootfs/etc/nginx/lua/test/

View file

@ -741,6 +741,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
backends := make([]*ingress.Backend, len(pcfg.Backends)) backends := make([]*ingress.Backend, len(pcfg.Backends))
for i, backend := range pcfg.Backends { for i, backend := range pcfg.Backends {
service := &apiv1.Service{Spec: backend.Service.Spec}
luaBackend := &ingress.Backend{ luaBackend := &ingress.Backend{
Name: backend.Name, Name: backend.Name,
Port: backend.Port, Port: backend.Port,
@ -749,6 +750,7 @@ func configureDynamically(pcfg *ingress.Configuration, port int) error {
SessionAffinity: backend.SessionAffinity, SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy, UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing, LoadBalancing: backend.LoadBalancing,
Service: service,
} }
var endpoints []ingress.Endpoint var endpoints []ingress.Endpoint

View file

@ -130,6 +130,7 @@ var (
"filterRateLimits": filterRateLimits, "filterRateLimits": filterRateLimits,
"buildRateLimitZones": buildRateLimitZones, "buildRateLimitZones": buildRateLimitZones,
"buildRateLimit": buildRateLimit, "buildRateLimit": buildRateLimit,
"buildResolversForLua": buildResolversForLua,
"buildResolvers": buildResolvers, "buildResolvers": buildResolvers,
"buildUpstreamName": buildUpstreamName, "buildUpstreamName": buildUpstreamName,
"isLocationInLocationList": isLocationInLocationList, "isLocationInLocationList": isLocationInLocationList,
@ -220,6 +221,33 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
return strings.Join(out, ";\n\r") + ";" return strings.Join(out, ";\n\r") + ";"
} }
func buildResolversForLua(res interface{}, disableIpv6 interface{}) string {
nss, ok := res.([]net.IP)
if !ok {
glog.Errorf("expected a '[]net.IP' type but %T was returned", res)
return ""
}
no6, ok := disableIpv6.(bool)
if !ok {
glog.Errorf("expected a 'bool' type but %T was returned", disableIpv6)
return ""
}
if len(nss) == 0 {
return ""
}
r := []string{}
for _, ns := range nss {
if ing_net.IsIPV6(ns) && no6 {
continue
}
r = append(r, fmt.Sprintf("\"%v\"", ns))
}
return strings.Join(r, ", ")
}
// buildResolvers returns the resolvers reading the /etc/resolv.conf file // buildResolvers returns the resolvers reading the /etc/resolv.conf file
func buildResolvers(res interface{}, disableIpv6 interface{}) string { func buildResolvers(res interface{}, disableIpv6 interface{}) string {
// NGINX need IPV6 addresses to be surrounded by brackets // NGINX need IPV6 addresses to be surrounded by brackets

View file

@ -601,6 +601,26 @@ func TestBuildForwardedFor(t *testing.T) {
} }
} }
func TestBuildResolversForLua(t *testing.T) {
ipOne := net.ParseIP("192.0.0.1")
ipTwo := net.ParseIP("2001:db8:1234:0000:0000:0000:0000:0000")
ipList := []net.IP{ipOne, ipTwo}
expected := "\"192.0.0.1\", \"2001:db8:1234::\""
actual := buildResolversForLua(ipList, false)
if expected != actual {
t.Errorf("Expected '%v' but returned '%v'", expected, actual)
}
expected = "\"192.0.0.1\""
actual = buildResolversForLua(ipList, true)
if expected != actual {
t.Errorf("Expected '%v' but returned '%v'", expected, actual)
}
}
func TestBuildResolvers(t *testing.T) { func TestBuildResolvers(t *testing.T) {
ipOne := net.ParseIP("192.0.0.1") ipOne := net.ParseIP("192.0.0.1")
ipTwo := net.ParseIP("2001:db8:1234:0000:0000:0000:0000:0000") ipTwo := net.ParseIP("2001:db8:1234:0000:0000:0000:0000:0000")

View file

@ -1,5 +1,7 @@
local ngx_balancer = require("ngx.balancer") local ngx_balancer = require("ngx.balancer")
local json = require("cjson") local json = require("cjson")
local util = require("util")
local dns_util = require("util.dns")
local configuration = require("configuration") local configuration = require("configuration")
local round_robin = require("balancer.round_robin") local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash") local chash = require("balancer.chash")
@ -40,6 +42,19 @@ local function get_implementation(backend)
return implementation return implementation
end end
local function resolve_external_names(original_backend)
local backend = util.deepcopy(original_backend)
local endpoints = {}
for _, endpoint in ipairs(backend.endpoints) do
local ips = dns_util.resolve(endpoint.address)
for _, ip in ipairs(ips) do
table.insert(endpoints, { address = ip, port = endpoint.port })
end
end
backend.endpoints = endpoints
return backend
end
local function sync_backend(backend) local function sync_backend(backend)
local implementation = get_implementation(backend) local implementation = get_implementation(backend)
local balancer = balancers[backend.name] local balancer = balancers[backend.name]
@ -59,6 +74,11 @@ local function sync_backend(backend)
return return
end end
local service_type = backend.service and backend.service.spec and backend.service.spec["type"]
if service_type == "ExternalName" then
backend = resolve_external_names(backend)
end
balancer:sync(backend) balancer:sync(backend)
end end

View file

@ -1,7 +1,9 @@
-- this is the Lua representation of Configuration struct in internal/ingress/types.go -- this is the Lua representation of Configuration struct in internal/ingress/types.go
local configuration_data = ngx.shared.configuration_data local configuration_data = ngx.shared.configuration_data
local _M = {} local _M = {
nameservers = {}
}
function _M.get_backends_data() function _M.get_backends_data()
return configuration_data:get("backends") return configuration_data:get("backends")

View file

@ -77,6 +77,44 @@ describe("Balancer", function()
assert.spy(s).was_called_with(implementation, backend) assert.spy(s).was_called_with(implementation, backend)
end) end)
it("resolves external name to endpoints when service is of type External name", function()
backend = {
name = "exmaple-com", service = { spec = { ["type"] = "ExternalName" } },
endpoints = {
{ address = "example.com", port = "80", maxFails = 0, failTimeout = 0 }
}
}
local dns_helper = require("test/dns_helper")
dns_helper.mock_dns_query({
{
name = "example.com",
address = "192.168.1.1",
ttl = 3600,
},
{
name = "example.com",
address = "1.2.3.4",
ttl = 60,
}
})
expected_backend = {
name = "exmaple-com", service = { spec = { ["type"] = "ExternalName" } },
endpoints = {
{ address = "192.168.1.1", port = "80" },
{ address = "1.2.3.4", port = "80" },
}
}
local mock_instance = { sync = function(backend) end }
setmetatable(mock_instance, implementation)
implementation.new = function(self, backend) return mock_instance end
assert.has_no.errors(function() balancer.sync_backend(backend) end)
stub(mock_instance, "sync")
assert.has_no.errors(function() balancer.sync_backend(backend) end)
assert.stub(mock_instance.sync).was_called_with(mock_instance, expected_backend)
end)
it("replaces the existing balancer when load balancing config changes for backend", function() it("replaces the existing balancer when load balancing config changes for backend", function()
assert.has_no.errors(function() balancer.sync_backend(backend) end) assert.has_no.errors(function() balancer.sync_backend(backend) end)

View file

@ -0,0 +1,27 @@
local _M = {}
local configuration = require("configuration")
local resolver = require("resty.dns.resolver")
local old_resolver_new = resolver.new
local function reset(nameservers)
configuration.nameservers = nameservers or { "1.1.1.1" }
end
function _M.mock_new(func, nameservers)
reset(nameservers)
resolver.new = func
end
function _M.mock_dns_query(response, err)
reset()
resolver.new = function(self, options)
local r = old_resolver_new(self, options)
r.query = function(self, name, options, tries)
return response, err
end
return r
end
end
return _M

View file

@ -0,0 +1,32 @@
local ffi = require("ffi")
-- without this we get errors such as "attempt to redefine XXX"
local old_cdef = ffi.cdef
local exists = {}
ffi.cdef = function(def)
if exists[def] then
return
end
exists[def] = true
return old_cdef(def)
end
local old_udp = ngx.socket.udp
ngx.socket.udp = function(...)
local socket = old_udp(...)
socket.send = function(...)
error("ngx.socket.udp:send please mock this to use in tests")
end
return socket
end
local old_tcp = ngx.socket.tcp
ngx.socket.tcp = function(...)
local socket = old_tcp(...)
socket.send = function(...)
error("ngx.socket.tcp:send please mock this to use in tests")
end
return socket
end
require "busted.runner"({ standalone = false })

View file

@ -0,0 +1,74 @@
describe("resolve", function()
local dns = require("util.dns")
local dns_helper = require("test/dns_helper")
it("sets correct nameservers", function()
dns_helper.mock_new(function(self, options)
assert.are.same({ nameservers = { "1.2.3.4", "4.5.6.7" }, retrans = 5, timeout = 2000 }, options)
return nil, ""
end, { "1.2.3.4", "4.5.6.7" })
dns.resolve("example.com")
end)
it("returns host when an error happens", function()
local s_ngx_log = spy.on(ngx, "log")
dns_helper.mock_new(function(...) return nil, "an error" end)
assert.are.same({ "example.com" }, dns.resolve("example.com"))
assert.spy(s_ngx_log).was_called_with(ngx.ERR, "failed to instantiate the resolver: an error")
dns_helper.mock_dns_query(nil, "oops!")
assert.are.same({ "example.com" }, dns.resolve("example.com"))
assert.spy(s_ngx_log).was_called_with(ngx.ERR, "failed to query the DNS server: oops!")
dns_helper.mock_dns_query({ errcode = 1, errstr = "format error" })
assert.are.same({ "example.com" }, dns.resolve("example.com"))
assert.spy(s_ngx_log).was_called_with(ngx.ERR, "server returned error code: 1: format error")
dns_helper.mock_dns_query({})
assert.are.same({ "example.com" }, dns.resolve("example.com"))
assert.spy(s_ngx_log).was_called_with(ngx.ERR, "no A record resolved")
dns_helper.mock_dns_query({ { name = "example.com", cname = "sub.example.com", ttl = 60 } })
assert.are.same({ "example.com" }, dns.resolve("example.com"))
assert.spy(s_ngx_log).was_called_with(ngx.ERR, "no A record resolved")
end)
it("resolves all A records of given host, caches them with minimal ttl and returns from cache next time", function()
dns_helper.mock_dns_query({
{
name = "example.com",
address = "192.168.1.1",
ttl = 3600,
},
{
name = "example.com",
address = "1.2.3.4",
ttl = 60,
}
})
local lrucache = require("resty.lrucache")
local old_lrucache_new = lrucache.new
lrucache.new = function(...)
local cache = old_lrucache_new(...)
local old_set = cache.set
cache.set = function(self, key, value, ttl)
assert.equal("example.com", key)
assert.are.same({ "192.168.1.1", "1.2.3.4" }, value)
assert.equal(60, ttl)
return old_set(self, key, value, ttl)
end
return cache
end
assert.are.same({ "192.168.1.1", "1.2.3.4" }, dns.resolve("example.com"))
dns_helper.mock_new(function(...)
error("expected to short-circuit and return response from cache")
end)
assert.are.same({ "192.168.1.1", "1.2.3.4" }, dns.resolve("example.com"))
end)
end)

View file

@ -0,0 +1,70 @@
local resolver = require("resty.dns.resolver")
local lrucache = require("resty.lrucache")
local configuration = require("configuration")
local util = require("util")
local _M = {}
local CACHE_SIZE = 10000
local MAXIMUM_TTL_VALUE = 2147483647 -- maximum value according to https://tools.ietf.org/html/rfc2181
local cache, err = lrucache.new(CACHE_SIZE)
if not cache then
return error("failed to create the cache: " .. (err or "unknown"))
end
local function a_records_and_max_ttl(answers)
local addresses = {}
local ttl = MAXIMUM_TTL_VALUE -- maximum value according to https://tools.ietf.org/html/rfc2181
for _, ans in ipairs(answers) do
if ans.address then
table.insert(addresses, ans.address)
if ttl > ans.ttl then
ttl = ans.ttl
end
end
end
return addresses, ttl
end
function _M.resolve(host)
local cached_addresses = cache:get(host)
if cached_addresses then
ngx.log(ngx.INFO, string.format("addresses %s for host %s was resolved from cache", table.concat(cached_addresses, ", "), host))
return cached_addresses
end
local r, err = resolver:new{
nameservers = util.deepcopy(configuration.nameservers),
retrans = 5,
timeout = 2000, -- 2 sec
}
if not r then
ngx.log(ngx.ERR, "failed to instantiate the resolver: " .. tostring(err))
return { host }
end
local answers, err, _tries = r:query(host, { qtype = r.TYPE_A }, {})
if not answers then
ngx.log(ngx.ERR, "failed to query the DNS server: " .. tostring(err))
return { host }
end
if answers.errcode then
ngx.log(ngx.ERR, string.format("server returned error code: %s: %s", answers.errcode, answers.errstr))
return { host }
end
local addresses, ttl = a_records_and_max_ttl(answers)
if #addresses == 0 then
ngx.log(ngx.ERR, "no A record resolved")
return { host }
end
cache:set(host, addresses, ttl)
return addresses
end
return _M

View file

@ -64,6 +64,7 @@ http {
error("require failed: " .. tostring(res)) error("require failed: " .. tostring(res))
else else
configuration = res configuration = res
configuration.nameservers = { {{ buildResolversForLua $cfg.Resolver $cfg.DisableIpv6DNS }} }
end end
ok, res = pcall(require, "balancer") ok, res = pcall(require, "balancer")

View file

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/test/e2e/framework" "k8s.io/ingress-nginx/test/e2e/framework"
) )
@ -55,6 +56,16 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
err = f.NewEchoDeploymentWithReplicas(1) err = f.NewEchoDeploymentWithReplicas(1)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = f.WaitForNginxConfiguration(func(cfg string) bool {
servers, err := dns.GetSystemNameServers()
Expect(err).NotTo(HaveOccurred())
ips := []string{}
for _, server := range servers {
ips = append(ips, fmt.Sprintf("\"%v\"", server))
}
return strings.Contains(cfg, "configuration.nameservers = { "+strings.Join(ips, ", ")+" }")
})
host := "foo.com" host := "foo.com"
ing, err := ensureIngress(f, host) ing, err := ensureIngress(f, host)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())