From bbd7b75007534355712b5ae973e6f82600271f9f Mon Sep 17 00:00:00 2001 From: Ricardo Katz Date: Sun, 3 Sep 2023 19:02:10 -0300 Subject: [PATCH] Add proper passthrough config and JS --- internal/ingress/controller/nginx.go | 92 +++++++------- internal/nginx/main.go | 29 +++++ pkg/tcpproxy/tcp.go | 140 --------------------- rootfs/etc/nginx/njs/passthrough.js | 179 ++++++++++++++++----------- rootfs/etc/nginx/template/nginx.tmpl | 93 +++----------- 5 files changed, 205 insertions(+), 328 deletions(-) delete mode 100644 pkg/tcpproxy/tcp.go diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index e2047e56a..8cda23345 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -42,7 +42,6 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" - "k8s.io/ingress-nginx/pkg/tcpproxy" adm_controller "k8s.io/ingress-nginx/internal/admission/controller" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" @@ -102,8 +101,6 @@ func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXContro runningConfig: new(ingress.Configuration), - Proxy: &tcpproxy.TCPProxy{}, - metricCollector: mc, command: NewNginxCommand(), @@ -243,8 +240,6 @@ type NGINXController struct { isShuttingDown bool - Proxy *tcpproxy.TCPProxy - store store.Storer metricCollector metric.Collector @@ -436,43 +431,6 @@ func (n *NGINXController) DefaultEndpoint() ingress.Endpoint { // //nolint:gocritic // the cfg shouldn't be changed, and shouldn't be mutated by other processes while being rendered. func (n *NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) { - if n.cfg.EnableSSLPassthrough { - servers := []*tcpproxy.TCPServer{} - for _, pb := range ingressCfg.PassthroughBackends { - svc := pb.Service - if svc == nil { - klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend) - continue - } - port, err := strconv.Atoi(pb.Port.String()) // #nosec - if err != nil { - for _, sp := range svc.Spec.Ports { - if sp.Name == pb.Port.String() { - port = int(sp.Port) - break - } - } - } else { - for _, sp := range svc.Spec.Ports { - //nolint:gosec // Ignore G109 error - if sp.Port == int32(port) { - port = int(sp.Port) - break - } - } - } - - // TODO: Allow PassthroughBackends to specify they support proxy-protocol - servers = append(servers, &tcpproxy.TCPServer{ - Hostname: pb.Hostname, - IP: svc.Spec.ClusterIP, - Port: port, - ProxyProtocol: false, - }) - } - - n.Proxy.ServerList = servers - } // NGINX cannot resize the hash tables used to store server names. For // this reason we check if the current size is correct for the host @@ -756,9 +714,59 @@ func nextPowerOf2(v int) int { return v } +// TODO: Move to the right place +type PassthroughConfig map[string]PassthrougBackend +type PassthrougBackend struct { + Endpoint string `json:"endpoint,omitempty"` +} + +func configurePassthroughBackends(backends []*ingress.SSLPassthroughBackend) error { + configPassthrough := make(PassthroughConfig) + + for _, pb := range backends { + svc := pb.Service + if svc == nil { + klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend) + continue + } + port, err := strconv.Atoi(pb.Port.String()) // #nosec + if err != nil { + for _, sp := range svc.Spec.Ports { + if sp.Name == pb.Port.String() { + port = int(sp.Port) + break + } + } + } else { + for _, sp := range svc.Spec.Ports { + if sp.Port == int32(port) { + port = int(sp.Port) + break + } + } + } + configPassthrough[pb.Hostname] = PassthrougBackend{ + Endpoint: net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprintf("%v", port)), + } + } + status, err := nginx.NewPassthroughConfigRequest(configPassthrough) + if err != nil || status != "OK" { + return fmt.Errorf("error configuring passthrough: %s %v", status, err) + } + return nil + +} + // configureDynamically encodes new Backends in JSON format and POSTs the // payload to an internal HTTP endpoint handled by Lua. func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error { + + if n.cfg.EnableSSLPassthrough { + if err := configurePassthroughBackends(pcfg.PassthroughBackends); err != nil { + return err + } + } + backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends) if backendsChanged { err := configureBackends(pcfg.Backends) diff --git a/internal/nginx/main.go b/internal/nginx/main.go index fc586e9e8..e95018e3e 100644 --- a/internal/nginx/main.go +++ b/internal/nginx/main.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "os" "os/exec" @@ -104,6 +105,34 @@ func NewPostStatusRequest(path, contentType string, data interface{}) (statusCod return res.StatusCode, body, nil } +// TODO: Turn port configurable +func NewPassthroughConfigRequest(data interface{}) (status string, err error) { + + buf, err := json.Marshal(data) + if err != nil { + return "NOK", err + } + + conn, err := net.Dial("tcp", "127.0.0.1:19090") + if err != nil { + return "NOK", err + } + + defer conn.Close() + _, err = conn.Write(buf) + if err != nil { + return "NOK", err + } + // We need a really small reply + reply := make([]byte, 64) + _, err = conn.Read(reply) + if err != nil { + return "NOK", err + } + + return string(reply), nil +} + // GetServerBlock takes an nginx.conf file and a host and tries to find the server block for that host func GetServerBlock(conf, host string) (string, error) { startMsg := fmt.Sprintf("## start server %v\n", host) diff --git a/pkg/tcpproxy/tcp.go b/pkg/tcpproxy/tcp.go deleted file mode 100644 index eda4a2746..000000000 --- a/pkg/tcpproxy/tcp.go +++ /dev/null @@ -1,140 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tcpproxy - -import ( - "fmt" - "io" - "net" - - "k8s.io/klog/v2" - - "pault.ag/go/sniff/parser" -) - -// TCPServer describes a server that works in passthrough mode. -type TCPServer struct { - Hostname string - IP string - Port int - ProxyProtocol bool -} - -// TCPProxy describes the passthrough servers and a default as catch all. -type TCPProxy struct { - ServerList []*TCPServer - Default *TCPServer -} - -// Get returns the TCPServer to use for a given host. -func (p *TCPProxy) Get(host string) *TCPServer { - if p.ServerList == nil { - return p.Default - } - - for _, s := range p.ServerList { - if s.Hostname == host { - return s - } - } - - return p.Default -} - -// Handle reads enough information from the connection to extract the hostname -// and open a connection to the passthrough server. -func (p *TCPProxy) Handle(conn net.Conn) { - defer conn.Close() - // See: https://www.ibm.com/docs/en/ztpf/1.1.0.15?topic=sessions-ssl-record-format - data := make([]byte, 16384) - - length, err := conn.Read(data) - if err != nil { - klog.V(4).ErrorS(err, "Error reading data from the connection") - return - } - - proxy := p.Default - hostname, err := parser.GetHostname(data) - if err == nil { - klog.V(4).InfoS("TLS Client Hello", "host", hostname) - proxy = p.Get(hostname) - } - - if proxy == nil { - klog.V(4).InfoS("There is no configured proxy for SSL connections.") - return - } - - hostPort := net.JoinHostPort(proxy.IP, fmt.Sprintf("%v", proxy.Port)) - klog.V(4).InfoS("passing to", "hostport", hostPort) - clientConn, err := net.Dial("tcp", hostPort) - if err != nil { - klog.V(4).ErrorS(err, "error dialing proxy", "ip", proxy.IP, "port", proxy.Port, "hostname", proxy.Hostname) - return - } - defer clientConn.Close() - - if proxy.ProxyProtocol { - // write out the Proxy Protocol header - localAddr, ok := conn.LocalAddr().(*net.TCPAddr) - if !ok { - klog.Errorf("unexpected type: %T", conn.LocalAddr()) - } - remoteAddr, ok := conn.RemoteAddr().(*net.TCPAddr) - if !ok { - klog.Errorf("unexpected type: %T", conn.RemoteAddr()) - } - protocol := "UNKNOWN" - if remoteAddr.IP.To4() != nil { - protocol = "TCP4" - } else if remoteAddr.IP.To16() != nil { - protocol = "TCP6" - } - proxyProtocolHeader := fmt.Sprintf("PROXY %s %s %s %d %d\r\n", protocol, remoteAddr.IP.String(), localAddr.IP.String(), remoteAddr.Port, localAddr.Port) - klog.V(4).InfoS("Writing Proxy Protocol", "header", proxyProtocolHeader) - _, err = fmt.Fprint(clientConn, proxyProtocolHeader) - } - if err != nil { - klog.ErrorS(err, "Error writing Proxy Protocol header") - clientConn.Close() - } else { - _, err = clientConn.Write(data[:length]) - if err != nil { - klog.Errorf("Error writing the first 4k of proxy data: %v", err) - clientConn.Close() - } - } - - pipe(clientConn, conn) -} - -func pipe(client, server net.Conn) { - doCopy := func(s, c net.Conn, cancel chan<- bool) { - if _, err := io.Copy(s, c); err != nil { - klog.Errorf("Error copying data: %v", err) - } - cancel <- true - } - - cancel := make(chan bool, 2) - - go doCopy(server, client, cancel) - go doCopy(client, server, cancel) - - <-cancel -} diff --git a/rootfs/etc/nginx/njs/passthrough.js b/rootfs/etc/nginx/njs/passthrough.js index 03f4914c8..2d585a6c2 100644 --- a/rootfs/etc/nginx/njs/passthrough.js +++ b/rootfs/etc/nginx/njs/passthrough.js @@ -1,80 +1,119 @@ -function truncate(r) { +/* This is the passthrough configuration endpoint +It will work as following: +* Ingress controller calls the configEndpoint with a full json of hosts, + endpoints +* This json will be parsed and each key will be added as the host, and the + config will be a json containing the endpoint +* Once a request arrive on the public endpoint (port 443), the getPTBackend + function will be called (as part of a js_set, and return the right backend) + * It will read the host from variables.ssl_preread_server_name + * It will get the config from the shared map + * PROXY protocol is not supported today, as NGINX does not allows us to set + conditional proxy with a variable + * It will return the backend as a single value +* We don't support multiple backends right now + +expectedJson = ` +{ + "server1.tld": { + "endpoint": "10.20.30.40:12345", + }, + "server2.tld": { + "endpoint": "10.20.30.50:8080", + }, +} +` +*/ + +/** +configstatus will be the variable that will contain the latest configuration status return, +so the caller will be able to check it. It is returned by getConfigStatus, which is the last +operation on the configuration endpoint +*/ + +const OK = "OK"; +const NOK = "NOK"; +const KEYNAME = "passthroughmap"; + +var configstatus = '' +function getConfigStatus() { + return configstatus; +} + +/** +configPTBackends will receive a JSON as defined above, and: + * Parse the input to validate if it is valid + * Truncate the previous map + * Configure with this new structure +We need to be careful here, if some situation where a config is ongoing may break a starting communication. Something we could look at +is to put a lock somewhere, maybe on a different shared map, and release the connection just once it is unlocked +*/ +function configBackends(s) { + s.log("Start of configuration"); + var req = ''; + s.on('upload', function(data, flags) { + // so far, we just want to receive and store the data until the stream finishes + req += data; + if (data.length || flags.last) { + configstatus = configureWithData(req, s) + s.warn(configstatus) + s.done(); + } + }) +} + +function configureWithData(configdata, s) { try { - ngx.shared.ptbackends.clear() // TODO: We should instead try to compare and clean - r.return(200, "ok") - r.finish() + let backends = {}; + const parsed = JSON.parse(configdata); + const keys = Object.keys(parsed); + + // We create a separate array/object as a safety measure, so if something is broken + // it will not break the whole reconfiguration + keys.forEach((key) => { + let serviceitem = parsed[key]; + if (typeof serviceitem.endpoint != "string") { + s.warn(`endpoint of ${key} is not string, skipping`) + return; + } + backends[key] = serviceitem.endpoint; + }); + + // Clear method is not working, we should verify with NGX folks + //ngx.shared.passthrough.clear(); + ngx.shared.passthrough.set(KEYNAME, JSON.stringify(backends)) + + return OK } catch (e) { - r.error(e) - r.return(400, "error truncating the map json payload") - r.finish() + s.error(`failed configuring data: ${e}`); + return NOK; } } -function set(r) { - var service; - service = r.args.key - if (service == "" || service == null ) { - r.return(400, "key should not be null") - r.finish() - return - } - +// getBackend fetches the backend given a hostname sent via SNI +function getBackend(s) { try { - JSON.parse(r.requestText) - ngx.shared.ptbackends.set(r.args.key, r.requestText) - r.return(200, "ok") - r.finish() + var hostname = s.variables.ssl_preread_server_name; + if (hostname == null || hostname == "undefined" || hostname == "") { + throw("hostname was not provided") + } + let backends = ngx.shared.passthrough.get(KEYNAME) + if (backends == null || backends == "") { + throw('no entry on endpoint map') + } + const backendmap = JSON.parse(backends) + s.warn(JSON.stringify(backendmap)) + if (backendmap[hostname] == null || backendmap[hostname] == undefined) { + throw `no endpoint is configured for service ${hostname}"` + } + + return backendmap[hostname] + } catch (e) { - r.error(e) - r.return(400, "error parsing json payload") - r.finish() + s.warn(`error occurred while getting the backend ` + + `sending to default backend: ${e}`) + return "127.0.0.1:442" } } -function getUpstream(r) { - var service; - - try { - if ("variables" in r) { - service = r.variables.ssl_preread_server_name; - } - - if (service == "") { - // TODO: This should be a parameter with the port that NGINX is listening - // for non Passthrough - return "127.0.0.1:442" - } - - const backends = ngx.shared.ptbackends.get(service) - if (backends == "" || backends == null) { - throw "no backend configured" - } - - const objBackend = JSON.parse(backends) - if (objBackend["endpoints"] == null || objBackend["endpoints"] == undefined) { - throw "bad endpoints object" // TODO: This validation should happen when receiving the json - } - - // TODO: We can loadbalance between backends, but right now let's receive just the ClusterIP - if (!Array.isArray(objBackend["endpoints"])) { - throw "endpoint object is not an array" - } - - if (objBackend["endpoints"].length < 1) { - throw "no backends available for the service" - } - - // TODO: Do we want to implement a different LB for Passthrough when it is composed of multiple backends? - var randomBackend = Math.floor(Math.random() * (objBackend["endpoints"].length)); - if (typeof objBackend["endpoints"][randomBackend] != 'string') { - throw "endpoint is not a string" - } - return objBackend["endpoints"][randomBackend] - - } catch (e) { - // If there's an error we should give user a return saying it - return "@invalidbackend" - } -} - -export default {set, truncate, getUpstream}; \ No newline at end of file +export default {getConfigStatus, configBackends, getBackend}; \ No newline at end of file diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index 0421b4831..23c3b9f58 100644 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -37,7 +37,6 @@ load_module /etc/nginx/modules/ngx_http_opentracing_module.so; load_module /modules_mount/etc/nginx/modules/otel/otel_ngx_module.so; {{ end }} -load_module modules/ngx_http_js_module.so; load_module modules/ngx_stream_js_module.so; daemon off; @@ -763,76 +762,7 @@ http { } } - # NGX Server for NJS Operations - # TODO: Check if the shared map can be accessed between stream and server directives - server { - listen 127.0.0.1:11111; # TODO: Turn this configurable - set $proxy_upstream_name "njs"; - - keepalive_timeout 0; - gzip off; - - access_log off; - - {{ if $cfg.EnableOpentracing }} - opentracing off; - {{ end }} - - {{ if $cfg.EnableOpentelemetry }} - opentelemetry off; - {{ end }} - - {{ if $cfg.EnableModsecurity }} - modsecurity off; - {{ end }} - - location {{ $healthzURI }} { - return 200; - } - - location /is-dynamic-lb-initialized { - content_by_lua_block { - local configuration = require("configuration") - local backend_data = configuration.get_backends_data() - if not backend_data then - ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) - return - end - - ngx.say("OK") - ngx.exit(ngx.HTTP_OK) - } - } - - location {{ .StatusPath }} { - stub_status on; - } - - location /ptcfg/truncate { - client_max_body_size {{ luaConfigurationRequestBodySize $cfg }}; - client_body_buffer_size {{ luaConfigurationRequestBodySize $cfg }}; - proxy_buffering off; - - js_content passthrough.truncate; - } - - location /ptcfg/set { - client_max_body_size {{ luaConfigurationRequestBodySize $cfg }}; - client_body_buffer_size {{ luaConfigurationRequestBodySize $cfg }}; - proxy_buffering off; - - js_content passthrough.set; - } - - location / { - content_by_lua_block { - ngx.exit(ngx.HTTP_NOT_FOUND) - } - } - } - -} - + stream { lua_package_path "/etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/?.lua;;"; @@ -900,19 +830,30 @@ stream { } } - {{ if and $all.IsSSLPassthroughEnabled }} - # Start SSLPassthrough configuration - + {{/* Begin of the new SSLPassthrough approach */}} + + {{/* This is the configuration endpoint for dynamic passthrough + It should be made configurable before reaching GA + We want this server to be always running + */}} + js_import njs/passthrough.js; js_shared_dict_zone zone=ptbackends:32m type=string; - js_set $ptupstream passthrough.getUpstream; - + server { + listen 19090; + js_preread passthrough.configBackends; + js_set $cfgreturn passthrough.getConfigStatus; + return $cfgreturn; + } + {{ if and $all.IsSSLPassthroughEnabled }} server { {{ buildSSLPassthroughListener $all }} ssl_preread on; + js_set $ptupstream passthrough.getBackend; proxy_pass $ptupstream; } {{ end }} + {{/* End of new SSLPassthrough approach */}} server { listen 127.0.0.1:{{ .StreamPort }};