Merge pull request #2505 from fntlnz/nginx-module

Annotations for the InfluxDB module
This commit is contained in:
k8s-ci-robot 2018-05-21 13:54:13 -07:00 committed by GitHub
commit 52496102b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 646 additions and 1 deletions

View file

@ -53,7 +53,7 @@ IMAGE = $(REGISTRY)/$(IMGNAME)
MULTI_ARCH_IMG = $(IMAGE)-$(ARCH) MULTI_ARCH_IMG = $(IMAGE)-$(ARCH)
# Set default base image dynamically for each arch # Set default base image dynamically for each arch
BASEIMAGE?=quay.io/kubernetes-ingress-controller/nginx-$(ARCH):0.45 BASEIMAGE?=quay.io/kubernetes-ingress-controller/nginx-$(ARCH):0.46
ifeq ($(ARCH),arm) ifeq ($(ARCH),arm)
QEMUARCH=arm QEMUARCH=arm

View file

@ -77,6 +77,11 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz
|[nginx.ingress.kubernetes.io/lua-resty-waf-debug](#lua-resty-waf)|"true" or "false"| |[nginx.ingress.kubernetes.io/lua-resty-waf-debug](#lua-resty-waf)|"true" or "false"|
|[nginx.ingress.kubernetes.io/lua-resty-waf-ignore-rulesets](#lua-resty-waf)|string| |[nginx.ingress.kubernetes.io/lua-resty-waf-ignore-rulesets](#lua-resty-waf)|string|
|[nginx.ingress.kubernetes.io/lua-resty-waf-extra-rules](#lua-resty-waf)|string| |[nginx.ingress.kubernetes.io/lua-resty-waf-extra-rules](#lua-resty-waf)|string|
|[nginx.ingress.kubernetes.io/enable-influxdb](#influxdb)|"true" or "false"|
|[nginx.ingress.kubernetes.io/influxdb-measurement](#influxdb)|string|
|[nginx.ingress.kubernetes.io/influxdb-port](#influxdb)|string|
|[nginx.ingress.kubernetes.io/influxdb-host](#influxdb)|string|
|[nginx.ingress.kubernetes.io/influxdb-server-name](#influxdb)|string|
### Rewrite ### Rewrite
@ -553,3 +558,25 @@ Additionally, if the gRPC service requires TLS, add `nginx.ingress.kubernetes.io
Exposing a gRPC service using HTTP is not supported. Exposing a gRPC service using HTTP is not supported.
[configmap]: ./configmap.md [configmap]: ./configmap.md
### InfluxDB
Using `influxdb-*` annotations we can monitor requests passing through a Location by sending them to an InfluxDB backend exposing the UDP socket
using the [nginx-influxdb-module](https://github.com/influxdata/nginx-influxdb-module/).
```yaml
nginx.ingress.kubernetes.io/enable-influxdb: "true"
nginx.ingress.kubernetes.io/influxdb-measurement: "nginx-reqs"
nginx.ingress.kubernetes.io/influxdb-port: "8089"
nginx.ingress.kubernetes.io/influxdb-host: "influxdb"
nginx.ingress.kubernetes.io/influxdb-server-name: "nginx-ingress"
```
For the `influxdb-host` parameter you have two options:
To use the module in the Kubernetes Nginx ingress controller, you have two options:
- Use an InfluxDB server configured to enable the [UDP protocol](https://docs.influxdata.com/influxdb/v1.5/supported_protocols/udp/).
- Deploy Telegraf as a sidecar proxy to the Ingress controller configured to listen UDP with the [socket listener input](https://github.com/influxdata/telegraf/tree/release-1.6/plugins/inputs/socket_listener) and to write using
anyone of the [outputs plugins](https://github.com/influxdata/telegraf/tree/release-1.6/plugins/outputs)

View file

@ -34,6 +34,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/defaultbackend" "k8s.io/ingress-nginx/internal/ingress/annotations/defaultbackend"
"k8s.io/ingress-nginx/internal/ingress/annotations/grpc" "k8s.io/ingress-nginx/internal/ingress/annotations/grpc"
"k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck" "k8s.io/ingress-nginx/internal/ingress/annotations/healthcheck"
"k8s.io/ingress-nginx/internal/ingress/annotations/influxdb"
"k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist"
"k8s.io/ingress-nginx/internal/ingress/annotations/loadbalancing" "k8s.io/ingress-nginx/internal/ingress/annotations/loadbalancing"
"k8s.io/ingress-nginx/internal/ingress/annotations/log" "k8s.io/ingress-nginx/internal/ingress/annotations/log"
@ -95,6 +96,7 @@ type Ingress struct {
Logs log.Config Logs log.Config
GRPC bool GRPC bool
LuaRestyWAF luarestywaf.Config LuaRestyWAF luarestywaf.Config
InfluxDB influxdb.Config
} }
// Extractor defines the annotation parsers to be used in the extraction of annotations // Extractor defines the annotation parsers to be used in the extraction of annotations
@ -136,6 +138,7 @@ func NewAnnotationExtractor(cfg resolver.Resolver) Extractor {
"Logs": log.NewParser(cfg), "Logs": log.NewParser(cfg),
"GRPC": grpc.NewParser(cfg), "GRPC": grpc.NewParser(cfg),
"LuaRestyWAF": luarestywaf.NewParser(cfg), "LuaRestyWAF": luarestywaf.NewParser(cfg),
"InfluxDB": influxdb.NewParser(cfg),
}, },
} }
} }

View file

@ -0,0 +1,104 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package influxdb
import (
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
type influxdb struct {
r resolver.Resolver
}
// Config contains the IfluxDB configuration to be used in the Ingress
type Config struct {
InfluxDBEnabled bool `json:"influxDBEnabled"`
InfluxDBMeasurement string `json:"influxDBMeasurement"`
InfluxDBPort string `json:"influxDBPort"`
InfluxDBHost string `json:"influxDBHost"`
InfluxDBServerName string `json:"influxDBServerName"`
}
// NewParser creates a new InfluxDB annotation parser
func NewParser(r resolver.Resolver) parser.IngressAnnotation {
return influxdb{r}
}
// Parse parses the annotations to look for InfluxDB configurations
func (c influxdb) Parse(ing *extensions.Ingress) (interface{}, error) {
influxdbEnabled, err := parser.GetBoolAnnotation("enable-influxdb", ing)
if err != nil {
influxdbEnabled = false
}
influxdbMeasurement, err := parser.GetStringAnnotation("influxdb-measurement", ing)
if err != nil {
influxdbMeasurement = "default"
}
influxdbPort, err := parser.GetStringAnnotation("influxdb-port", ing)
if err != nil {
// This is not the default 8086 port but the port usually used to expose
// influxdb in UDP, the module uses UDP to talk to influx via the line protocol.
influxdbPort = "8089"
}
influxdbHost, err := parser.GetStringAnnotation("influxdb-host", ing)
if err != nil {
influxdbHost = "127.0.0.1"
}
influxdbServerName, err := parser.GetStringAnnotation("influxdb-server-name", ing)
if err != nil {
influxdbServerName = "nginx-ingress"
}
return &Config{
InfluxDBEnabled: influxdbEnabled,
InfluxDBMeasurement: influxdbMeasurement,
InfluxDBPort: influxdbPort,
InfluxDBHost: influxdbHost,
InfluxDBServerName: influxdbServerName,
}, nil
}
// Equal tests for equality between two Config types
func (e1 *Config) Equal(e2 *Config) bool {
if e1 == e2 {
return true
}
if e1 == nil || e2 == nil {
return false
}
if e1.InfluxDBEnabled != e2.InfluxDBEnabled {
return false
}
if e1.InfluxDBPort != e2.InfluxDBPort {
return false
}
if e1.InfluxDBHost != e2.InfluxDBHost {
return false
}
if e1.InfluxDBServerName != e2.InfluxDBServerName {
return false
}
return true
}

View file

@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package influxdb
import (
"testing"
api "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/ingress/resolver"
)
func buildIngress() *extensions.Ingress {
defaultBackend := extensions.IngressBackend{
ServiceName: "default-backend",
ServicePort: intstr.FromInt(80),
}
return &extensions.Ingress{
ObjectMeta: meta_v1.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
},
Spec: extensions.IngressSpec{
Backend: &extensions.IngressBackend{
ServiceName: "default-backend",
ServicePort: intstr.FromInt(80),
},
Rules: []extensions.IngressRule{
{
Host: "foo.bar.com",
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/foo",
Backend: defaultBackend,
},
},
},
},
},
},
},
}
}
func TestIngressInfluxDB(t *testing.T) {
ing := buildIngress()
data := map[string]string{}
data[parser.GetAnnotationWithPrefix("enable-influxdb")] = "true"
data[parser.GetAnnotationWithPrefix("influxdb-measurement")] = "nginxmeasures"
data[parser.GetAnnotationWithPrefix("influxdb-port")] = "9091"
data[parser.GetAnnotationWithPrefix("influxdb-host")] = "10.99.0.13"
data[parser.GetAnnotationWithPrefix("influxdb-server-name")] = "nginx-test-1"
ing.SetAnnotations(data)
influx, _ := NewParser(&resolver.Mock{}).Parse(ing)
nginxInflux, ok := influx.(*Config)
if !ok {
t.Errorf("expected a Config type")
}
if !nginxInflux.InfluxDBEnabled {
t.Errorf("expected influxdb enabled but returned %v", nginxInflux.InfluxDBEnabled)
}
if nginxInflux.InfluxDBMeasurement != "nginxmeasures" {
t.Errorf("expected measurement name not found. Found %v", nginxInflux.InfluxDBMeasurement)
}
if nginxInflux.InfluxDBPort != "9091" {
t.Errorf("expected port not found. Found %v", nginxInflux.InfluxDBPort)
}
if nginxInflux.InfluxDBHost != "10.99.0.13" {
t.Errorf("expected host not found. Found %v", nginxInflux.InfluxDBHost)
}
if nginxInflux.InfluxDBServerName != "nginx-test-1" {
t.Errorf("expected server name not found. Found %v", nginxInflux.InfluxDBServerName)
}
}

View file

@ -516,6 +516,11 @@ type Configuration struct {
// DisableLuaRestyWAF disables lua-resty-waf globally regardless // DisableLuaRestyWAF disables lua-resty-waf globally regardless
// of whether there's an ingress that has enabled the WAF using annotation // of whether there's an ingress that has enabled the WAF using annotation
DisableLuaRestyWAF bool `json:"disable-lua-resty-waf"` DisableLuaRestyWAF bool `json:"disable-lua-resty-waf"`
// EnableInfluxDB enables the nginx InfluxDB extension
// http://github.com/influxdata/nginx-influxdb-module/
// By default this is disabled
EnableInfluxDB bool `json:"enable-influxdb"`
} }
// NewDefault returns the default nginx configuration // NewDefault returns the default nginx configuration

View file

@ -450,6 +450,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
loc.Logs = anns.Logs loc.Logs = anns.Logs
loc.GRPC = anns.GRPC loc.GRPC = anns.GRPC
loc.LuaRestyWAF = anns.LuaRestyWAF loc.LuaRestyWAF = anns.LuaRestyWAF
loc.InfluxDB = anns.InfluxDB
if loc.Redirect.FromToWWW { if loc.Redirect.FromToWWW {
server.RedirectFromToWWW = true server.RedirectFromToWWW = true
@ -486,6 +487,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
Logs: anns.Logs, Logs: anns.Logs,
GRPC: anns.GRPC, GRPC: anns.GRPC,
LuaRestyWAF: anns.LuaRestyWAF, LuaRestyWAF: anns.LuaRestyWAF,
InfluxDB: anns.InfluxDB,
} }
if loc.Redirect.FromToWWW { if loc.Redirect.FromToWWW {
@ -922,6 +924,7 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
defLoc.Denied = anns.Denied defLoc.Denied = anns.Denied
defLoc.GRPC = anns.GRPC defLoc.GRPC = anns.GRPC
defLoc.LuaRestyWAF = anns.LuaRestyWAF defLoc.LuaRestyWAF = anns.LuaRestyWAF
defLoc.InfluxDB = anns.InfluxDB
} }
} }
} }

View file

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress" "k8s.io/ingress-nginx/internal/ingress"
"k8s.io/ingress-nginx/internal/ingress/annotations/influxdb"
"k8s.io/ingress-nginx/internal/ingress/annotations/ratelimit" "k8s.io/ingress-nginx/internal/ingress/annotations/ratelimit"
"k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/config"
ing_net "k8s.io/ingress-nginx/internal/net" ing_net "k8s.io/ingress-nginx/internal/net"
@ -153,6 +154,7 @@ var (
"buildOpentracingLoad": buildOpentracingLoad, "buildOpentracingLoad": buildOpentracingLoad,
"buildOpentracing": buildOpentracing, "buildOpentracing": buildOpentracing,
"proxySetHeader": proxySetHeader, "proxySetHeader": proxySetHeader,
"buildInfluxDB": buildInfluxDB,
} }
) )
@ -897,6 +899,29 @@ func buildOpentracing(input interface{}) string {
return buf.String() return buf.String()
} }
// buildInfluxDB produces the single line configuration
// needed by the InfluxDB module to send request's metrics
// for the current resource
func buildInfluxDB(input interface{}) string {
cfg, ok := input.(influxdb.Config)
if !ok {
glog.Errorf("expected an 'influxdb.Config' type but %T was returned", input)
return ""
}
if !cfg.InfluxDBEnabled {
return ""
}
return fmt.Sprintf(
"influxdb server_name=%s host=%s port=%s measurement=%s enabled=true;",
cfg.InfluxDBServerName,
cfg.InfluxDBHost,
cfg.InfluxDBPort,
cfg.InfluxDBMeasurement,
)
}
func proxySetHeader(loc interface{}) string { func proxySetHeader(loc interface{}) string {
location, ok := loc.(*ingress.Location) location, ok := loc.(*ingress.Location)
if !ok { if !ok {

View file

@ -28,6 +28,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/annotations/authtls" "k8s.io/ingress-nginx/internal/ingress/annotations/authtls"
"k8s.io/ingress-nginx/internal/ingress/annotations/connection" "k8s.io/ingress-nginx/internal/ingress/annotations/connection"
"k8s.io/ingress-nginx/internal/ingress/annotations/cors" "k8s.io/ingress-nginx/internal/ingress/annotations/cors"
"k8s.io/ingress-nginx/internal/ingress/annotations/influxdb"
"k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist"
"k8s.io/ingress-nginx/internal/ingress/annotations/log" "k8s.io/ingress-nginx/internal/ingress/annotations/log"
"k8s.io/ingress-nginx/internal/ingress/annotations/luarestywaf" "k8s.io/ingress-nginx/internal/ingress/annotations/luarestywaf"
@ -271,6 +272,9 @@ type Location struct {
GRPC bool `json:"grpc"` GRPC bool `json:"grpc"`
// LuaRestyWAF contains parameters to configure lua-resty-waf // LuaRestyWAF contains parameters to configure lua-resty-waf
LuaRestyWAF luarestywaf.Config `json:"luaRestyWAF"` LuaRestyWAF luarestywaf.Config `json:"luaRestyWAF"`
// InfluxDB allows to monitor the incoming request by sending them to an influxdb database
// +optional
InfluxDB influxdb.Config `json:"influxDB,omitempty"`
} }
// SSLPassthroughBackend describes a SSL upstream server configured // SSLPassthroughBackend describes a SSL upstream server configured

View file

@ -389,6 +389,10 @@ func (l1 *Location) Equal(l2 *Location) bool {
return false return false
} }
if !(&l1.InfluxDB).Equal(&l2.InfluxDB) {
return false
}
return true return true
} }

View file

@ -980,6 +980,8 @@ stream {
{{ template "CORS" $location }} {{ template "CORS" $location }}
{{ end }} {{ end }}
{{ buildInfluxDB $location.InfluxDB }}
{{ if not (empty $location.Redirect.URL) }} {{ if not (empty $location.Redirect.URL) }}
if ($uri ~* {{ $path }}) { if ($uri ~* {{ $path }}) {
return {{ $location.Redirect.Code }} {{ $location.Redirect.URL }}; return {{ $location.Redirect.Code }} {{ $location.Redirect.URL }};

View file

@ -0,0 +1,193 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package annotations
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os/exec"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/parnurzeal/gorequest"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/ingress-nginx/test/e2e/framework"
)
var _ = framework.IngressNginxDescribe("Annotations - influxdb", func() {
f := framework.NewDefaultFramework("influxdb")
BeforeEach(func() {
err := f.NewInfluxDBDeployment()
Expect(err).NotTo(HaveOccurred())
err = f.NewEchoDeployment()
Expect(err).NotTo(HaveOccurred())
})
Context("when influxdb is enabled", func() {
It("should send the request metric to the influxdb server", func() {
ifs, err := createInfluxDBService(f)
Expect(err).NotTo(HaveOccurred())
// Ingress configured with InfluxDB annotations
host := "influxdb.e2e.local"
createInfluxDBIngress(
f,
host,
"http-svc",
8080,
map[string]string{
"nginx.ingress.kubernetes.io/enable-influxdb": "true",
"nginx.ingress.kubernetes.io/influxdb-host": ifs.Spec.ClusterIP,
"nginx.ingress.kubernetes.io/influxdb-port": "8089",
"nginx.ingress.kubernetes.io/influxdb-measurement": "requests",
"nginx.ingress.kubernetes.io/influxdb-servername": "e2e-nginx-srv",
},
)
// Do a request to the echo server ingress that sends metrics
// to the InfluxDB backend.
res, _, errs := gorequest.New().
Get(f.IngressController.HTTPURL).
Set("Host", host).
End()
Expect(len(errs)).Should(Equal(0))
Expect(res.StatusCode).Should(Equal(http.StatusOK))
time.Sleep(5 * time.Second)
measurements, err := extractInfluxDBMeasurements(f)
var results map[string][]map[string]interface{}
json.Unmarshal([]byte(measurements), &results)
Expect(err).NotTo(HaveOccurred())
Expect(len(measurements)).ShouldNot(Equal(0))
for _, elem := range results["results"] {
Expect(len(elem)).ShouldNot(Equal(0))
}
})
})
})
func createInfluxDBService(f *framework.Framework) (*corev1.Service, error) {
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "inflxudb-svc",
Namespace: f.IngressController.Namespace,
},
Spec: corev1.ServiceSpec{Ports: []corev1.ServicePort{
{
Name: "udp",
Port: 8089,
TargetPort: intstr.FromInt(8089),
Protocol: "UDP",
},
},
Selector: map[string]string{
"app": "influxdb-svc",
},
},
}
s, err := f.EnsureService(service)
if err != nil {
return nil, err
}
if s == nil {
return nil, fmt.Errorf("unexpected error creating service for influxdb deployment")
}
return s, nil
}
func createInfluxDBIngress(f *framework.Framework, host, service string, port int, annotations map[string]string) {
ing, err := f.EnsureIngress(framework.NewSingleIngress(host, "/", host, f.IngressController.Namespace, service, port, &annotations))
Expect(err).NotTo(HaveOccurred())
Expect(ing).NotTo(BeNil())
err = f.WaitForNginxServer(host,
func(server string) bool {
return Expect(server).Should(ContainSubstring(fmt.Sprintf("server_name %v", host))) &&
Expect(server).ShouldNot(ContainSubstring("return 503"))
})
Expect(err).NotTo(HaveOccurred())
}
func extractInfluxDBMeasurements(f *framework.Framework) (string, error) {
l, err := f.KubeClientSet.CoreV1().Pods(f.IngressController.Namespace).List(metav1.ListOptions{
LabelSelector: "app=influxdb-svc",
})
if err != nil {
return "", err
}
if len(l.Items) == 0 {
return "", err
}
cmd := "influx -database 'nginx' -execute 'select * from requests' -format 'json' -pretty"
var pod *corev1.Pod
for _, p := range l.Items {
pod = &p
break
}
if pod == nil {
return "", fmt.Errorf("no influxdb pods found")
}
o, err := execInfluxDBCommand(pod, cmd)
if err != nil {
return "", err
}
return o, nil
}
func execInfluxDBCommand(pod *corev1.Pod, command string) (string, error) {
var (
execOut bytes.Buffer
execErr bytes.Buffer
)
args := fmt.Sprintf("kubectl exec --namespace %v %v -- %v", pod.Namespace, pod.Name, command)
cmd := exec.Command("/bin/bash", "-c", args)
cmd.Stdout = &execOut
cmd.Stderr = &execErr
err := cmd.Run()
if execErr.Len() > 0 {
return "", fmt.Errorf("stderr: %v", execErr.String())
}
if err != nil {
return "", fmt.Errorf("could not execute: %v", err)
}
return execOut.String(), nil
}

View file

@ -0,0 +1,162 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
)
const influxConfig = `
reporting-disabled = true
bind-address = "0.0.0.0:8088"
[meta]
dir = "/var/lib/influxdb/meta"
retention-autocreate = true
logging-enabled = true
[data]
dir = "/var/lib/influxdb/data"
index-version = "inmem"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "0s"
query-log-enabled = true
cache-max-memory-size = 1073741824
cache-snapshot-memory-size = 26214400
cache-snapshot-write-cold-duration = "10m0s"
compact-full-write-cold-duration = "4h0m0s"
max-series-per-database = 1000000
max-values-per-tag = 100000
max-concurrent-compactions = 0
trace-logging-enabled = false
[[udp]]
enabled = true
bind-address = ":8089"
database = "nginx"
`
// NewInfluxDBDeployment creates an InfluxDB server configured to reply
// on 8086/tcp and 8089/udp
func (f *Framework) NewInfluxDBDeployment() error {
configuration := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "influxdb-config",
Namespace: f.IngressController.Namespace,
},
Data: map[string]string{
"influxd.conf": influxConfig,
},
}
cm, err := f.EnsureConfigMap(configuration)
if err != nil {
return err
}
if cm == nil {
return fmt.Errorf("unexpected error creating configmap for influxdb")
}
deployment := &extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "influxdb-svc",
Namespace: f.IngressController.Namespace,
},
Spec: extensions.DeploymentSpec{
Replicas: NewInt32(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "influxdb-svc",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "influxdb-svc",
},
},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: NewInt64(0),
Volumes: []corev1.Volume{
{
Name: "influxdb-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "influxdb-config",
},
},
},
},
},
Containers: []corev1.Container{
{
Name: "influxdb-svc",
Image: "docker.io/influxdb:1.5",
Env: []corev1.EnvVar{},
Command: []string{"influxd", "-config", "/influxdb-config/influxd.conf"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "influxdb-config",
ReadOnly: true,
MountPath: "/influxdb-config",
},
},
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: 8086,
},
{
Name: "udp",
ContainerPort: 8089,
},
},
},
},
},
},
},
}
d, err := f.EnsureDeployment(deployment)
if err != nil {
return err
}
if d == nil {
return fmt.Errorf("unexpected error creating deployement for influxdb")
}
err = WaitForPodsReady(f.KubeClientSet, 5*time.Minute, 1, f.IngressController.Namespace, metav1.ListOptions{
LabelSelector: fields.SelectorFromSet(fields.Set(d.Spec.Template.ObjectMeta.Labels)).String(),
})
if err != nil {
return errors.Wrap(err, "failed to wait for influxdb to become ready")
}
return nil
}

View file

@ -42,6 +42,18 @@ func (f *Framework) EnsureSecret(secret *api.Secret) (*api.Secret, error) {
return s, nil return s, nil
} }
// EnsureConfigMap creates a ConfigMap object or returns it if it already exists.
func (f *Framework) EnsureConfigMap(configMap *api.ConfigMap) (*api.ConfigMap, error) {
cm, err := f.KubeClientSet.CoreV1().ConfigMaps(configMap.Namespace).Create(configMap)
if err != nil {
if k8sErrors.IsAlreadyExists(err) {
return f.KubeClientSet.CoreV1().ConfigMaps(configMap.Namespace).Update(configMap)
}
return nil, err
}
return cm, nil
}
// EnsureIngress creates an Ingress object or returns it if it already exists. // EnsureIngress creates an Ingress object or returns it if it already exists.
func (f *Framework) EnsureIngress(ingress *extensions.Ingress) (*extensions.Ingress, error) { func (f *Framework) EnsureIngress(ingress *extensions.Ingress) (*extensions.Ingress, error) {
s, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress) s, err := f.KubeClientSet.ExtensionsV1beta1().Ingresses(ingress.Namespace).Update(ingress)