From 93be8db6127cae4037a97a7021203b5cf23a0c68 Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Thu, 17 May 2018 14:25:38 +0200 Subject: [PATCH 1/3] Annotations for the InfluxDB Module Signed-off-by: Lorenzo Fontana --- Makefile | 2 +- .../nginx-configuration/annotations.md | 27 +++++ internal/ingress/annotations/annotations.go | 3 + internal/ingress/annotations/influxdb/main.go | 104 ++++++++++++++++++ .../ingress/annotations/influxdb/main_test.go | 101 +++++++++++++++++ internal/ingress/controller/config/config.go | 5 + internal/ingress/controller/controller.go | 3 + internal/ingress/types.go | 4 + internal/ingress/types_equals.go | 4 + rootfs/etc/nginx/template/nginx.tmpl | 4 + 10 files changed, 256 insertions(+), 1 deletion(-) create mode 100644 internal/ingress/annotations/influxdb/main.go create mode 100644 internal/ingress/annotations/influxdb/main_test.go diff --git a/Makefile b/Makefile index f91a0c599..42c785ab7 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ IMAGE = $(REGISTRY)/$(IMGNAME) MULTI_ARCH_IMG = $(IMAGE)-$(ARCH) # Set default base image dynamically for each arch -BASEIMAGE?=quay.io/kubernetes-ingress-controller/nginx-$(ARCH):0.45 +BASEIMAGE?=quay.io/kubernetes-ingress-controller/nginx-$(ARCH):0.46 ifeq ($(ARCH),arm) QEMUARCH=arm diff --git a/docs/user-guide/nginx-configuration/annotations.md b/docs/user-guide/nginx-configuration/annotations.md index bf288f317..9b9d0f8d5 100644 --- a/docs/user-guide/nginx-configuration/annotations.md +++ b/docs/user-guide/nginx-configuration/annotations.md @@ -77,6 +77,11 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz |[nginx.ingress.kubernetes.io/lua-resty-waf-debug](#lua-resty-waf)|"true" or "false"| |[nginx.ingress.kubernetes.io/lua-resty-waf-ignore-rulesets](#lua-resty-waf)|string| |[nginx.ingress.kubernetes.io/lua-resty-waf-extra-rules](#lua-resty-waf)|string| +|[nginx.ingress.kubernetes.io/enable-influxdb](#influxdb)|"true" or "false"| +|[nginx.ingress.kubernetes.io/influxdb-measurement](#influxdb)|string| +|[nginx.ingress.kubernetes.io/influxdb-port](#influxdb)|string| +|[nginx.ingress.kubernetes.io/influxdb-host](#influxdb)|string| +|[nginx.ingress.kubernetes.io/influxdb-server-name](#influxdb)|string| ### Rewrite @@ -553,3 +558,25 @@ Additionally, if the gRPC service requires TLS, add `nginx.ingress.kubernetes.io Exposing a gRPC service using HTTP is not supported. [configmap]: ./configmap.md + +### InfluxDB + +Using `influxdb-*` annotations we can monitor requests passing through a Location by sending them to an InfluxDB backend exposing the UDP socket +using the [nginx-influxdb-module](https://github.com/influxdata/nginx-influxdb-module/). + +```yaml +nginx.ingress.kubernetes.io/enable-influxdb: "true" +nginx.ingress.kubernetes.io/influxdb-measurement: "nginx-reqs" +nginx.ingress.kubernetes.io/influxdb-port: "8089" +nginx.ingress.kubernetes.io/influxdb-host: "influxdb" +nginx.ingress.kubernetes.io/influxdb-server-name: "nginx-ingress" +``` + +For the `influxdb-host` parameter you have two options: + +To use the module in the Kubernetes Nginx ingress controller, you have two options: + +- Use an InfluxDB server configured to enable the [UDP protocol](https://docs.influxdata.com/influxdb/v1.5/supported_protocols/udp/). +- Deploy Telegraf as a sidecar proxy to the Ingress controller configured to listen UDP with the [socket listener input](https://github.com/influxdata/telegraf/tree/release-1.6/plugins/inputs/socket_listener) and to write using +anyone of the [outputs plugins](https://github.com/influxdata/telegraf/tree/release-1.6/plugins/outputs) + diff --git a/internal/ingress/annotations/annotations.go b/internal/ingress/annotations/annotations.go index 495ca7def..30a5dbfb8 100644 --- a/internal/ingress/annotations/annotations.go +++ b/internal/ingress/annotations/annotations.go @@ -34,6 +34,7 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/defaultbackend" "k8s.io/ingress-nginx/internal/ingress/annotations/grpc" "k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck" + "k8s.io/ingress-nginx/internal/ingress/annotations/influxdb" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" "k8s.io/ingress-nginx/internal/ingress/annotations/loadbalancing" "k8s.io/ingress-nginx/internal/ingress/annotations/log" @@ -95,6 +96,7 @@ type Ingress struct { Logs log.Config GRPC bool LuaRestyWAF luarestywaf.Config + InfluxDB influxdb.Config } // Extractor defines the annotation parsers to be used in the extraction of annotations @@ -136,6 +138,7 @@ func NewAnnotationExtractor(cfg resolver.Resolver) Extractor { "Logs": log.NewParser(cfg), "GRPC": grpc.NewParser(cfg), "LuaRestyWAF": luarestywaf.NewParser(cfg), + "InfluxDB": influxdb.NewParser(cfg), }, } } diff --git a/internal/ingress/annotations/influxdb/main.go b/internal/ingress/annotations/influxdb/main.go new file mode 100644 index 000000000..d8125ec44 --- /dev/null +++ b/internal/ingress/annotations/influxdb/main.go @@ -0,0 +1,104 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb + +import ( + extensions "k8s.io/api/extensions/v1beta1" + + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + "k8s.io/ingress-nginx/internal/ingress/resolver" +) + +type influxdb struct { + r resolver.Resolver +} + +// Config contains the IfluxDB configuration to be used in the Ingress +type Config struct { + InfluxDBEnabled bool `json:"influxDBEnabled"` + InfluxDBMeasurement string `json:"influxDBMeasurement"` + InfluxDBPort string `json:"influxDBPort"` + InfluxDBHost string `json:"influxDBHost"` + InfluxDBServerName string `json:"influxDBServerName"` +} + +// NewParser creates a new InfluxDB annotation parser +func NewParser(r resolver.Resolver) parser.IngressAnnotation { + return influxdb{r} +} + +// Parse parses the annotations to look for InfluxDB configurations +func (c influxdb) Parse(ing *extensions.Ingress) (interface{}, error) { + influxdbEnabled, err := parser.GetBoolAnnotation("enable-influxdb", ing) + if err != nil { + influxdbEnabled = false + } + + influxdbMeasurement, err := parser.GetStringAnnotation("influxdb-measurement", ing) + if err != nil { + influxdbMeasurement = "default" + } + + influxdbPort, err := parser.GetStringAnnotation("influxdb-port", ing) + if err != nil { + // This is not the default 8086 port but the port usually used to expose + // influxdb in UDP, the module uses UDP to talk to influx via the line protocol. + influxdbPort = "8089" + } + + influxdbHost, err := parser.GetStringAnnotation("influxdb-host", ing) + if err != nil { + influxdbHost = "127.0.0.1" + } + + influxdbServerName, err := parser.GetStringAnnotation("influxdb-server-name", ing) + if err != nil { + influxdbServerName = "nginx-ingress" + } + + return &Config{ + InfluxDBEnabled: influxdbEnabled, + InfluxDBMeasurement: influxdbMeasurement, + InfluxDBPort: influxdbPort, + InfluxDBHost: influxdbHost, + InfluxDBServerName: influxdbServerName, + }, nil +} + +// Equal tests for equality between two Config types +func (e1 *Config) Equal(e2 *Config) bool { + if e1 == e2 { + return true + } + if e1 == nil || e2 == nil { + return false + } + if e1.InfluxDBEnabled != e2.InfluxDBEnabled { + return false + } + if e1.InfluxDBPort != e2.InfluxDBPort { + return false + } + if e1.InfluxDBHost != e2.InfluxDBHost { + return false + } + if e1.InfluxDBServerName != e2.InfluxDBServerName { + return false + } + + return true +} diff --git a/internal/ingress/annotations/influxdb/main_test.go b/internal/ingress/annotations/influxdb/main_test.go new file mode 100644 index 000000000..f9ddef59b --- /dev/null +++ b/internal/ingress/annotations/influxdb/main_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package influxdb + +import ( + "testing" + + api "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + "k8s.io/ingress-nginx/internal/ingress/resolver" +) + +func buildIngress() *extensions.Ingress { + defaultBackend := extensions.IngressBackend{ + ServiceName: "default-backend", + ServicePort: intstr.FromInt(80), + } + + return &extensions.Ingress{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }, + Spec: extensions.IngressSpec{ + Backend: &extensions.IngressBackend{ + ServiceName: "default-backend", + ServicePort: intstr.FromInt(80), + }, + Rules: []extensions.IngressRule{ + { + Host: "foo.bar.com", + IngressRuleValue: extensions.IngressRuleValue{ + HTTP: &extensions.HTTPIngressRuleValue{ + Paths: []extensions.HTTPIngressPath{ + { + Path: "/foo", + Backend: defaultBackend, + }, + }, + }, + }, + }, + }, + }, + } +} + +func TestIngressInfluxDB(t *testing.T) { + ing := buildIngress() + + data := map[string]string{} + data[parser.GetAnnotationWithPrefix("enable-influxdb")] = "true" + data[parser.GetAnnotationWithPrefix("influxdb-measurement")] = "nginxmeasures" + data[parser.GetAnnotationWithPrefix("influxdb-port")] = "9091" + data[parser.GetAnnotationWithPrefix("influxdb-host")] = "mytelegrafserver.mycompany.mytld" + data[parser.GetAnnotationWithPrefix("influxdb-server-name")] = "nginx-test-1" + ing.SetAnnotations(data) + + influx, _ := NewParser(&resolver.Mock{}).Parse(ing) + nginxInflux, ok := influx.(*Config) + if !ok { + t.Errorf("expected a Config type") + } + + if !nginxInflux.InfluxDBEnabled { + t.Errorf("expected influxdb enabled but returned %v", nginxInflux.InfluxDBEnabled) + } + + if nginxInflux.InfluxDBMeasurement != "nginxmeasures" { + t.Errorf("expected measurement name not found. Found %v", nginxInflux.InfluxDBMeasurement) + } + + if nginxInflux.InfluxDBPort != "9091" { + t.Errorf("expected port not found. Found %v", nginxInflux.InfluxDBPort) + } + + if nginxInflux.InfluxDBHost != "mytelegrafserver.mycompany.mytld" { + t.Errorf("expected host not found. Found %v", nginxInflux.InfluxDBHost) + } + + if nginxInflux.InfluxDBServerName != "nginx-test-1" { + t.Errorf("expected server name not found. Found %v", nginxInflux.InfluxDBServerName) + } +} diff --git a/internal/ingress/controller/config/config.go b/internal/ingress/controller/config/config.go index 1e1640764..7cfdeab14 100644 --- a/internal/ingress/controller/config/config.go +++ b/internal/ingress/controller/config/config.go @@ -516,6 +516,11 @@ type Configuration struct { // DisableLuaRestyWAF disables lua-resty-waf globally regardless // of whether there's an ingress that has enabled the WAF using annotation DisableLuaRestyWAF bool `json:"disable-lua-resty-waf"` + + // EnableInfluxDB enables the nginx InfluxDB extension + // http://github.com/influxdata/nginx-influxdb-module/ + // By default this is disabled + EnableInfluxDB bool `json:"enable-influxdb"` } // NewDefault returns the default nginx configuration diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index b47fb384b..bcf5cacf6 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -450,6 +450,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] loc.Logs = anns.Logs loc.GRPC = anns.GRPC loc.LuaRestyWAF = anns.LuaRestyWAF + loc.InfluxDB = anns.InfluxDB if loc.Redirect.FromToWWW { server.RedirectFromToWWW = true @@ -486,6 +487,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([] Logs: anns.Logs, GRPC: anns.GRPC, LuaRestyWAF: anns.LuaRestyWAF, + InfluxDB: anns.InfluxDB, } if loc.Redirect.FromToWWW { @@ -922,6 +924,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress, defLoc.Denied = anns.Denied defLoc.GRPC = anns.GRPC defLoc.LuaRestyWAF = anns.LuaRestyWAF + defLoc.InfluxDB = anns.InfluxDB } } } diff --git a/internal/ingress/types.go b/internal/ingress/types.go index 6506bbacf..647bebeb2 100644 --- a/internal/ingress/types.go +++ b/internal/ingress/types.go @@ -28,6 +28,7 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/authtls" "k8s.io/ingress-nginx/internal/ingress/annotations/connection" "k8s.io/ingress-nginx/internal/ingress/annotations/cors" + "k8s.io/ingress-nginx/internal/ingress/annotations/influxdb" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" "k8s.io/ingress-nginx/internal/ingress/annotations/log" "k8s.io/ingress-nginx/internal/ingress/annotations/luarestywaf" @@ -271,6 +272,9 @@ type Location struct { GRPC bool `json:"grpc"` // LuaRestyWAF contains parameters to configure lua-resty-waf LuaRestyWAF luarestywaf.Config `json:"luaRestyWAF"` + // InfluxDB allows to monitor the incoming request by sending them to an influxdb database + // +optional + InfluxDB influxdb.Config `json:"influxDB,omitempty"` } // SSLPassthroughBackend describes a SSL upstream server configured diff --git a/internal/ingress/types_equals.go b/internal/ingress/types_equals.go index 9d7a6b3dc..c0018592e 100644 --- a/internal/ingress/types_equals.go +++ b/internal/ingress/types_equals.go @@ -389,6 +389,10 @@ func (l1 *Location) Equal(l2 *Location) bool { return false } + if !(&l1.InfluxDB).Equal(&l2.InfluxDB) { + return false + } + return true } diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index 35004eef1..ba148e7be 100644 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -980,6 +980,10 @@ stream { {{ template "CORS" $location }} {{ end }} + {{ if $location.InfluxDB.InfluxDBEnabled }} + influxdb server_name=$location.InfluxDB.InfluxDBServerName host=$location.InfluxDB.InfluxDBHost port=$location.InfluxDB.InfluxDBPort measurement=$location.InfluxDB.InfluxDBMeasurement enabled=true; + {{ end }} + {{ if not (empty $location.Redirect.URL) }} if ($uri ~* {{ $path }}) { return {{ $location.Redirect.Code }} {{ $location.Redirect.URL }}; From d434583b535549d18a9d264f163dbeb485ca1ec5 Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Fri, 18 May 2018 01:49:47 +0200 Subject: [PATCH 2/3] InfluxDB configuration string template builder helper Signed-off-by: Lorenzo Fontana --- .../ingress/controller/template/template.go | 25 +++++++++++++++++++ rootfs/etc/nginx/template/nginx.tmpl | 4 +-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/internal/ingress/controller/template/template.go b/internal/ingress/controller/template/template.go index ab771b54a..320c31553 100644 --- a/internal/ingress/controller/template/template.go +++ b/internal/ingress/controller/template/template.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/ingress/annotations/influxdb" "k8s.io/ingress-nginx/internal/ingress/annotations/ratelimit" "k8s.io/ingress-nginx/internal/ingress/controller/config" ing_net "k8s.io/ingress-nginx/internal/net" @@ -153,6 +154,7 @@ var ( "buildOpentracingLoad": buildOpentracingLoad, "buildOpentracing": buildOpentracing, "proxySetHeader": proxySetHeader, + "buildInfluxDB": buildInfluxDB, } ) @@ -897,6 +899,29 @@ func buildOpentracing(input interface{}) string { return buf.String() } +// buildInfluxDB produces the single line configuration +// needed by the InfluxDB module to send request's metrics +// for the current resource +func buildInfluxDB(input interface{}) string { + cfg, ok := input.(influxdb.Config) + if !ok { + glog.Errorf("expected an 'influxdb.Config' type but %T was returned", input) + return "" + } + + if !cfg.InfluxDBEnabled { + return "" + } + + return fmt.Sprintf( + "influxdb server_name=%s host=%s port=%s measurement=%s enabled=true;", + cfg.InfluxDBServerName, + cfg.InfluxDBHost, + cfg.InfluxDBPort, + cfg.InfluxDBMeasurement, + ) +} + func proxySetHeader(loc interface{}) string { location, ok := loc.(*ingress.Location) if !ok { diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index ba148e7be..22eec9521 100644 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -980,9 +980,7 @@ stream { {{ template "CORS" $location }} {{ end }} - {{ if $location.InfluxDB.InfluxDBEnabled }} - influxdb server_name=$location.InfluxDB.InfluxDBServerName host=$location.InfluxDB.InfluxDBHost port=$location.InfluxDB.InfluxDBPort measurement=$location.InfluxDB.InfluxDBMeasurement enabled=true; - {{ end }} + {{ buildInfluxDB $location.InfluxDB }} {{ if not (empty $location.Redirect.URL) }} if ($uri ~* {{ $path }}) { From c3b896dfbc547379cd1936adb176291ecec91c6c Mon Sep 17 00:00:00 2001 From: Lorenzo Fontana Date: Sat, 19 May 2018 16:45:17 +0200 Subject: [PATCH 3/3] InfluxDB annotations e2e tests Signed-off-by: Lorenzo Fontana --- .../ingress/annotations/influxdb/main_test.go | 4 +- test/e2e/annotations/influxdb.go | 193 ++++++++++++++++++ test/e2e/framework/influxdb.go | 162 +++++++++++++++ test/e2e/framework/k8s.go | 12 ++ 4 files changed, 369 insertions(+), 2 deletions(-) create mode 100644 test/e2e/annotations/influxdb.go create mode 100644 test/e2e/framework/influxdb.go diff --git a/internal/ingress/annotations/influxdb/main_test.go b/internal/ingress/annotations/influxdb/main_test.go index f9ddef59b..b8c67ba59 100644 --- a/internal/ingress/annotations/influxdb/main_test.go +++ b/internal/ingress/annotations/influxdb/main_test.go @@ -69,7 +69,7 @@ func TestIngressInfluxDB(t *testing.T) { data[parser.GetAnnotationWithPrefix("enable-influxdb")] = "true" data[parser.GetAnnotationWithPrefix("influxdb-measurement")] = "nginxmeasures" data[parser.GetAnnotationWithPrefix("influxdb-port")] = "9091" - data[parser.GetAnnotationWithPrefix("influxdb-host")] = "mytelegrafserver.mycompany.mytld" + data[parser.GetAnnotationWithPrefix("influxdb-host")] = "10.99.0.13" data[parser.GetAnnotationWithPrefix("influxdb-server-name")] = "nginx-test-1" ing.SetAnnotations(data) @@ -91,7 +91,7 @@ func TestIngressInfluxDB(t *testing.T) { t.Errorf("expected port not found. Found %v", nginxInflux.InfluxDBPort) } - if nginxInflux.InfluxDBHost != "mytelegrafserver.mycompany.mytld" { + if nginxInflux.InfluxDBHost != "10.99.0.13" { t.Errorf("expected host not found. Found %v", nginxInflux.InfluxDBHost) } diff --git a/test/e2e/annotations/influxdb.go b/test/e2e/annotations/influxdb.go new file mode 100644 index 000000000..f72825ba5 --- /dev/null +++ b/test/e2e/annotations/influxdb.go @@ -0,0 +1,193 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package annotations + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "os/exec" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/parnurzeal/gorequest" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +var _ = framework.IngressNginxDescribe("Annotations - influxdb", func() { + f := framework.NewDefaultFramework("influxdb") + + BeforeEach(func() { + err := f.NewInfluxDBDeployment() + Expect(err).NotTo(HaveOccurred()) + err = f.NewEchoDeployment() + Expect(err).NotTo(HaveOccurred()) + }) + + Context("when influxdb is enabled", func() { + It("should send the request metric to the influxdb server", func() { + ifs, err := createInfluxDBService(f) + + Expect(err).NotTo(HaveOccurred()) + + // Ingress configured with InfluxDB annotations + host := "influxdb.e2e.local" + createInfluxDBIngress( + f, + host, + "http-svc", + 8080, + map[string]string{ + "nginx.ingress.kubernetes.io/enable-influxdb": "true", + "nginx.ingress.kubernetes.io/influxdb-host": ifs.Spec.ClusterIP, + "nginx.ingress.kubernetes.io/influxdb-port": "8089", + "nginx.ingress.kubernetes.io/influxdb-measurement": "requests", + "nginx.ingress.kubernetes.io/influxdb-servername": "e2e-nginx-srv", + }, + ) + + // Do a request to the echo server ingress that sends metrics + // to the InfluxDB backend. + res, _, errs := gorequest.New(). + Get(f.IngressController.HTTPURL). + Set("Host", host). + End() + + Expect(len(errs)).Should(Equal(0)) + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + + time.Sleep(5 * time.Second) + measurements, err := extractInfluxDBMeasurements(f) + + var results map[string][]map[string]interface{} + json.Unmarshal([]byte(measurements), &results) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(measurements)).ShouldNot(Equal(0)) + for _, elem := range results["results"] { + Expect(len(elem)).ShouldNot(Equal(0)) + } + }) + }) +}) + +func createInfluxDBService(f *framework.Framework) (*corev1.Service, error) { + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "inflxudb-svc", + Namespace: f.IngressController.Namespace, + }, + Spec: corev1.ServiceSpec{Ports: []corev1.ServicePort{ + { + Name: "udp", + Port: 8089, + TargetPort: intstr.FromInt(8089), + Protocol: "UDP", + }, + }, + Selector: map[string]string{ + "app": "influxdb-svc", + }, + }, + } + + s, err := f.EnsureService(service) + if err != nil { + return nil, err + } + + if s == nil { + return nil, fmt.Errorf("unexpected error creating service for influxdb deployment") + } + + return s, nil +} + +func createInfluxDBIngress(f *framework.Framework, host, service string, port int, annotations map[string]string) { + ing, err := f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.IngressController.Namespace, service, port, &annotations)) + Expect(err).NotTo(HaveOccurred()) + Expect(ing).NotTo(BeNil()) + + err = f.WaitForNginxServer(host, + func(server string) bool { + return Expect(server).Should(ContainSubstring(fmt.Sprintf("server_name %v", host))) && + Expect(server).ShouldNot(ContainSubstring("return 503")) + }) + Expect(err).NotTo(HaveOccurred()) +} + +func extractInfluxDBMeasurements(f *framework.Framework) (string, error) { + l, err := f.KubeClientSet.CoreV1().Pods(f.IngressController.Namespace).List(metav1.ListOptions{ + LabelSelector: "app=influxdb-svc", + }) + if err != nil { + return "", err + } + + if len(l.Items) == 0 { + return "", err + } + + cmd := "influx -database 'nginx' -execute 'select * from requests' -format 'json' -pretty" + + var pod *corev1.Pod + for _, p := range l.Items { + pod = &p + break + } + + if pod == nil { + return "", fmt.Errorf("no influxdb pods found") + } + + o, err := execInfluxDBCommand(pod, cmd) + if err != nil { + return "", err + } + + return o, nil +} + +func execInfluxDBCommand(pod *corev1.Pod, command string) (string, error) { + var ( + execOut bytes.Buffer + execErr bytes.Buffer + ) + + args := fmt.Sprintf("kubectl exec --namespace %v %v -- %v", pod.Namespace, pod.Name, command) + cmd := exec.Command("/bin/bash", "-c", args) + cmd.Stdout = &execOut + cmd.Stderr = &execErr + + err := cmd.Run() + + if execErr.Len() > 0 { + return "", fmt.Errorf("stderr: %v", execErr.String()) + } + + if err != nil { + return "", fmt.Errorf("could not execute: %v", err) + } + + return execOut.String(), nil +} diff --git a/test/e2e/framework/influxdb.go b/test/e2e/framework/influxdb.go new file mode 100644 index 000000000..82ef69b80 --- /dev/null +++ b/test/e2e/framework/influxdb.go @@ -0,0 +1,162 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "time" + + "github.com/pkg/errors" + + corev1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" +) + +const influxConfig = ` +reporting-disabled = true +bind-address = "0.0.0.0:8088" + +[meta] + dir = "/var/lib/influxdb/meta" + retention-autocreate = true + logging-enabled = true + +[data] + dir = "/var/lib/influxdb/data" + index-version = "inmem" + wal-dir = "/var/lib/influxdb/wal" + wal-fsync-delay = "0s" + query-log-enabled = true + cache-max-memory-size = 1073741824 + cache-snapshot-memory-size = 26214400 + cache-snapshot-write-cold-duration = "10m0s" + compact-full-write-cold-duration = "4h0m0s" + max-series-per-database = 1000000 + max-values-per-tag = 100000 + max-concurrent-compactions = 0 + trace-logging-enabled = false + +[[udp]] + enabled = true + bind-address = ":8089" + database = "nginx" +` + +// NewInfluxDBDeployment creates an InfluxDB server configured to reply +// on 8086/tcp and 8089/udp +func (f *Framework) NewInfluxDBDeployment() error { + configuration := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "influxdb-config", + Namespace: f.IngressController.Namespace, + }, + Data: map[string]string{ + "influxd.conf": influxConfig, + }, + } + + cm, err := f.EnsureConfigMap(configuration) + if err != nil { + return err + } + + if cm == nil { + return fmt.Errorf("unexpected error creating configmap for influxdb") + } + + deployment := &extensions.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "influxdb-svc", + Namespace: f.IngressController.Namespace, + }, + Spec: extensions.DeploymentSpec{ + Replicas: NewInt32(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "influxdb-svc", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "influxdb-svc", + }, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: NewInt64(0), + Volumes: []corev1.Volume{ + { + Name: "influxdb-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "influxdb-config", + }, + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "influxdb-svc", + Image: "docker.io/influxdb:1.5", + Env: []corev1.EnvVar{}, + Command: []string{"influxd", "-config", "/influxdb-config/influxd.conf"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "influxdb-config", + ReadOnly: true, + MountPath: "/influxdb-config", + }, + }, + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 8086, + }, + { + Name: "udp", + ContainerPort: 8089, + }, + }, + }, + }, + }, + }, + }, + } + + d, err := f.EnsureDeployment(deployment) + if err != nil { + return err + } + + if d == nil { + return fmt.Errorf("unexpected error creating deployement for influxdb") + } + + err = WaitForPodsReady(f.KubeClientSet, 5*time.Minute, 1, f.IngressController.Namespace, metav1.ListOptions{ + LabelSelector: fields.SelectorFromSet(fields.Set(d.Spec.Template.ObjectMeta.Labels)).String(), + }) + if err != nil { + return errors.Wrap(err, "failed to wait for influxdb to become ready") + } + + return nil +} diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index db21765ed..e5594497f 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -42,6 +42,18 @@ func (f *Framework) EnsureSecret(secret *api.Secret) (*api.Secret, error) { return s, nil } +// EnsureConfigMap creates a ConfigMap object or returns it if it already exists. +func (f *Framework) EnsureConfigMap(configMap *api.ConfigMap) (*api.ConfigMap, error) { + cm, err := f.KubeClientSet.CoreV1().ConfigMaps(configMap.Namespace).Create(configMap) + if err != nil { + if k8sErrors.IsAlreadyExists(err) { + return f.KubeClientSet.CoreV1().ConfigMaps(configMap.Namespace).Update(configMap) + } + return nil, err + } + return cm, nil +} + // EnsureIngress creates an Ingress object or returns it if it already exists. func (f *Framework) EnsureIngress(ingress *extensions.Ingress) (*extensions.Ingress, error) { s, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)