Merge pull request #2726 from aledbf/gather

Cleanup prometheus metrics after a reload
This commit is contained in:
k8s-ci-robot 2018-07-12 10:13:25 -07:00 committed by GitHub
commit 1cdd64384a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 1896 additions and 630 deletions

View file

@ -28,6 +28,8 @@ FOCUS ?= .*
# number of parallel test
E2E_NODES ?= 3
NODE_IP ?= $(shell minikube ip)
ifeq ($(GOHOSTOS),darwin)
SED_I=sed -i ''
endif
@ -165,6 +167,7 @@ static-check:
.PHONY: test
test:
@$(DEF_VARS) \
NODE_IP=$(NODE_IP) \
DOCKER_OPTS="--net=host" \
build/go-in-docker.sh build/test.sh
@ -180,6 +183,7 @@ e2e-test:
FOCUS=$(FOCUS) \
E2E_NODES=$(E2E_NODES) \
DOCKER_OPTS="--net=host" \
NODE_IP=$(NODE_IP) \
build/go-in-docker.sh build/e2e-tests.sh
.PHONY: cover

View file

@ -30,6 +30,10 @@ if [ -z "${E2E_NODES}" ]; then
echo "E2E_NODES must be set"
exit 1
fi
if [ -z "${NODE_IP}" ]; then
echo "NODE_IP must be set"
exit 1
fi
SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/..
@ -46,13 +50,6 @@ if ! [ -x "$(command -v kubectl)" ]; then
chmod +x ${TEST_BINARIES}/kubectl
fi
if ! [ -x "$(command -v minikube)" ]; then
echo "downloading minikube..."
curl -sSLo ${TEST_BINARIES}/minikube \
https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
chmod +x ${TEST_BINARIES}/minikube
fi
ginkgo build ./test/e2e
ginkgo \

View file

@ -62,6 +62,7 @@ GOARCH=${GOARCH}
PWD=${PWD}
BUSTED_ARGS=${BUSTED_ARGS:-""}
REPO_INFO=${REPO_INFO:-local}
NODE_IP=${NODE_IP:-127.0.0.1}
EOF
docker run \

View file

@ -29,6 +29,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -39,9 +40,8 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/controller"
"k8s.io/ingress-nginx/internal/ingress/metric/collector"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/internal/net/ssl"
"k8s.io/ingress-nginx/version"
@ -118,25 +118,20 @@ func main() {
conf.Client = kubeClient
ngx := controller.NewNGINXController(conf, fs)
reg := prometheus.NewRegistry()
mc, err := metric.NewCollector(conf.ListenPorts.Status, reg)
if err != nil {
glog.Fatalf("Error creating prometheus collectos: %v", err)
}
mc.Start()
ngx := controller.NewNGINXController(conf, mc, fs)
go handleSigterm(ngx, func(code int) {
os.Exit(code)
})
mux := http.NewServeMux()
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)
err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)
if err != nil {
glog.Fatalf("Error creating metric collector: %v", err)
}
err = collector.NewInstance(conf.Namespace, class.IngressClass)
if err != nil {
glog.Fatalf("Error creating unix socket server: %v", err)
}
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux, reg)
ngx.Start()
}
@ -240,14 +235,20 @@ func handleFatalInitError(err error) {
err)
}
func registerHandlers(enableProfiling bool, port int, ic *controller.NGINXController, mux *http.ServeMux) {
func registerHandlers(
enableProfiling bool,
port int,
ic *controller.NGINXController,
mux *http.ServeMux,
reg *prometheus.Registry) {
// expose health check endpoint (/healthz)
healthz.InstallHandler(mux,
healthz.PingHealthz,
ic,
)
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
mux.HandleFunc("/build", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)

View file

@ -76,7 +76,7 @@ func TestHandleSigterm(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
ngx := controller.NewNGINXController(conf, fs)
ngx := controller.NewNGINXController(conf, nil, fs)
go handleSigterm(ngx, func(code int) {
if code != 1 {

View file

@ -25,6 +25,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/mitchellh/hashstructure"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
@ -148,38 +149,43 @@ func (n *NGINXController) syncIngress(interface{}) error {
}
}
pcfg := ingress.Configuration{
pcfg := &ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
ConfigurationChecksum: n.store.GetBackendConfiguration().Checksum,
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
}
if n.runningConfig.Equal(&pcfg) {
if n.runningConfig.Equal(pcfg) {
glog.V(3).Infof("No configuration change detected, skipping backend reload.")
return nil
}
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(&pcfg) {
if n.cfg.DynamicConfigurationEnabled && n.IsDynamicConfigurationEnough(pcfg) {
glog.Infof("Changes handled by the dynamic configuration, skipping backend reload.")
} else {
glog.Infof("Configuration changes detected, backend reload required.")
err := n.OnUpdate(pcfg)
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
err := n.OnUpdate(*pcfg)
if err != nil {
IncReloadErrorCount()
ConfigSuccess(false)
n.metricCollector.IncReloadErrorCount()
n.metricCollector.ConfigSuccess(hash, false)
glog.Errorf("Unexpected failure reloading the backend:\n%v", err)
return err
}
glog.Infof("Backend successfully reloaded.")
ConfigSuccess(true)
IncReloadCount()
setSSLExpireTime(servers)
n.metricCollector.ConfigSuccess(hash, true)
n.metricCollector.IncReloadCount()
n.metricCollector.SetSSLExpireTime(servers)
}
if n.cfg.DynamicConfigurationEnabled {
@ -191,7 +197,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
// it takes time for NGINX to start listening on the configured ports
time.Sleep(1 * time.Second)
}
err := configureDynamically(&pcfg, n.cfg.ListenPorts.Status)
err := configureDynamically(pcfg, n.cfg.ListenPorts.Status)
if err == nil {
glog.Infof("Dynamic reconfiguration succeeded.")
} else {
@ -200,7 +206,11 @@ func (n *NGINXController) syncIngress(interface{}) error {
}(isFirstSync)
}
n.runningConfig = &pcfg
ri := getRemovedIngresses(n.runningConfig, pcfg)
re := getRemovedHosts(n.runningConfig, pcfg)
n.metricCollector.RemoveMetrics(ri, re)
n.runningConfig = pcfg
return nil
}
@ -1112,3 +1122,57 @@ func extractTLSSecretName(host string, ing *extensions.Ingress,
return ""
}
// getRemovedHosts returns a list of the hostsnames
// that are not associated anymore to the NGINX configuration.
func getRemovedHosts(rucfg, newcfg *ingress.Configuration) []string {
old := sets.NewString()
new := sets.NewString()
for _, s := range rucfg.Servers {
if !old.Has(s.Hostname) {
old.Insert(s.Hostname)
}
}
for _, s := range newcfg.Servers {
if !new.Has(s.Hostname) {
new.Insert(s.Hostname)
}
}
return old.Difference(new).List()
}
func getRemovedIngresses(rucfg, newcfg *ingress.Configuration) []string {
oldIngresses := sets.NewString()
newIngresses := sets.NewString()
for _, server := range rucfg.Servers {
for _, location := range server.Locations {
if location.Ingress == nil {
continue
}
ingKey := k8s.MetaNamespaceKey(location.Ingress)
if !oldIngresses.Has(ingKey) {
oldIngresses.Insert(ingKey)
}
}
}
for _, server := range newcfg.Servers {
for _, location := range server.Locations {
if location.Ingress == nil {
continue
}
ingKey := k8s.MetaNamespaceKey(location.Ingress)
if !newIngresses.Has(ingKey) {
newIngresses.Insert(ingKey)
}
}
}
return oldIngresses.Difference(newIngresses).List()
}

View file

@ -1,120 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress"
)
const (
ns = "ingress_controller"
operation = "count"
reloadLabel = "reloads"
sslLabelExpire = "ssl_expire_time_seconds"
sslLabelHost = "host"
)
func init() {
prometheus.MustRegister(reloadOperation)
prometheus.MustRegister(reloadOperationErrors)
prometheus.MustRegister(sslExpireTime)
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)
}
var (
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "config_last_reload_successfull",
Help: `Whether the last configuration reload attemp was successful.
Prometheus alert example:
alert: IngressControllerFailedReload
expr: ingress_controller_config_last_reload_successfull == 0
for: 10m`,
})
configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "config_last_reload_successfull_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
// TODO depreciate this metrics in favor of ingress_controller_config_last_reload_successfull_timestamp_seconds
reloadOperation = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: ns,
Name: "success",
Help: `DEPRECATED: use ingress_controller_config_last_reload_successfull_timestamp_seconds or ingress_controller_config_last_reload_successfull instead.
Cumulative number of Ingress controller reload operations`,
},
[]string{operation},
)
// TODO depreciate this metrics in favor of ingress_controller_config_last_reload_successfull_timestamp_seconds
reloadOperationErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: ns,
Name: "errors",
Help: `DEPRECATED: use ingress_controller_config_last_reload_successfull_timestamp_seconds or ingress_controller_config_last_reload_successfull instead.
Cumulative number of Ingress controller errors during reload operations`,
},
[]string{operation},
)
sslExpireTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: ns,
Name: sslLabelExpire,
Help: "Number of seconds since 1970 to the SSL Certificate expire. An example to check if this " +
"certificate will expire in 10 days is: \"ingress_controller_ssl_expire_time_seconds < (time() + (10 * 24 * 3600))\"",
},
[]string{sslLabelHost},
)
)
// IncReloadCount increment the reload counter
func IncReloadCount() {
reloadOperation.WithLabelValues(reloadLabel).Inc()
}
// IncReloadErrorCount increment the reload error counter
func IncReloadErrorCount() {
reloadOperationErrors.WithLabelValues(reloadLabel).Inc()
}
// ConfigSuccess set a boolean flag according to the output of the controller configuration reload
func ConfigSuccess(success bool) {
if success {
ConfigSuccessTime()
configSuccess.Set(1)
} else {
configSuccess.Set(0)
}
}
// ConfigSuccessTime set the current timestamp when the controller is successfully reloaded
func ConfigSuccessTime() {
configSuccessTime.Set(float64(time.Now().Unix()))
}
func setSSLExpireTime(servers []*ingress.Server) {
for _, s := range servers {
if s.Hostname != defServerName {
sslExpireTime.WithLabelValues(s.Hostname).Set(float64(s.SSLCert.ExpireTime.Unix()))
}
}
}

View file

@ -53,6 +53,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/controller/process"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/ingress/status"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
@ -70,7 +71,7 @@ var (
)
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
func NewNGINXController(config *Configuration, mc metric.Collector, fs file.Filesystem) *NGINXController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
@ -103,6 +104,8 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
runningConfig: new(ingress.Configuration),
Proxy: &TCPProxy{},
metricCollector: mc,
}
n.store = store.New(
@ -243,6 +246,8 @@ type NGINXController struct {
store store.Storer
fileSystem filesystem.Filesystem
metricCollector metric.Collector
}
// Start starts a new NGINX master process running in the foreground.
@ -590,6 +595,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
DisableLua: n.cfg.DisableLua,
}
tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
content, err := n.t.Write(tc)
if err != nil {
return err

View file

@ -144,11 +144,11 @@ func TestConfigureDynamically(t *testing.T) {
t.Fatal(err)
}
body := string(b)
if strings.Index(body, "target") != -1 {
if strings.Contains(body, "target") {
t.Errorf("unexpected target reference in JSON content: %v", body)
}
if strings.Index(body, "service") != -1 {
if strings.Contains(body, "service") {
t.Errorf("unexpected service reference in JSON content: %v", body)
}

View file

@ -1,289 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"encoding/json"
"net"
"strings"
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
type socketData struct {
Host string `json:"host"` // Label
Status string `json:"status"` // Label
BytesSent float64 `json:"bytesSent"` // Metric
Protocol string `json:"protocol"` // Label
Method string `json:"method"` // Label
RequestLength float64 `json:"requestLength"` // Metric
RequestTime float64 `json:"requestTime"` // Metric
UpstreamName string `json:"upstreamName"` // Label
UpstreamIP string `json:"upstreamIP"` // Label
UpstreamResponseTime float64 `json:"upstreamResponseTime"` // Metric
UpstreamStatus string `json:"upstreamStatus"` // Label
Namespace string `json:"namespace"` // Label
Ingress string `json:"ingress"` // Label
Service string `json:"service"` // Label
Path string `json:"path"` // Label
}
// SocketCollector stores prometheus metrics and ingress meta-data
type SocketCollector struct {
upstreamResponseTime *prometheus.HistogramVec
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
bytesSent *prometheus.HistogramVec
collectorSuccess *prometheus.GaugeVec
collectorSuccessTime *prometheus.GaugeVec
requests *prometheus.CounterVec
listener net.Listener
ns string
ingressClass string
}
// NewInstance creates a new SocketCollector instance
func NewInstance(ns string, class string) error {
sc := SocketCollector{}
ns = strings.Replace(ns, "-", "_", -1)
listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return err
}
sc.listener = listener
sc.ns = ns
sc.ingressClass = class
requestTags := []string{"host", "status", "protocol", "method", "path", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
collectorTags := []string{"namespace", "ingress_class"}
sc.upstreamResponseTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "upstream_response_time_seconds",
Help: "The time spent on receiving the response from the upstream server",
Namespace: ns,
},
requestTags,
)
sc.requestTime = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "The request processing time in seconds",
Namespace: ns,
},
requestTags,
)
sc.requestLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_length_bytes",
Help: "The request length (including request line, header, and request body)",
Namespace: ns,
Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide.
},
requestTags,
)
sc.requests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests",
Help: "The total number of client requests.",
Namespace: ns,
},
collectorTags,
)
sc.bytesSent = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "bytes_sent",
Help: "The the number of bytes sent to a client",
Namespace: ns,
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
},
requestTags,
)
sc.collectorSuccess = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful",
Help: "Whether the last collector run was successful (success = 1, failure = 0).",
Namespace: ns,
},
collectorTags,
)
sc.collectorSuccessTime = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "collector_last_run_successful_timestamp_seconds",
Help: "Timestamp of the last successful collector run",
Namespace: ns,
},
collectorTags,
)
prometheus.MustRegister(sc.upstreamResponseTime)
prometheus.MustRegister(sc.requestTime)
prometheus.MustRegister(sc.requestLength)
prometheus.MustRegister(sc.requests)
prometheus.MustRegister(sc.bytesSent)
prometheus.MustRegister(sc.collectorSuccess)
prometheus.MustRegister(sc.collectorSuccessTime)
go sc.Run()
return nil
}
func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))
collectorSuccess := true
// Unmarshall bytes
var stats socketData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
collectorSuccess = false
return
}
// Create Request Labels Map
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"protocol": stats.Protocol,
"method": stats.Method,
"path": stats.Path,
"upstream_name": stats.UpstreamName,
"upstream_ip": stats.UpstreamIP,
"upstream_status": stats.UpstreamStatus,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
// Create Collector Labels Map
collectorLabels := prometheus.Labels{
"namespace": sc.ns,
"ingress_class": sc.ingressClass,
}
// Emit metrics
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
collectorSuccess = false
} else {
requestsMetric.Inc()
}
if stats.UpstreamResponseTime != -1 {
upstreamResponseTimeMetric, err := sc.upstreamResponseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
collectorSuccess = false
} else {
upstreamResponseTimeMetric.Observe(stats.UpstreamResponseTime)
}
}
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
collectorSuccess = false
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
collectorSuccess = false
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
if stats.BytesSent != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
collectorSuccess = false
} else {
bytesSentMetric.Observe(stats.BytesSent)
}
}
collectorSuccessMetric, err := sc.collectorSuccess.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success metric: %v", err)
} else {
if collectorSuccess {
collectorSuccessMetric.Set(1)
collectorSuccessTimeMetric, err := sc.collectorSuccessTime.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching collector success time metric: %v", err)
} else {
collectorSuccessTimeMetric.Set(float64(time.Now().Unix()))
}
} else {
collectorSuccessMetric.Set(0)
}
}
}
// Run listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *SocketCollector) Run() {
for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}
go handleMessages(conn, sc.handleMessage)
}
}
const packetSize = 1024 * 65
// handleMessages process the content received in a network connection
func handleMessages(conn net.Conn, fn func([]byte)) {
defer conn.Close()
msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}
fn(msg[0:s])
}

View file

@ -1,66 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
)
func TestNewUDPLogListener(t *testing.T) {
var count uint64
fn := func(message []byte) {
t.Logf("message: %v", string(message))
atomic.AddUint64(&count, 1)
}
tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())
l, err := net.Listen("unix", tmpFile)
if err != nil {
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Fatalf("expected a listener but none returned")
}
defer l.Close()
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
go handleMessages(conn, fn)
}
}()
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message"))
conn.Close()
time.Sleep(1 * time.Millisecond)
if count != 1 {
t.Errorf("expected only one message from the UDP listern but %v returned", count)
}
}

View file

@ -0,0 +1,218 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"time"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-nginx/internal/ingress"
)
var (
operation = []string{"namespace", "class"}
sslLabelHost = []string{"namespace", "class", "host"}
)
// Controller defines base metrics about the ingress controller
type Controller struct {
prometheus.Collector
configHash prometheus.Gauge
configSuccess prometheus.Gauge
configSuccessTime prometheus.Gauge
reloadOperation *prometheus.CounterVec
reloadOperationErrors *prometheus.CounterVec
sslExpireTime *prometheus.GaugeVec
labels prometheus.Labels
}
// NewController creates a new prometheus collector for the
// Ingress controller operations
func NewController(pod, namespace, class string) *Controller {
constLabels := prometheus.Labels{
"controller_namespace": namespace,
"controller_class": class,
"controller_pod": pod,
}
cm := &Controller{
labels: prometheus.Labels{
"namespace": namespace,
"class": class,
},
configHash: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: PrometheusNamespace,
Name: "config_hash",
Help: "Running configuration hash actually running",
ConstLabels: constLabels,
},
),
configSuccess: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: PrometheusNamespace,
Name: "config_last_reload_successful",
Help: "Whether the last configuration reload attemp was successful",
ConstLabels: constLabels,
}),
configSuccessTime: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: PrometheusNamespace,
Name: "config_last_reload_successful_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
ConstLabels: constLabels,
}),
reloadOperation: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: PrometheusNamespace,
Name: "success",
Help: `Cumulative number of Ingress controller reload operations`,
},
operation,
),
reloadOperationErrors: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: PrometheusNamespace,
Name: "errors",
Help: `Cumulative number of Ingress controller errors during reload operations`,
},
operation,
),
sslExpireTime: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: PrometheusNamespace,
Name: "ssl_expire_time_seconds",
Help: `Number of seconds since 1970 to the SSL Certificate expire.
An example to check if this certificate will expire in 10 days is: "nginx_ingress_controller_ssl_expire_time_seconds < (time() + (10 * 24 * 3600))"`,
},
sslLabelHost,
),
}
return cm
}
// IncReloadCount increment the reload counter
func (cm *Controller) IncReloadCount() {
cm.reloadOperation.With(cm.labels).Inc()
}
// IncReloadErrorCount increment the reload error counter
func (cm *Controller) IncReloadErrorCount() {
cm.reloadOperationErrors.With(cm.labels).Inc()
}
// ConfigSuccess set a boolean flag according to the output of the controller configuration reload
func (cm *Controller) ConfigSuccess(hash uint64, success bool) {
if success {
cm.configSuccessTime.Set(float64(time.Now().Unix()))
cm.configSuccess.Set(1)
cm.configHash.Set(float64(hash))
return
}
cm.configSuccess.Set(0)
cm.configHash.Set(0)
}
// Describe implements prometheus.Collector
func (cm Controller) Describe(ch chan<- *prometheus.Desc) {
cm.configHash.Describe(ch)
cm.configSuccess.Describe(ch)
cm.configSuccessTime.Describe(ch)
cm.reloadOperation.Describe(ch)
cm.reloadOperationErrors.Describe(ch)
cm.sslExpireTime.Describe(ch)
}
// Collect implements the prometheus.Collector interface.
func (cm Controller) Collect(ch chan<- prometheus.Metric) {
cm.configHash.Collect(ch)
cm.configSuccess.Collect(ch)
cm.configSuccessTime.Collect(ch)
cm.reloadOperation.Collect(ch)
cm.reloadOperationErrors.Collect(ch)
cm.sslExpireTime.Collect(ch)
}
// SetSSLExpireTime sets the expiration time of SSL Certificates
func (cm *Controller) SetSSLExpireTime(servers []*ingress.Server) {
for _, s := range servers {
if s.Hostname != "" && s.SSLCert.ExpireTime.Unix() > 0 {
labels := make(prometheus.Labels, len(cm.labels)+1)
for k, v := range cm.labels {
labels[k] = v
}
labels["host"] = s.Hostname
cm.sslExpireTime.With(labels).Set(float64(s.SSLCert.ExpireTime.Unix()))
}
}
}
// RemoveMetrics removes metrics for hostames not available anymore
func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer) {
mfs, err := registry.Gather()
if err != nil {
glog.Errorf("Error gathering metrics: %v", err)
return
}
glog.V(2).Infof("removing SSL certificate metrics for %v hosts", hosts)
toRemove := sets.NewString(hosts...)
for _, mf := range mfs {
metricName := mf.GetName()
if "ssl_expire_time_seconds" != metricName {
continue
}
for _, m := range mf.GetMetric() {
labels := make(map[string]string, len(m.GetLabel()))
for _, labelPair := range m.GetLabel() {
labels[*labelPair.Name] = *labelPair.Value
}
// remove labels that are constant
deleteConstants(labels)
host, ok := labels["host"]
if !ok {
continue
}
if !toRemove.Has(host) {
continue
}
glog.V(2).Infof("Removing prometheus metric from gauge %v for host %v", metricName, host)
removed := cm.sslExpireTime.Delete(labels)
if !removed {
glog.V(2).Infof("metric %v for host %v with labels not removed: %v", metricName, host, labels)
}
}
}
}

View file

@ -0,0 +1,122 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress"
)
func TestControllerCounters(t *testing.T) {
const metadata = `
# HELP nginx_ingress_controller_config_last_reload_successful Whether the last configuration reload attemp was successful
# TYPE nginx_ingress_controller_config_last_reload_successful gauge
# HELP nginx_ingress_controller_success Cumulative number of Ingress controller reload operations
# TYPE nginx_ingress_controller_success counter
`
cases := []struct {
name string
test func(*Controller)
metrics []string
want string
}{
{
name: "should return not increment in metrics if no operations are invoked",
test: func(cm *Controller) {
},
want: metadata + `
nginx_ingress_controller_config_last_reload_successful{controller_class="nginx",controller_namespace="default",controller_pod="pod"} 0
`,
metrics: []string{"nginx_ingress_controller_config_last_reload_successful", "nginx_ingress_controller_success"},
},
{
name: "single increase in reload count should return 1",
test: func(cm *Controller) {
cm.IncReloadCount()
cm.ConfigSuccess(0, true)
},
want: metadata + `
nginx_ingress_controller_config_last_reload_successful{controller_class="nginx",controller_namespace="default",controller_pod="pod"} 1
nginx_ingress_controller_success{class="nginx",namespace="default"} 1
`,
metrics: []string{"nginx_ingress_controller_config_last_reload_successful", "nginx_ingress_controller_success"},
},
{
name: "single increase in error reload count should return 1",
test: func(cm *Controller) {
cm.IncReloadErrorCount()
},
want: `
# HELP nginx_ingress_controller_errors Cumulative number of Ingress controller errors during reload operations
# TYPE nginx_ingress_controller_errors counter
nginx_ingress_controller_errors{class="nginx",namespace="default"} 1
`,
metrics: []string{"nginx_ingress_controller_errors"},
},
{
name: "should set SSL certificates metrics",
test: func(cm *Controller) {
t1, _ := time.Parse(
time.RFC3339,
"2012-11-01T22:08:41+00:00")
servers := []*ingress.Server{
{
Hostname: "demo",
SSLCert: ingress.SSLCert{
ExpireTime: t1,
},
},
{
Hostname: "invalid",
SSLCert: ingress.SSLCert{
ExpireTime: time.Unix(0, 0),
},
},
}
cm.SetSSLExpireTime(servers)
},
want: `
# HELP nginx_ingress_controller_ssl_expire_time_seconds Number of seconds since 1970 to the SSL Certificate expire.\n An example to check if this certificate will expire in 10 days is: "nginx_ingress_controller_ssl_expire_time_seconds < (time() + (10 * 24 * 3600))"
# TYPE nginx_ingress_controller_ssl_expire_time_seconds gauge
nginx_ingress_controller_ssl_expire_time_seconds{class="nginx",host="demo",namespace="default"} 1.351807721e+09
`,
metrics: []string{"nginx_ingress_controller_ssl_expire_time_seconds"},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
cm := NewController("pod", "default", "nginx")
reg := prometheus.NewPedanticRegistry()
if err := reg.Register(cm); err != nil {
t.Errorf("registering collector failed: %s", err)
}
c.test(cm)
if err := GatherAndCompare(cm, c.want, c.metrics, reg); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
reg.Unregister(cm)
})
}
}

View file

@ -0,0 +1,20 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
// PrometheusNamespace default metric namespace
var PrometheusNamespace = "nginx_ingress_controller"

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
package collectors
import (
"fmt"
@ -24,7 +24,6 @@ import (
"strconv"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
@ -39,11 +38,11 @@ var (
type (
nginxStatusCollector struct {
scrapeChan chan scrapeRequest
ngxHealthPort int
ngxStatusPath string
data *nginxStatusData
watchNamespace string
ingressClass string
}
nginxStatusData struct {
@ -70,44 +69,47 @@ type (
}
)
// InitNGINXStatusCollector returns a new prometheus collector the default nginx status module
func InitNGINXStatusCollector(watchNamespace, ingressClass string, ngxHealthPort int) error {
const ns string = "nginx"
const ngxStatusPath = "/nginx_status"
// NGINXStatusCollector defines a status collector interface
type NGINXStatusCollector interface {
prometheus.Collector
Start()
Stop()
}
// NewNGINXStatus returns a new prometheus collector the default nginx status module
func NewNGINXStatus(podName, namespace, ingressClass string, ngxHealthPort int) (NGINXStatusCollector, error) {
p := nginxStatusCollector{
scrapeChan: make(chan scrapeRequest),
ngxHealthPort: ngxHealthPort,
ngxStatusPath: ngxStatusPath,
watchNamespace: watchNamespace,
ingressClass: ingressClass,
ngxStatusPath: "/nginx_status",
}
constLabels := prometheus.Labels{
"controller_namespace": namespace,
"controller_class": ingressClass,
"controller_pod": podName,
}
p.data = &nginxStatusData{
connectionsTotal: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connections_total"),
prometheus.BuildFQName(PrometheusNamespace, subSystem, "connections_total"),
"total number of connections with state {active, accepted, handled}",
[]string{"ingress_class", "namespace", "state"}, nil),
[]string{"state"}, constLabels),
requestsTotal: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "requests_total"),
prometheus.BuildFQName(PrometheusNamespace, subSystem, "requests_total"),
"total number of client requests",
[]string{"ingress_class", "namespace"}, nil),
nil, constLabels),
connections: prometheus.NewDesc(
prometheus.BuildFQName(ns, "", "connections"),
prometheus.BuildFQName(PrometheusNamespace, subSystem, "connections"),
"current number of client connections with state {reading, writing, waiting}",
[]string{"ingress_class", "namespace", "state"}, nil),
[]string{"state"}, constLabels),
}
err := prometheus.Register(p)
if err != nil {
return fmt.Errorf("error while registering nginx status collector : %v", err)
}
go p.Run()
return nil
return p, nil
}
// Describe implements prometheus.Collector.
@ -124,7 +126,7 @@ func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
<-req.done
}
func (p nginxStatusCollector) Run() {
func (p nginxStatusCollector) Start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)
@ -207,17 +209,17 @@ func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
}
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Active), p.ingressClass, p.watchNamespace, "active")
prometheus.CounterValue, float64(s.Active), "active")
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Accepted), p.ingressClass, p.watchNamespace, "accepted")
prometheus.CounterValue, float64(s.Accepted), "accepted")
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
prometheus.CounterValue, float64(s.Handled), p.ingressClass, p.watchNamespace, "handled")
prometheus.CounterValue, float64(s.Handled), "handled")
ch <- prometheus.MustNewConstMetric(p.data.requestsTotal,
prometheus.CounterValue, float64(s.Requests), p.ingressClass, p.watchNamespace)
prometheus.CounterValue, float64(s.Requests))
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Reading), p.ingressClass, p.watchNamespace, "reading")
prometheus.GaugeValue, float64(s.Reading), "reading")
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Writing), p.ingressClass, p.watchNamespace, "writing")
prometheus.GaugeValue, float64(s.Writing), "writing")
ch <- prometheus.MustNewConstMetric(p.data.connections,
prometheus.GaugeValue, float64(s.Waiting), p.ingressClass, p.watchNamespace, "waiting")
prometheus.GaugeValue, float64(s.Waiting), "waiting")
}

View file

@ -0,0 +1,129 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func TestStatusCollector(t *testing.T) {
cases := []struct {
name string
mock string
metrics []string
want string
}{
{
name: "should return empty metrics",
mock: `
`,
want: `
# HELP nginx_ingress_controller_nginx_process_connections_total total number of connections with state {active, accepted, handled}
# TYPE nginx_ingress_controller_nginx_process_connections_total counter
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="accepted"} 0
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="active"} 0
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="handled"} 0
`,
metrics: []string{"nginx_ingress_controller_nginx_process_connections_total"},
},
{
name: "should return metrics for total connections",
mock: `
Active connections: 1
server accepts handled requests
1 2 3
Reading: 4 Writing: 5 Waiting: 6
`,
want: `
# HELP nginx_ingress_controller_nginx_process_connections_total total number of connections with state {active, accepted, handled}
# TYPE nginx_ingress_controller_nginx_process_connections_total counter
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="accepted"} 1
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="active"} 1
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="handled"} 2
`,
metrics: []string{"nginx_ingress_controller_nginx_process_connections_total"},
},
{
name: "should return nginx metrics all available metrics",
mock: `
Active connections: 1
server accepts handled requests
1 2 3
Reading: 4 Writing: 5 Waiting: 6
`,
want: `
# HELP nginx_ingress_controller_nginx_process_connections current number of client connections with state {reading, writing, waiting}
# TYPE nginx_ingress_controller_nginx_process_connections gauge
nginx_ingress_controller_nginx_process_connections{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="reading"} 4
nginx_ingress_controller_nginx_process_connections{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="waiting"} 6
nginx_ingress_controller_nginx_process_connections{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="writing"} 5
# HELP nginx_ingress_controller_nginx_process_connections_total total number of connections with state {active, accepted, handled}
# TYPE nginx_ingress_controller_nginx_process_connections_total counter
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="accepted"} 1
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="active"} 1
nginx_ingress_controller_nginx_process_connections_total{controller_class="nginx",controller_namespace="default",controller_pod="pod",state="handled"} 2
# HELP nginx_ingress_controller_nginx_process_requests_total total number of client requests
# TYPE nginx_ingress_controller_nginx_process_requests_total counter
nginx_ingress_controller_nginx_process_requests_total{controller_class="nginx",controller_namespace="default",controller_pod="pod"} 3
`,
metrics: []string{
"nginx_ingress_controller_nginx_process_connections_total",
"nginx_ingress_controller_nginx_process_requests_total",
"nginx_ingress_controller_nginx_process_connections",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, c.mock)
}))
p := server.Listener.Addr().(*net.TCPAddr).Port
cm, err := NewNGINXStatus("pod", "default", "nginx", p)
if err != nil {
t.Errorf("unexpected error creating nginx status collector: %v", err)
}
go cm.Start()
defer func() {
server.Close()
cm.Stop()
}()
reg := prometheus.NewPedanticRegistry()
if err := reg.Register(cm); err != nil {
t.Errorf("registering collector failed: %s", err)
}
if err := GatherAndCompare(cm, c.want, c.metrics, reg); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
reg.Unregister(cm)
})
}
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package collector
package collectors
import (
"path/filepath"
@ -71,60 +71,84 @@ type namedProcess struct {
data namedProcessData
}
// newNamedProcess returns a new prometheus collector for the nginx process
func newNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
const subSystem = "nginx_process"
// NGINXProcessCollector defines a process collector interface
type NGINXProcessCollector interface {
prometheus.Collector
Start()
Stop()
}
var name = "nginx"
var binary = "/usr/bin/nginx"
// NewNGINXProcess returns a new prometheus collector for the nginx process
func NewNGINXProcess(pod, namespace, ingressClass string) (NGINXProcessCollector, error) {
fs, err := proc.NewFS("/proc")
if err != nil {
return nil, err
}
nm := BinaryNameMatcher{
Name: name,
Binary: binary,
}
p := namedProcess{
scrapeChan: make(chan scrapeRequest),
Grouper: proc.NewGrouper(children, mn),
Grouper: proc.NewGrouper(true, nm),
fs: fs,
}
_, err = p.Update(p.fs.AllProcs())
if err != nil {
return nil, err
}
p.data = namedProcessData{
numProcs: prometheus.NewDesc(
"num_procs",
"number of processes",
nil, nil),
cpuSecs: prometheus.NewDesc(
"cpu_seconds_total",
"Cpu usage in seconds",
nil, nil),
readBytes: prometheus.NewDesc(
"read_bytes_total",
"number of bytes read",
nil, nil),
writeBytes: prometheus.NewDesc(
"write_bytes_total",
"number of bytes written",
nil, nil),
memResidentbytes: prometheus.NewDesc(
"resident_memory_bytes",
"number of bytes of memory in use",
nil, nil),
memVirtualbytes: prometheus.NewDesc(
"virtual_memory_bytes",
"number of bytes of memory in use",
nil, nil),
startTime: prometheus.NewDesc(
"oldest_start_time_seconds",
"start time in seconds since 1970/01/01",
nil, nil),
constLabels := prometheus.Labels{
"controller_namespace": namespace,
"controller_class": ingressClass,
"controller_pod": pod,
}
go p.start()
p.data = namedProcessData{
numProcs: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "num_procs"),
"number of processes",
nil, constLabels),
cpuSecs: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "cpu_seconds_total"),
"Cpu usage in seconds",
nil, constLabels),
readBytes: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "read_bytes_total"),
"number of bytes read",
nil, constLabels),
writeBytes: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "write_bytes_total"),
"number of bytes written",
nil, constLabels),
memResidentbytes: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "resident_memory_bytes"),
"number of bytes of memory in use",
nil, constLabels),
memVirtualbytes: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "virtual_memory_bytes"),
"number of bytes of memory in use",
nil, constLabels),
startTime: prometheus.NewDesc(
prometheus.BuildFQName(PrometheusNamespace, subSystem, "oldest_start_time_seconds"),
"start time in seconds since 1970/01/01",
nil, constLabels),
}
return p, nil
}
@ -147,7 +171,7 @@ func (p namedProcess) Collect(ch chan<- prometheus.Metric) {
<-req.done
}
func (p namedProcess) start() {
func (p namedProcess) Start() {
for req := range p.scrapeChan {
ch := req.results
p.scrape(ch)

View file

@ -0,0 +1,93 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"os/exec"
"syscall"
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func TestProcessCollector(t *testing.T) {
cases := []struct {
name string
metrics []string
}{
{
name: "should return metrics",
metrics: []string{"nginx_ingress_controller_nginx_process_num_procs"},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
name = "sleep"
binary = "/bin/sleep"
cmd := exec.Command(binary, "1000000")
err := cmd.Start()
if err != nil {
t.Errorf("unexpected error creating dummy process: %v", err)
}
done := make(chan struct{})
go func() {
cmd.Wait()
status := cmd.ProcessState.Sys().(syscall.WaitStatus)
if status.Signaled() {
t.Logf("Signal: %v", status.Signal())
} else {
t.Logf("Status: %v", status.ExitStatus())
}
}()
cm, err := NewNGINXProcess("pod", "default", "nginx")
if err != nil {
t.Errorf("unexpected error creating nginx status collector: %v", err)
}
go cm.Start()
defer func() {
cm.Stop()
cmd.Process.Kill()
close(done)
}()
reg := prometheus.NewPedanticRegistry()
if err := reg.Register(cm); err != nil {
t.Errorf("registering collector failed: %s", err)
}
metrics, err := reg.Gather()
if err != nil {
t.Errorf("gathering metrics failed: %s", err)
}
m := filterMetrics(metrics, c.metrics)
if *m[0].GetMetric()[0].Gauge.Value < 0 {
t.Errorf("number of process should be > 0")
}
reg.Unregister(cm)
})
}
}

View file

@ -0,0 +1,421 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"encoding/json"
"fmt"
"io"
"net"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
)
type upstream struct {
Endpoint string `json:"endpoint"`
Latency float64 `json:"upstreamLatency"`
ResponseLength float64 `json:"upstreamResponseLength"`
ResponseTime float64 `json:"upstreamResponseTime"`
Status string `json:"upstreamStatus"`
}
type socketData struct {
Host string `json:"host"`
Status string `json:"status"`
ResponseLength float64 `json:"responseLength"`
Method string `json:"method"`
RequestLength float64 `json:"requestLength"`
RequestTime float64 `json:"requestTime"`
upstream
Namespace string `json:"namespace"`
Ingress string `json:"ingress"`
Service string `json:"service"`
Path string `json:"path"`
}
// SocketCollector stores prometheus metrics and ingress meta-data
type SocketCollector struct {
prometheus.Collector
requestTime *prometheus.HistogramVec
requestLength *prometheus.HistogramVec
responseTime *prometheus.HistogramVec
responseLength *prometheus.HistogramVec
upstreamLatency *prometheus.SummaryVec
bytesSent *prometheus.HistogramVec
requests *prometheus.CounterVec
listener net.Listener
metricMapping map[string]interface{}
}
var (
requestTags = []string{
"host",
"status",
"method",
"path",
// "endpoint",
"namespace",
"ingress",
"service",
}
)
// NewSocketCollector creates a new SocketCollector instance using
// the ingresss watch namespace and class used by the controller
func NewSocketCollector(pod, namespace, class string) (*SocketCollector, error) {
listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return nil, err
}
constLabels := prometheus.Labels{
"controller_namespace": namespace,
"controller_class": class,
"controller_pod": pod,
}
sc := &SocketCollector{
listener: listener,
responseTime: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "response_duration_milliseconds",
Help: "The time spent on receiving the response from the upstream server",
Namespace: PrometheusNamespace,
ConstLabels: constLabels,
},
requestTags,
),
responseLength: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "response_size",
Help: "The response length (including request line, header, and request body)",
Namespace: PrometheusNamespace,
ConstLabels: constLabels,
},
requestTags,
),
requestTime: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_milliseconds",
Help: "The request processing time in milliseconds",
Namespace: PrometheusNamespace,
ConstLabels: constLabels,
},
requestTags,
),
requestLength: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_size",
Help: "The request length (including request line, header, and request body)",
Namespace: PrometheusNamespace,
Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide.
ConstLabels: constLabels,
},
requestTags,
),
requests: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests",
Help: "The total number of client requests.",
Namespace: PrometheusNamespace,
ConstLabels: constLabels,
},
[]string{"ingress", "namespace", "status"},
),
bytesSent: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "bytes_sent",
Help: "The the number of bytes sent to a client",
Namespace: PrometheusNamespace,
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
ConstLabels: constLabels,
},
requestTags,
),
upstreamLatency: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "ingress_upstream_latency_milliseconds",
Help: "Upstream service latency per Ingress",
Namespace: PrometheusNamespace,
ConstLabels: constLabels,
},
[]string{"ingress", "namespace", "service"},
),
}
sc.metricMapping = map[string]interface{}{
prometheus.BuildFQName(PrometheusNamespace, "", "request_duration_milliseconds"): sc.requestTime,
prometheus.BuildFQName(PrometheusNamespace, "", "request_size"): sc.requestLength,
prometheus.BuildFQName(PrometheusNamespace, "", "response_duration_milliseconds"): sc.responseTime,
prometheus.BuildFQName(PrometheusNamespace, "", "response_size"): sc.responseLength,
prometheus.BuildFQName(PrometheusNamespace, "", "bytes_sent"): sc.bytesSent,
prometheus.BuildFQName(PrometheusNamespace, "", "ingress_upstream_latency_milliseconds"): sc.upstreamLatency,
}
return sc, nil
}
func (sc *SocketCollector) handleMessage(msg []byte) {
glog.V(5).Infof("msg: %v", string(msg))
// Unmarshall bytes
var stats socketData
err := json.Unmarshal(msg, &stats)
if err != nil {
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
return
}
requestLabels := prometheus.Labels{
"host": stats.Host,
"status": stats.Status,
"method": stats.Method,
"path": stats.Path,
//"endpoint": stats.Endpoint,
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
collectorLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"status": stats.Status,
}
latencyLabels := prometheus.Labels{
"namespace": stats.Namespace,
"ingress": stats.Ingress,
"service": stats.Service,
}
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
if err != nil {
glog.Errorf("Error fetching requests metric: %v", err)
} else {
requestsMetric.Inc()
}
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
if err != nil {
glog.Errorf("Error fetching latency metric: %v", err)
} else {
latencyMetric.Observe(stats.Latency)
}
if stats.RequestTime != -1 {
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request duration metric: %v", err)
} else {
requestTimeMetric.Observe(stats.RequestTime)
}
}
if stats.RequestLength != -1 {
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching request length metric: %v", err)
} else {
requestLengthMetric.Observe(stats.RequestLength)
}
}
if stats.ResponseTime != -1 {
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching upstream response time metric: %v", err)
} else {
responseTimeMetric.Observe(stats.ResponseTime)
}
}
if stats.ResponseLength != -1 {
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
bytesSentMetric.Observe(stats.ResponseLength)
}
responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
if err != nil {
glog.Errorf("Error fetching bytes sent metric: %v", err)
} else {
responseSizeMetric.Observe(stats.ResponseLength)
}
}
}
// Start listen for connections in the unix socket and spawns a goroutine to process the content
func (sc *SocketCollector) Start() {
for {
conn, err := sc.listener.Accept()
if err != nil {
continue
}
go handleMessages(conn, sc.handleMessage)
}
}
// Stop stops unix listener
func (sc *SocketCollector) Stop() {
sc.listener.Close()
}
// RemoveMetrics deletes prometheus metrics from prometheus for ingresses and
// host that are not available anymore.
// Ref: https://godoc.org/github.com/prometheus/client_golang/prometheus#CounterVec.Delete
func (sc *SocketCollector) RemoveMetrics(ingresses []string, registry prometheus.Gatherer) {
mfs, err := registry.Gather()
if err != nil {
glog.Errorf("Error gathering metrics: %v", err)
return
}
// 1. remove metrics of removed ingresses
glog.V(2).Infof("removing ingresses %v from metrics", ingresses)
for _, mf := range mfs {
metricName := mf.GetName()
metric, ok := sc.metricMapping[metricName]
if !ok {
continue
}
toRemove := sets.NewString(ingresses...)
for _, m := range mf.GetMetric() {
labels := make(map[string]string, len(m.GetLabel()))
for _, labelPair := range m.GetLabel() {
labels[*labelPair.Name] = *labelPair.Value
}
// remove labels that are constant
deleteConstants(labels)
ns, ok := labels["namespace"]
if !ok {
continue
}
ing, ok := labels["ingress"]
if !ok {
continue
}
ingKey := fmt.Sprintf("%v/%v", ns, ing)
if !toRemove.Has(ingKey) {
continue
}
glog.V(2).Infof("Removing prometheus metric from histogram %v for ingress %v", metricName, ingKey)
h, ok := metric.(*prometheus.HistogramVec)
if ok {
removed := h.Delete(labels)
if !removed {
glog.V(2).Infof("metric %v for ingress %v with labels not removed: %v", metricName, ingKey, labels)
}
}
s, ok := metric.(*prometheus.SummaryVec)
if ok {
removed := s.Delete(labels)
if !removed {
glog.V(2).Infof("metric %v for ingress %v with labels not removed: %v", metricName, ingKey, labels)
}
}
}
}
}
// Describe implements prometheus.Collector
func (sc SocketCollector) Describe(ch chan<- *prometheus.Desc) {
sc.requestTime.Describe(ch)
sc.requestLength.Describe(ch)
sc.requests.Describe(ch)
sc.upstreamLatency.Describe(ch)
sc.responseTime.Describe(ch)
sc.responseLength.Describe(ch)
sc.bytesSent.Describe(ch)
}
// Collect implements the prometheus.Collector interface.
func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
sc.requestTime.Collect(ch)
sc.requestLength.Collect(ch)
sc.requests.Collect(ch)
sc.upstreamLatency.Collect(ch)
sc.responseTime.Collect(ch)
sc.responseLength.Collect(ch)
sc.bytesSent.Collect(ch)
}
const packetSize = 1024 * 65
// handleMessages process the content received in a network connection
func handleMessages(conn io.ReadCloser, fn func([]byte)) {
defer conn.Close()
msg := make([]byte, packetSize)
s, err := conn.Read(msg[0:])
if err != nil {
return
}
fn(msg[0:s])
}
func deleteConstants(labels prometheus.Labels) {
delete(labels, "controller_namespace")
delete(labels, "controller_class")
delete(labels, "controller_pod")
}

View file

@ -0,0 +1,262 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"fmt"
"net"
"sync/atomic"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
)
func TestNewUDPLogListener(t *testing.T) {
var count uint64
fn := func(message []byte) {
atomic.AddUint64(&count, 1)
}
tmpFile := fmt.Sprintf("/tmp/test-socket-%v", time.Now().Nanosecond())
l, err := net.Listen("unix", tmpFile)
if err != nil {
t.Fatalf("unexpected error creating unix socket: %v", err)
}
if l == nil {
t.Fatalf("expected a listener but none returned")
}
defer l.Close()
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
go handleMessages(conn, fn)
}
}()
conn, _ := net.Dial("unix", tmpFile)
conn.Write([]byte("message"))
conn.Close()
time.Sleep(1 * time.Millisecond)
if atomic.LoadUint64(&count) != 1 {
t.Errorf("expected only one message from the socket listener but %v returned", atomic.LoadUint64(&count))
}
}
func TestCollector(t *testing.T) {
cases := []struct {
name string
data []string
metrics []string
wantBefore string
removeIngresses []string
wantAfter string
}{
{
name: "invalid metric object should not increase prometheus metrics",
data: []string{`#missing {
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`},
metrics: []string{"nginx_ingress_controller_response_duration_milliseconds"},
wantBefore: `
`,
},
{
name: "valid metric object should update prometheus metrics",
data: []string{`{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`},
metrics: []string{"nginx_ingress_controller_response_duration_milliseconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_milliseconds The time spent on receiving the response from the upstream server
# TYPE nginx_ingress_controller_response_duration_milliseconds histogram
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 1
nginx_ingress_controller_response_duration_milliseconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 200
nginx_ingress_controller_response_duration_milliseconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 1
`,
removeIngresses: []string{"test-app-production/web-yml"},
wantAfter: `
`,
},
{
name: "multiple messages should increase prometheus metric by two",
data: []string{`{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-production",
"ingress":"web-yml",
"service":"test-app"
}`, `{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`, `{
"host":"testshop.com",
"status":"200",
"bytesSent":150.0,
"method":"GET",
"path":"/admin",
"requestLength":300.0,
"requestTime":60.0,
"upstreamName":"test-upstream",
"upstreamIP":"1.1.1.1:8080",
"upstreamResponseTime":200,
"upstreamStatus":"220",
"namespace":"test-app-qa",
"ingress":"web-yml-qa",
"service":"test-app-qa"
}`},
metrics: []string{"nginx_ingress_controller_response_duration_milliseconds"},
wantBefore: `
# HELP nginx_ingress_controller_response_duration_milliseconds The time spent on receiving the response from the upstream server
# TYPE nginx_ingress_controller_response_duration_milliseconds histogram
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.005"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.01"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.025"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.05"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.25"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="0.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="2.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="10"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200",le="+Inf"} 1
nginx_ingress_controller_response_duration_milliseconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 200
nginx_ingress_controller_response_duration_milliseconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml",method="GET",namespace="test-app-production",path="/admin",service="test-app",status="200"} 1
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.005"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.01"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.025"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.05"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.25"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="0.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="1"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="2.5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="5"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="10"} 0
nginx_ingress_controller_response_duration_milliseconds_bucket{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200",le="+Inf"} 2
nginx_ingress_controller_response_duration_milliseconds_sum{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 400
nginx_ingress_controller_response_duration_milliseconds_count{controller_class="ingress",controller_namespace="default",controller_pod="pod",host="testshop.com",ingress="web-yml-qa",method="GET",namespace="test-app-qa",path="/admin",service="test-app-qa",status="200"} 2
`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
registry := prometheus.NewPedanticRegistry()
sc, err := NewSocketCollector("pod", "default", "ingress")
if err != nil {
t.Errorf("%v: unexpected error creating new SocketCollector: %v", c.name, err)
}
if err := registry.Register(sc); err != nil {
t.Errorf("registering collector failed: %s", err)
}
for _, d := range c.data {
sc.handleMessage([]byte(d))
}
if err := GatherAndCompare(sc, c.wantBefore, c.metrics, registry); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
if len(c.removeIngresses) > 0 {
sc.RemoveMetrics(c.removeIngresses, registry)
time.Sleep(1 * time.Second)
if err := GatherAndCompare(sc, c.wantAfter, c.metrics, registry); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}
}
sc.Stop()
registry.Unregister(sc)
})
}
}

View file

@ -0,0 +1,183 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectors
import (
"bytes"
"fmt"
"reflect"
"sort"
"strings"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// GatherAndCompare retrieves all metrics exposed by a collector and compares it
// to an expected output in the Prometheus text exposition format.
// metricNames allows only comparing the given metrics. All are compared if it's nil.
func GatherAndCompare(c prometheus.Collector, expected string, metricNames []string, reg prometheus.Gatherer) error {
expected = removeUnusedWhitespace(expected)
metrics, err := reg.Gather()
if err != nil {
return fmt.Errorf("gathering metrics failed: %s", err)
}
if metricNames != nil {
metrics = filterMetrics(metrics, metricNames)
}
var tp expfmt.TextParser
expectedMetrics, err := tp.TextToMetricFamilies(bytes.NewReader([]byte(expected)))
if err != nil {
return fmt.Errorf("parsing expected metrics failed: %s", err)
}
if !reflect.DeepEqual(metrics, normalizeMetricFamilies(expectedMetrics)) {
// Encode the gathered output to the readbale text format for comparison.
var buf1 bytes.Buffer
enc := expfmt.NewEncoder(&buf1, expfmt.FmtText)
for _, mf := range metrics {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding result failed: %s", err)
}
}
// Encode normalized expected metrics again to generate them in the same ordering
// the registry does to spot differences more easily.
var buf2 bytes.Buffer
enc = expfmt.NewEncoder(&buf2, expfmt.FmtText)
for _, mf := range normalizeMetricFamilies(expectedMetrics) {
if err := enc.Encode(mf); err != nil {
return fmt.Errorf("encoding result failed: %s", err)
}
}
if buf2.String() == buf1.String() {
return nil
}
return fmt.Errorf(`
metric output does not match expectation; want:
'%s'
got:
'%s'
`, buf2.String(), buf1.String())
}
return nil
}
func filterMetrics(metrics []*dto.MetricFamily, names []string) []*dto.MetricFamily {
var filtered []*dto.MetricFamily
for _, m := range metrics {
drop := true
for _, name := range names {
if m.GetName() == name {
drop = false
break
}
}
if !drop {
filtered = append(filtered, m)
}
}
return filtered
}
func removeUnusedWhitespace(s string) string {
var (
trimmedLine string
trimmedLines []string
lines = strings.Split(s, "\n")
)
for _, l := range lines {
trimmedLine = strings.TrimSpace(l)
if len(trimmedLine) > 0 {
trimmedLines = append(trimmedLines, trimmedLine)
}
}
// The Prometheus metrics representation parser expects an empty line at the
// end otherwise fails with an unexpected EOF error.
return strings.Join(trimmedLines, "\n") + "\n"
}
// The below sorting code is copied form the Prometheus client library modulo the added
// label pair sorting.
// https://github.com/prometheus/client_golang/blob/ea6e1db4cb8127eeb0b6954f7320363e5451820f/prometheus/registry.go#L642-L684
// metricSorter is a sortable slice of *dto.Metric.
type metricSorter []*dto.Metric
func (s metricSorter) Len() int {
return len(s)
}
func (s metricSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s metricSorter) Less(i, j int) bool {
sort.Sort(prometheus.LabelPairSorter(s[i].Label))
sort.Sort(prometheus.LabelPairSorter(s[j].Label))
if len(s[i].Label) != len(s[j].Label) {
return len(s[i].Label) < len(s[j].Label)
}
for n, lp := range s[i].Label {
vi := lp.GetValue()
vj := s[j].Label[n].GetValue()
if vi != vj {
return vi < vj
}
}
if s[i].TimestampMs == nil {
return false
}
if s[j].TimestampMs == nil {
return true
}
return s[i].GetTimestampMs() < s[j].GetTimestampMs()
}
// normalizeMetricFamilies returns a MetricFamily slice with empty
// MetricFamilies pruned and the remaining MetricFamilies sorted by name within
// the slice, with the contained Metrics sorted within each MetricFamily.
func normalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
if len(mf.Metric) > 0 {
names = append(names, name)
}
}
sort.Strings(names)
result := make([]*dto.MetricFamily, 0, len(names))
for _, name := range names {
result = append(result, metricFamiliesByName[name])
}
return result
}

View file

@ -0,0 +1,43 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metric
import "k8s.io/ingress-nginx/internal/ingress"
// DummyCollector dummy implementation for mocks in tests
type DummyCollector struct{}
// ConfigSuccess ...
func (dc DummyCollector) ConfigSuccess(uint64, bool) {}
// IncReloadCount ...
func (dc DummyCollector) IncReloadCount() {}
// IncReloadErrorCount ...
func (dc DummyCollector) IncReloadErrorCount() {}
// RemoveMetrics ...
func (dc DummyCollector) RemoveMetrics(ingresses, endpoints []string) {}
// Start ...
func (dc DummyCollector) Start() {}
// Stop ...
func (dc DummyCollector) Stop() {}
// SetSSLExpireTime ...
func (dc DummyCollector) SetSSLExpireTime([]*ingress.Server) {}

View file

@ -0,0 +1,134 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metric
import (
"os"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/class"
"k8s.io/ingress-nginx/internal/ingress/metric/collectors"
)
// Collector defines the interface for a metric collector
type Collector interface {
ConfigSuccess(uint64, bool)
IncReloadCount()
IncReloadErrorCount()
RemoveMetrics(ingresses, endpoints []string)
SetSSLExpireTime([]*ingress.Server)
Start()
Stop()
}
type collector struct {
nginxStatus collectors.NGINXStatusCollector
nginxProcess collectors.NGINXProcessCollector
ingressController *collectors.Controller
socket *collectors.SocketCollector
registry *prometheus.Registry
}
// NewCollector creates a new metric collector the for ingress controller
func NewCollector(statusPort int, registry *prometheus.Registry) (Collector, error) {
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
podNamespace = "default"
}
podName := os.Getenv("POD_NAME")
nc, err := collectors.NewNGINXStatus(podName, podNamespace, class.IngressClass, statusPort)
if err != nil {
return nil, err
}
pc, err := collectors.NewNGINXProcess(podName, podNamespace, class.IngressClass)
if err != nil {
return nil, err
}
s, err := collectors.NewSocketCollector(podName, podNamespace, class.IngressClass)
if err != nil {
return nil, err
}
ic := collectors.NewController(podName, podNamespace, class.IngressClass)
return Collector(&collector{
nginxStatus: nc,
nginxProcess: pc,
ingressController: ic,
socket: s,
registry: registry,
}), nil
}
func (c *collector) ConfigSuccess(hash uint64, success bool) {
c.ingressController.ConfigSuccess(hash, success)
}
func (c *collector) IncReloadCount() {
c.ingressController.IncReloadCount()
}
func (c *collector) IncReloadErrorCount() {
c.ingressController.IncReloadErrorCount()
}
func (c *collector) RemoveMetrics(ingresses, hosts []string) {
c.socket.RemoveMetrics(ingresses, c.registry)
c.ingressController.RemoveMetrics(hosts, c.registry)
}
func (c *collector) Start() {
c.registry.MustRegister(c.nginxStatus)
c.registry.MustRegister(c.nginxProcess)
c.registry.MustRegister(c.ingressController)
c.registry.MustRegister(c.socket)
go c.nginxStatus.Start()
go c.nginxProcess.Start()
go c.socket.Start()
}
func (c *collector) Stop() {
c.registry.Unregister(c.nginxStatus)
c.registry.Unregister(c.nginxProcess)
c.registry.Unregister(c.ingressController)
c.registry.Unregister(c.socket)
c.nginxStatus.Stop()
c.nginxProcess.Stop()
c.socket.Stop()
}
func (c *collector) SetSSLExpireTime(servers []*ingress.Server) {
c.ingressController.SetSSLExpireTime(servers)
}

View file

@ -48,3 +48,8 @@ type SSLCert struct {
func (s SSLCert) GetObjectKind() schema.ObjectKind {
return schema.EmptyObjectKind
}
// HashInclude defines if a field should be used or not to calculate the hash
func (s SSLCert) HashInclude(field string, v interface{}) (bool, error) {
return (field != "PemSHA" && field != "ExpireTime"), nil
}

View file

@ -64,6 +64,9 @@ type Configuration struct {
// +optional
PassthroughBackends []*SSLPassthroughBackend `json:"passthroughBackends,omitempty"`
// BackendConfigChecksum contains the particular checksum of a Configuration object
BackendConfigChecksum string `json:"BackendConfigChecksum,omitempty"`
// ConfigurationChecksum contains the particular checksum of a Configuration object
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
}

View file

@ -104,7 +104,7 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
}
}
if c1.ConfigurationChecksum != c2.ConfigurationChecksum {
if c1.BackendConfigChecksum != c2.BackendConfigChecksum {
return false
}

View file

@ -149,6 +149,10 @@ func AddOrUpdateCertAndKey(name string, cert, key, ca []byte,
}
caFile, err := fs.Create(pemFileName)
if err != nil {
return nil, fmt.Errorf("could not create CA cert file %v: %v", pemFileName, err)
}
_, err = caFile.Write(caData)
if err != nil {
return nil, fmt.Errorf("could not append CA to cert file %v: %v", pemFileName, err)

View file

@ -15,17 +15,24 @@ end
function _M.encode_nginx_stats()
return cjson.encode({
host = ngx.var.host or "-",
status = ngx.var.status or "-",
bytesSent = tonumber(ngx.var.bytes_sent) or -1,
protocol = ngx.var.server_protocol or "-",
method = ngx.var.request_method or "-",
path = ngx.var.location_path or "-",
status = ngx.var.status or "-",
requestLength = tonumber(ngx.var.request_length) or -1,
requestTime = tonumber(ngx.var.request_time) or -1,
upstreamName = ngx.var.proxy_upstream_name or "-",
upstreamIP = ngx.var.upstream_addr or "-",
responseLength = tonumber(ngx.var.bytes_sent) or -1,
endpoint = ngx.var.upstream_addr or "-",
upstreamLatency = tonumber(ngx.var.upstream_connect_time) or -1,
upstreamResponseTime = tonumber(ngx.var.upstream_response_time) or -1,
upstreamResponseLength = tonumber(ngx.var.upstream_response_length) or -1,
upstreamStatus = ngx.var.upstream_status or "-",
namespace = ngx.var.namespace or "-",
ingress = ngx.var.ingress_name or "-",
service = ngx.var.service_name or "-",

View file

@ -33,10 +33,12 @@ describe("Monitor", function()
request_method = "GET",
location_path = "/admin",
request_length = "300",
request_time = "60",
request_time = "210",
proxy_upstream_name = "test-upstream",
upstream_addr = "2.2.2.2",
upstream_response_time = "200",
upstream_response_length = "150",
upstream_connect_time = "1",
upstream_status = "220",
namespace = "test-app-production",
ingress_name = "web-yml",
@ -51,16 +53,16 @@ describe("Monitor", function()
local expected_json_stats = {
host = "testshop.com",
status = "200",
bytesSent = 150.0,
protocol = "HTTP",
responseLength = 150.0,
method = "GET",
path = "/admin",
requestLength = 300.0,
requestTime = 60.0,
upstreamName = "test-upstream",
upstreamIP = "2.2.2.2",
requestTime = 210.0,
endpoint = "2.2.2.2",
upstreamResponseTime = 200,
upstreamStatus = "220",
upstreamLatency = 1.0,
upstreamResponseLength = 150.0,
namespace = "test-app-production",
ingress = "web-yml",
service = "test-app",
@ -77,10 +79,10 @@ describe("Monitor", function()
server_protocol = "HTTP",
request_method = "GET",
location_path = "/admin",
request_time = "60",
request_time = "202",
proxy_upstream_name = "test-upstream",
upstream_addr = "2.2.2.2",
upstream_response_time = "200",
upstream_response_time = "201",
upstream_status = "220",
ingress_name = "web-yml",
}
@ -93,18 +95,19 @@ describe("Monitor", function()
local expected_json_stats = {
host = "-",
status = "-",
bytesSent = -1,
protocol = "HTTP",
responseLength = -1,
method = "GET",
path = "/admin",
requestLength = -1,
requestTime = 60.0,
upstreamName = "test-upstream",
upstreamIP = "2.2.2.2",
upstreamResponseTime = 200,
requestTime = 202.0,
endpoint = "2.2.2.2",
upstreamStatus = "220",
namespace = "-",
ingress = "web-yml",
upstreamLatency = -1,
upstreamResponseTime = 201,
upstreamResponseLength = -1,
responseLength = -1,
service = "-",
}
assert.are.same(decoded_json_stats,expected_json_stats)

View file

@ -15,7 +15,7 @@ package framework
import (
"fmt"
"os/exec"
"os"
"strings"
"time"
@ -112,13 +112,11 @@ func (f *Framework) BeforeEach() {
})
Expect(err).NotTo(HaveOccurred())
By("Building NGINX HTTP URL")
HTTPURL, err := f.GetNginxURL(HTTP)
Expect(err).NotTo(HaveOccurred())
f.IngressController.HTTPURL = HTTPURL
By("Building NGINX HTTPS URL")
HTTPSURL, err := f.GetNginxURL(HTTPS)
Expect(err).NotTo(HaveOccurred())
@ -145,11 +143,7 @@ func IngressNginxDescribe(text string, body func()) bool {
// GetNginxIP returns the IP address of the minikube cluster
// where the NGINX ingress controller is running
func (f *Framework) GetNginxIP() (string, error) {
out, err := exec.Command("minikube", "ip").Output()
if err != nil {
return "", err
}
return strings.TrimSpace(string(out)), nil
return os.Getenv("NODE_IP"), nil
}
// GetNginxPort returns the number of TCP port where NGINX is running