Add proper passthrough config and JS

This commit is contained in:
Ricardo Katz 2023-09-03 19:02:10 -03:00 committed by Ricardo Katz
parent 7de9759532
commit bbd7b75007
5 changed files with 205 additions and 328 deletions

View file

@ -42,7 +42,6 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/ingress-nginx/pkg/tcpproxy"
adm_controller "k8s.io/ingress-nginx/internal/admission/controller"
ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config"
@ -102,8 +101,6 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro
runningConfig: new(ingress.Configuration),
Proxy: &tcpproxy.TCPProxy{},
metricCollector: mc,
command: NewNginxCommand(),
@ -243,8 +240,6 @@ type NGINXController struct {
isShuttingDown bool
Proxy *tcpproxy.TCPProxy
store store.Storer
metricCollector metric.Collector
@ -436,43 +431,6 @@ func (n *NGINXController) DefaultEndpoint() ingress.Endpoint {
//
//nolint:gocritic // the cfg shouldn't be changed, and shouldn't be mutated by other processes while being rendered.
func (n *NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {
if n.cfg.EnableSSLPassthrough {
servers := []*tcpproxy.TCPServer{}
for _, pb := range ingressCfg.PassthroughBackends {
svc := pb.Service
if svc == nil {
klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
continue
}
port, err := strconv.Atoi(pb.Port.String()) // #nosec
if err != nil {
for _, sp := range svc.Spec.Ports {
if sp.Name == pb.Port.String() {
port = int(sp.Port)
break
}
}
} else {
for _, sp := range svc.Spec.Ports {
//nolint:gosec // Ignore G109 error
if sp.Port == int32(port) {
port = int(sp.Port)
break
}
}
}
// TODO: Allow PassthroughBackends to specify they support proxy-protocol
servers = append(servers, &tcpproxy.TCPServer{
Hostname: pb.Hostname,
IP: svc.Spec.ClusterIP,
Port: port,
ProxyProtocol: false,
})
}
n.Proxy.ServerList = servers
}
// NGINX cannot resize the hash tables used to store server names. For
// this reason we check if the current size is correct for the host
@ -756,9 +714,59 @@ func nextPowerOf2(v int) int {
return v
}
// TODO: Move to the right place
type PassthroughConfig map[string]PassthrougBackend
type PassthrougBackend struct {
Endpoint string `json:"endpoint,omitempty"`
}
func configurePassthroughBackends(backends []*ingress.SSLPassthroughBackend) error {
configPassthrough := make(PassthroughConfig)
for _, pb := range backends {
svc := pb.Service
if svc == nil {
klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
continue
}
port, err := strconv.Atoi(pb.Port.String()) // #nosec
if err != nil {
for _, sp := range svc.Spec.Ports {
if sp.Name == pb.Port.String() {
port = int(sp.Port)
break
}
}
} else {
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(port) {
port = int(sp.Port)
break
}
}
}
configPassthrough[pb.Hostname] = PassthrougBackend{
Endpoint: net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprintf("%v", port)),
}
}
status, err := nginx.NewPassthroughConfigRequest(configPassthrough)
if err != nil || status != "OK" {
return fmt.Errorf("error configuring passthrough: %s %v", status, err)
}
return nil
}
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
if n.cfg.EnableSSLPassthrough {
if err := configurePassthroughBackends(pcfg.PassthroughBackends); err != nil {
return err
}
}
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)

View file

@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
@ -104,6 +105,34 @@ func NewPostStatusRequest(path, contentType string, data interface{}) (statusCod
return res.StatusCode, body, nil
}
// TODO: Turn port configurable
func NewPassthroughConfigRequest(data interface{}) (status string, err error) {
buf, err := json.Marshal(data)
if err != nil {
return "NOK", err
}
conn, err := net.Dial("tcp", "127.0.0.1:19090")
if err != nil {
return "NOK", err
}
defer conn.Close()
_, err = conn.Write(buf)
if err != nil {
return "NOK", err
}
// We need a really small reply
reply := make([]byte, 64)
_, err = conn.Read(reply)
if err != nil {
return "NOK", err
}
return string(reply), nil
}
// GetServerBlock takes an nginx.conf file and a host and tries to find the server block for that host
func GetServerBlock(conf, host string) (string, error) {
startMsg := fmt.Sprintf("## start server %v\n", host)

View file

@ -1,140 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tcpproxy
import (
"fmt"
"io"
"net"
"k8s.io/klog/v2"
"pault.ag/go/sniff/parser"
)
// TCPServer describes a server that works in passthrough mode.
type TCPServer struct {
Hostname string
IP string
Port int
ProxyProtocol bool
}
// TCPProxy describes the passthrough servers and a default as catch all.
type TCPProxy struct {
ServerList []*TCPServer
Default *TCPServer
}
// Get returns the TCPServer to use for a given host.
func (p *TCPProxy) Get(host string) *TCPServer {
if p.ServerList == nil {
return p.Default
}
for _, s := range p.ServerList {
if s.Hostname == host {
return s
}
}
return p.Default
}
// Handle reads enough information from the connection to extract the hostname
// and open a connection to the passthrough server.
func (p *TCPProxy) Handle(conn net.Conn) {
defer conn.Close()
// See: https://www.ibm.com/docs/en/ztpf/1.1.0.15?topic=sessions-ssl-record-format
data := make([]byte, 16384)
length, err := conn.Read(data)
if err != nil {
klog.V(4).ErrorS(err, "Error reading data from the connection")
return
}
proxy := p.Default
hostname, err := parser.GetHostname(data)
if err == nil {
klog.V(4).InfoS("TLS Client Hello", "host", hostname)
proxy = p.Get(hostname)
}
if proxy == nil {
klog.V(4).InfoS("There is no configured proxy for SSL connections.")
return
}
hostPort := net.JoinHostPort(proxy.IP, fmt.Sprintf("%v", proxy.Port))
klog.V(4).InfoS("passing to", "hostport", hostPort)
clientConn, err := net.Dial("tcp", hostPort)
if err != nil {
klog.V(4).ErrorS(err, "error dialing proxy", "ip", proxy.IP, "port", proxy.Port, "hostname", proxy.Hostname)
return
}
defer clientConn.Close()
if proxy.ProxyProtocol {
// write out the Proxy Protocol header
localAddr, ok := conn.LocalAddr().(*net.TCPAddr)
if !ok {
klog.Errorf("unexpected type: %T", conn.LocalAddr())
}
remoteAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
if !ok {
klog.Errorf("unexpected type: %T", conn.RemoteAddr())
}
protocol := "UNKNOWN"
if remoteAddr.IP.To4() != nil {
protocol = "TCP4"
} else if remoteAddr.IP.To16() != nil {
protocol = "TCP6"
}
proxyProtocolHeader := fmt.Sprintf("PROXY %s %s %s %d %d\r\n", protocol, remoteAddr.IP.String(), localAddr.IP.String(), remoteAddr.Port, localAddr.Port)
klog.V(4).InfoS("Writing Proxy Protocol", "header", proxyProtocolHeader)
_, err = fmt.Fprint(clientConn, proxyProtocolHeader)
}
if err != nil {
klog.ErrorS(err, "Error writing Proxy Protocol header")
clientConn.Close()
} else {
_, err = clientConn.Write(data[:length])
if err != nil {
klog.Errorf("Error writing the first 4k of proxy data: %v", err)
clientConn.Close()
}
}
pipe(clientConn, conn)
}
func pipe(client, server net.Conn) {
doCopy := func(s, c net.Conn, cancel chan<- bool) {
if _, err := io.Copy(s, c); err != nil {
klog.Errorf("Error copying data: %v", err)
}
cancel <- true
}
cancel := make(chan bool, 2)
go doCopy(server, client, cancel)
go doCopy(client, server, cancel)
<-cancel
}

View file

@ -1,80 +1,119 @@
function truncate(r) {
/* This is the passthrough configuration endpoint
It will work as following:
* Ingress controller calls the configEndpoint with a full json of hosts,
endpoints
* This json will be parsed and each key will be added as the host, and the
config will be a json containing the endpoint
* Once a request arrive on the public endpoint (port 443), the getPTBackend
function will be called (as part of a js_set, and return the right backend)
* It will read the host from variables.ssl_preread_server_name
* It will get the config from the shared map
* PROXY protocol is not supported today, as NGINX does not allows us to set
conditional proxy with a variable
* It will return the backend as a single value
* We don't support multiple backends right now
expectedJson = `
{
"server1.tld": {
"endpoint": "10.20.30.40:12345",
},
"server2.tld": {
"endpoint": "10.20.30.50:8080",
},
}
`
*/
/**
configstatus will be the variable that will contain the latest configuration status return,
so the caller will be able to check it. It is returned by getConfigStatus, which is the last
operation on the configuration endpoint
*/
const OK = "OK";
const NOK = "NOK";
const KEYNAME = "passthroughmap";
var configstatus = ''
function getConfigStatus() {
return configstatus;
}
/**
configPTBackends will receive a JSON as defined above, and:
* Parse the input to validate if it is valid
* Truncate the previous map
* Configure with this new structure
We need to be careful here, if some situation where a config is ongoing may break a starting communication. Something we could look at
is to put a lock somewhere, maybe on a different shared map, and release the connection just once it is unlocked
*/
function configBackends(s) {
s.log("Start of configuration");
var req = '';
s.on('upload', function(data, flags) {
// so far, we just want to receive and store the data until the stream finishes
req += data;
if (data.length || flags.last) {
configstatus = configureWithData(req, s)
s.warn(configstatus)
s.done();
}
})
}
function configureWithData(configdata, s) {
try {
ngx.shared.ptbackends.clear() // TODO: We should instead try to compare and clean
r.return(200, "ok")
r.finish()
let backends = {};
const parsed = JSON.parse(configdata);
const keys = Object.keys(parsed);
// We create a separate array/object as a safety measure, so if something is broken
// it will not break the whole reconfiguration
keys.forEach((key) => {
let serviceitem = parsed[key];
if (typeof serviceitem.endpoint != "string") {
s.warn(`endpoint of ${key} is not string, skipping`)
return;
}
backends[key] = serviceitem.endpoint;
});
// Clear method is not working, we should verify with NGX folks
//ngx.shared.passthrough.clear();
ngx.shared.passthrough.set(KEYNAME, JSON.stringify(backends))
return OK
} catch (e) {
r.error(e)
r.return(400, "error truncating the map json payload")
r.finish()
s.error(`failed configuring data: ${e}`);
return NOK;
}
}
function set(r) {
var service;
service = r.args.key
if (service == "" || service == null ) {
r.return(400, "key should not be null")
r.finish()
return
}
// getBackend fetches the backend given a hostname sent via SNI
function getBackend(s) {
try {
JSON.parse(r.requestText)
ngx.shared.ptbackends.set(r.args.key, r.requestText)
r.return(200, "ok")
r.finish()
var hostname = s.variables.ssl_preread_server_name;
if (hostname == null || hostname == "undefined" || hostname == "") {
throw("hostname was not provided")
}
let backends = ngx.shared.passthrough.get(KEYNAME)
if (backends == null || backends == "") {
throw('no entry on endpoint map')
}
const backendmap = JSON.parse(backends)
s.warn(JSON.stringify(backendmap))
if (backendmap[hostname] == null || backendmap[hostname] == undefined) {
throw `no endpoint is configured for service ${hostname}"`
}
return backendmap[hostname]
} catch (e) {
r.error(e)
r.return(400, "error parsing json payload")
r.finish()
s.warn(`error occurred while getting the backend ` +
`sending to default backend: ${e}`)
return "127.0.0.1:442"
}
}
function getUpstream(r) {
var service;
try {
if ("variables" in r) {
service = r.variables.ssl_preread_server_name;
}
if (service == "") {
// TODO: This should be a parameter with the port that NGINX is listening
// for non Passthrough
return "127.0.0.1:442"
}
const backends = ngx.shared.ptbackends.get(service)
if (backends == "" || backends == null) {
throw "no backend configured"
}
const objBackend = JSON.parse(backends)
if (objBackend["endpoints"] == null || objBackend["endpoints"] == undefined) {
throw "bad endpoints object" // TODO: This validation should happen when receiving the json
}
// TODO: We can loadbalance between backends, but right now let's receive just the ClusterIP
if (!Array.isArray(objBackend["endpoints"])) {
throw "endpoint object is not an array"
}
if (objBackend["endpoints"].length < 1) {
throw "no backends available for the service"
}
// TODO: Do we want to implement a different LB for Passthrough when it is composed of multiple backends?
var randomBackend = Math.floor(Math.random() * (objBackend["endpoints"].length));
if (typeof objBackend["endpoints"][randomBackend] != 'string') {
throw "endpoint is not a string"
}
return objBackend["endpoints"][randomBackend]
} catch (e) {
// If there's an error we should give user a return saying it
return "@invalidbackend"
}
}
export default {set, truncate, getUpstream};
export default {getConfigStatus, configBackends, getBackend};

View file

@ -37,7 +37,6 @@ load_module /etc/nginx/modules/ngx_http_opentracing_module.so;
load_module /modules_mount/etc/nginx/modules/otel/otel_ngx_module.so;
{{ end }}
load_module modules/ngx_http_js_module.so;
load_module modules/ngx_stream_js_module.so;
daemon off;
@ -763,76 +762,7 @@ http {
}
}
# NGX Server for NJS Operations
# TODO: Check if the shared map can be accessed between stream and server directives
server {
listen 127.0.0.1:11111; # TODO: Turn this configurable
set $proxy_upstream_name "njs";
keepalive_timeout 0;
gzip off;
access_log off;
{{ if $cfg.EnableOpentracing }}
opentracing off;
{{ end }}
{{ if $cfg.EnableOpentelemetry }}
opentelemetry off;
{{ end }}
{{ if $cfg.EnableModsecurity }}
modsecurity off;
{{ end }}
location {{ $healthzURI }} {
return 200;
}
location /is-dynamic-lb-initialized {
content_by_lua_block {
local configuration = require("configuration")
local backend_data = configuration.get_backends_data()
if not backend_data then
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
return
end
ngx.say("OK")
ngx.exit(ngx.HTTP_OK)
}
}
location {{ .StatusPath }} {
stub_status on;
}
location /ptcfg/truncate {
client_max_body_size {{ luaConfigurationRequestBodySize $cfg }};
client_body_buffer_size {{ luaConfigurationRequestBodySize $cfg }};
proxy_buffering off;
js_content passthrough.truncate;
}
location /ptcfg/set {
client_max_body_size {{ luaConfigurationRequestBodySize $cfg }};
client_body_buffer_size {{ luaConfigurationRequestBodySize $cfg }};
proxy_buffering off;
js_content passthrough.set;
}
location / {
content_by_lua_block {
ngx.exit(ngx.HTTP_NOT_FOUND)
}
}
}
}
stream {
lua_package_path "/etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/?.lua;;";
@ -900,19 +830,30 @@ stream {
}
}
{{ if and $all.IsSSLPassthroughEnabled }}
# Start SSLPassthrough configuration
{{/* Begin of the new SSLPassthrough approach */}}
{{/* This is the configuration endpoint for dynamic passthrough
It should be made configurable before reaching GA
We want this server to be always running
*/}}
js_import njs/passthrough.js;
js_shared_dict_zone zone=ptbackends:32m type=string;
js_set $ptupstream passthrough.getUpstream;
server {
listen 19090;
js_preread passthrough.configBackends;
js_set $cfgreturn passthrough.getConfigStatus;
return $cfgreturn;
}
{{ if and $all.IsSSLPassthroughEnabled }}
server {
{{ buildSSLPassthroughListener $all }}
ssl_preread on;
js_set $ptupstream passthrough.getBackend;
proxy_pass $ptupstream;
}
{{ end }}
{{/* End of new SSLPassthrough approach */}}
server {
listen 127.0.0.1:{{ .StreamPort }};