Use a unix socket instead udp for reception of metrics (#2652)

This commit is contained in:
Manuel Alejandro de Brito Fontes 2018-06-17 11:04:03 -04:00 committed by GitHub
parent cd7625b72f
commit c4ec773966
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 94 deletions

View file

@ -130,13 +130,12 @@ func main() {
err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status) err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)
if err != nil { if err != nil {
glog.Fatalf("Error generating metric collector: %v", err) glog.Fatalf("Error creating metric collector: %v", err)
} }
err = collector.InitUDPCollector(conf.Namespace, class.IngressClass, 8000) err = collector.NewInstance(conf.Namespace, class.IngressClass)
if err != nil { if err != nil {
glog.Fatalf("Error generating UDP collector: %v", err) glog.Fatalf("Error creating unix socket server: %v", err)
} }
ngx.Start() ngx.Start()

View file

@ -26,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
type udpData struct { type socketData struct {
Host string `json:"host"` // Label Host string `json:"host"` // Label
Status string `json:"status"` // Label Status string `json:"status"` // Label
@ -53,8 +53,8 @@ type udpData struct {
Service string `json:"service"` // Label Service string `json:"service"` // Label
} }
// UDPCollector stores prometheus metrics and ingress meta-data // SocketCollector stores prometheus metrics and ingress meta-data
type UDPCollector struct { type SocketCollector struct {
upstreamResponseTime *prometheus.HistogramVec upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec requestLength *prometheus.HistogramVec
@ -62,20 +62,18 @@ type UDPCollector struct {
collectorSuccess *prometheus.GaugeVec collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec requests *prometheus.CounterVec
listener *net.UDPConn listener net.Listener
ns string ns string
ingressClass string ingressClass string
port int
} }
// InitUDPCollector creates a new UDPCollector instance // NewInstance creates a new SocketCollector instance
func InitUDPCollector(ns string, class string, port int) error { func NewInstance(ns string, class string) error {
sc := UDPCollector{} sc := SocketCollector{}
ns = strings.Replace(ns, "-", "_", -1) ns = strings.Replace(ns, "-", "_", -1)
listener, err := newUDPListener(port) listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil { if err != nil {
return err return err
} }
@ -83,7 +81,6 @@ func InitUDPCollector(ns string, class string, port int) error {
sc.listener = listener sc.listener = listener
sc.ns = ns sc.ns = ns
sc.ingressClass = class sc.ingressClass = class
sc.port = port
requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"} requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"} collectorTags := []string{"namespace", "ingress_class"}
@ -166,13 +163,13 @@ func InitUDPCollector(ns string, class string, port int) error {
return nil return nil
} }
func (sc *UDPCollector) handleMessage(msg []byte) { func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg)) glog.V(5).Infof("msg: %v", string(msg))
collectorSuccess := true collectorSuccess := true
// Unmarshall bytes // Unmarshall bytes
var stats udpData var stats socketData
err := json.Unmarshal(msg, &stats) err := json.Unmarshal(msg, &stats)
if err != nil { if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err) glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
@ -271,7 +268,29 @@ func (sc *UDPCollector) handleMessage(msg []byte) {
} }
} }
// Run adds a message handler to a UDP listener // Run listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *UDPCollector) Run() { func (sc *SocketCollector) Run() {
handleMessages(sc.listener, sc.handleMessage) for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}
go handleMessages(conn, sc.handleMessage)
}
}
const packetSize = 1024 * 65
// handleMessages process the content received in a network connection
func handleMessages(conn net.Conn, fn func([]byte)) {
defer conn.Close()
msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}
fn(msg[0:s])
} }

View file

@ -25,8 +25,6 @@ import (
) )
func TestNewUDPLogListener(t *testing.T) { func TestNewUDPLogListener(t *testing.T) {
port := freeUDPPort()
var count uint64 var count uint64
fn := func(message []byte) { fn := func(message []byte) {
@ -34,19 +32,30 @@ func TestNewUDPLogListener(t *testing.T) {
atomic.AddUint64(&count, 1) atomic.AddUint64(&count, 1)
} }
t.Logf("UDP Port: %v", port) tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())
l, err := newUDPListener(port) l, err := net.Listen("unix", tmpFile)
if err != nil { if err != nil {
t.Errorf("unexpected error creating UDP listener: %v", err) t.Fatalf("unexpected error creating unix socket: %v", err)
} }
if l == nil { if l == nil {
t.Errorf("expected a listener but none returned") t.Fatalf("expected a listener but none returned")
} }
go handleMessages(l, fn) defer l.Close()
conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port)) go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
go handleMessages(conn, fn)
}
}()
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message")) conn.Write([]byte("message"))
conn.Close() conn.Close()
@ -55,16 +64,3 @@ func TestNewUDPLogListener(t *testing.T) {
t.Errorf("expected only one message from the UDP listern but %v returned", count) t.Errorf("expected only one message from the UDP listern but %v returned", count)
} }
} }
func freeUDPPort() int {
l, err := net.ListenUDP("udp", &net.UDPAddr{})
if err != nil {
return 0
}
if err := l.Close(); err != nil {
return 0
}
return l.LocalAddr().(*net.UDPAddr).Port
}

View file

@ -1,51 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"net"
)
const packetSize = 1024 * 65
// newUDPListener creates a new UDP listener used to process messages
// from the NGINX log phase containing information about a request
func newUDPListener(port int) (*net.UDPConn, error) {
service := fmt.Sprintf("127.0.0.1:%v", port)
udpAddr, err := net.ResolveUDPAddr("udp4", service)
if err != nil {
return nil, err
}
return net.ListenUDP("udp", udpAddr)
}
// handleMessages process packets received in an UDP connection
func handleMessages(conn *net.UDPConn, fn func([]byte)) {
msg := make([]byte, packetSize)
for {
s, _, err := conn.ReadFrom(msg[0:])
if err != nil {
continue
}
fn(msg[0:s])
}
}

View file

@ -1,4 +1,4 @@
local socket = ngx.socket.udp local socket = ngx.socket.tcp
local cjson = require('cjson') local cjson = require('cjson')
local defer = require('defer') local defer = require('defer')
local assert = assert local assert = assert
@ -7,7 +7,7 @@ local _M = {}
local function send_data(jsonData) local function send_data(jsonData)
local s = assert(socket()) local s = assert(socket())
assert(s:setpeername("127.0.0.1", 8000)) assert(s:connect('unix:/tmp/prometheus-nginx.socket'))
assert(s:send(jsonData)) assert(s:send(jsonData))
assert(s:close()) assert(s:close())
end end