Add health checking from NGINX to individual endpoints.
This commit is contained in:
parent
0b1fc38128
commit
9867801967
6 changed files with 132 additions and 1 deletions
|
@ -319,6 +319,7 @@ The default mime type list to compress is: `application/atom+xml application/jav
|
||||||
|
|
||||||
**use-http2:** Enables or disables [HTTP/2](http://nginx.org/en/docs/http/ngx_http_v2_module.html) support in secure connections.
|
**use-http2:** Enables or disables [HTTP/2](http://nginx.org/en/docs/http/ngx_http_v2_module.html) support in secure connections.
|
||||||
|
|
||||||
|
**use-upstream-health-checks:** Enables or disables the use of upstream health checks provided by the [nginx_upstream_check_module](https://github.com/yaoweibin/nginx_upstream_check_module) module. If enabled, NGINX will do health checking based on the `readinessProbe` in the pod definition.
|
||||||
|
|
||||||
**use-proxy-protocol:** Enables or disables the [PROXY protocol](https://www.nginx.com/resources/admin-guide/proxy-protocol/) to receive client connection (real IP address) information passed through proxy servers and load balancers such as HAProxy and Amazon Elastic Load Balancer (ELB).
|
**use-proxy-protocol:** Enables or disables the [PROXY protocol](https://www.nginx.com/resources/admin-guide/proxy-protocol/) to receive client connection (real IP address) information passed through proxy servers and load balancers such as HAProxy and Amazon Elastic Load Balancer (ELB).
|
||||||
|
|
||||||
|
|
|
@ -227,6 +227,11 @@ type Configuration struct {
|
||||||
// Defines the number of worker processes. By default auto means number of available CPU cores
|
// Defines the number of worker processes. By default auto means number of available CPU cores
|
||||||
// http://nginx.org/en/docs/ngx_core_module.html#worker_processes
|
// http://nginx.org/en/docs/ngx_core_module.html#worker_processes
|
||||||
WorkerProcesses int `json:"worker-processes,omitempty"`
|
WorkerProcesses int `json:"worker-processes,omitempty"`
|
||||||
|
|
||||||
|
// Enables or disables the use of upstream health checks provided by the
|
||||||
|
// nginx_upstream_check_module module. If enabled, NGINX will do health checking
|
||||||
|
// based on the readinessProbe in the pod definition.
|
||||||
|
UseUpstreamHealthChecks bool `json:"use-upstream-health-checks"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDefault returns the default nginx configuration
|
// NewDefault returns the default nginx configuration
|
||||||
|
@ -260,6 +265,7 @@ func NewDefault() Configuration {
|
||||||
WorkerProcesses: runtime.NumCPU(),
|
WorkerProcesses: runtime.NumCPU(),
|
||||||
VtsStatusZoneSize: "10m",
|
VtsStatusZoneSize: "10m",
|
||||||
UseHTTP2: true,
|
UseHTTP2: true,
|
||||||
|
UseUpstreamHealthChecks: false,
|
||||||
Backend: defaults.Backend{
|
Backend: defaults.Backend{
|
||||||
ProxyBodySize: bodySize,
|
ProxyBodySize: bodySize,
|
||||||
ProxyConnectTimeout: 5,
|
ProxyConnectTimeout: 5,
|
||||||
|
|
|
@ -188,6 +188,13 @@ http {
|
||||||
{{ end }}
|
{{ end }}
|
||||||
{{ range $server := $upstream.Endpoints }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }};
|
{{ range $server := $upstream.Endpoints }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }};
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
|
{{ if $cfg.UseUpstreamHealthChecks -}}
|
||||||
|
{{ if $upstream.UpstreamCheck -}}
|
||||||
|
check interval={{ $upstream.UpstreamCheck.IntervalMillis }} rise={{ $upstream.UpstreamCheck.Rise }} fall={{ $upstream.UpstreamCheck.Fall }} timeout={{ $upstream.UpstreamCheck.TimeoutMillis }} port={{ $upstream.UpstreamCheck.Port }} type=http;
|
||||||
|
check_http_send "{{ $upstream.UpstreamCheck.HttpSend }}";
|
||||||
|
{{- end }}
|
||||||
|
{{- end }}
|
||||||
}
|
}
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
||||||
|
@ -366,6 +373,12 @@ http {
|
||||||
return 200;
|
return 200;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{{ if $cfg.UseUpstreamHealthChecks }}
|
||||||
|
location /upstream_status {
|
||||||
|
check_status html;
|
||||||
|
}
|
||||||
|
{{ end }}
|
||||||
|
|
||||||
location /nginx_status {
|
location /nginx_status {
|
||||||
{{ if $cfg.EnableVtsStatus }}
|
{{ if $cfg.EnableVtsStatus }}
|
||||||
vhost_traffic_status_display;
|
vhost_traffic_status_display;
|
||||||
|
|
|
@ -17,7 +17,9 @@ limitations under the License.
|
||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -35,6 +37,7 @@ import (
|
||||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
"k8s.io/kubernetes/pkg/util/intstr"
|
"k8s.io/kubernetes/pkg/util/intstr"
|
||||||
|
|
||||||
|
@ -66,7 +69,8 @@ const (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// list of ports that cannot be used by TCP or UDP services
|
// list of ports that cannot be used by TCP or UDP services
|
||||||
reservedPorts = []string{"80", "443", "8181", "18080"}
|
reservedPorts = []string{"80", "443", "8181", "18080"}
|
||||||
|
httpSendReplacer = strings.NewReplacer("\r", "\\r", "\n", "\\n", "\"", "\\\"")
|
||||||
)
|
)
|
||||||
|
|
||||||
// GenericController holds the boilerplate code required to build an Ingress controlller.
|
// GenericController holds the boilerplate code required to build an Ingress controlller.
|
||||||
|
@ -78,12 +82,14 @@ type GenericController struct {
|
||||||
svcController *cache.Controller
|
svcController *cache.Controller
|
||||||
secrController *cache.Controller
|
secrController *cache.Controller
|
||||||
mapController *cache.Controller
|
mapController *cache.Controller
|
||||||
|
podController *cache.Controller
|
||||||
|
|
||||||
ingLister cache_store.StoreToIngressLister
|
ingLister cache_store.StoreToIngressLister
|
||||||
svcLister cache.StoreToServiceLister
|
svcLister cache.StoreToServiceLister
|
||||||
endpLister cache.StoreToEndpointsLister
|
endpLister cache.StoreToEndpointsLister
|
||||||
secrLister cache_store.StoreToSecretsLister
|
secrLister cache_store.StoreToSecretsLister
|
||||||
mapLister cache_store.StoreToConfigmapLister
|
mapLister cache_store.StoreToConfigmapLister
|
||||||
|
podLister cache.StoreToPodLister
|
||||||
|
|
||||||
annotations annotationExtractor
|
annotations annotationExtractor
|
||||||
|
|
||||||
|
@ -302,6 +308,19 @@ func newIngressController(config *Configuration) *GenericController {
|
||||||
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
|
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ic.podLister.Indexer, ic.podController = cache.NewIndexerInformer(
|
||||||
|
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "pods", ic.cfg.Namespace, fields.Everything()),
|
||||||
|
&api.Pod{},
|
||||||
|
ic.cfg.ResyncPeriod,
|
||||||
|
cache.ResourceEventHandlerFuncs{},
|
||||||
|
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||||
|
|
||||||
|
ic.syncStatus = status.NewStatusSyncer(status.Config{
|
||||||
|
Client: config.Client,
|
||||||
|
PublishService: ic.cfg.PublishService,
|
||||||
|
IngressLister: ic.ingLister,
|
||||||
|
})
|
||||||
|
|
||||||
ic.annotations = newAnnotationExtractor(ic)
|
ic.annotations = newAnnotationExtractor(ic)
|
||||||
|
|
||||||
return &ic
|
return &ic
|
||||||
|
@ -312,6 +331,7 @@ func (ic *GenericController) controllersInSync() bool {
|
||||||
ic.svcController.HasSynced() &&
|
ic.svcController.HasSynced() &&
|
||||||
ic.endpController.HasSynced() &&
|
ic.endpController.HasSynced() &&
|
||||||
ic.secrController.HasSynced() &&
|
ic.secrController.HasSynced() &&
|
||||||
|
ic.podController.HasSynced() &&
|
||||||
ic.mapController.HasSynced()
|
ic.mapController.HasSynced()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -746,6 +766,20 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
upstreams[name].Endpoints = endp
|
upstreams[name].Endpoints = endp
|
||||||
|
|
||||||
|
probe, err := ic.findProbeForService(svcKey, &path.Backend.ServicePort)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to check for readinessProbe for %v: %v", name, err)
|
||||||
|
}
|
||||||
|
if probe != nil {
|
||||||
|
check, err := ic.getUpstreamCheckForProbe(probe)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to create health check for probe: %v", err)
|
||||||
|
} else {
|
||||||
|
upstreams[name].UpstreamCheck = check
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -753,6 +787,63 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
||||||
return upstreams
|
return upstreams
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ic *GenericController) findProbeForService(svcKey string, servicePort *intstr.IntOrString) (*api.Probe, error) {
|
||||||
|
svcObj, svcExists, err := ic.svcLister.Indexer.GetByKey(svcKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !svcExists {
|
||||||
|
err = fmt.Errorf("service %v does not exists", svcKey)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := svcObj.(*api.Service)
|
||||||
|
|
||||||
|
selector := labels.SelectorFromSet(svc.Spec.Selector)
|
||||||
|
pods, err := ic.podLister.List(selector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to get pod listing: %v", err)
|
||||||
|
}
|
||||||
|
for _, pod := range pods {
|
||||||
|
for _, container := range pod.Spec.Containers {
|
||||||
|
for _, port := range container.Ports {
|
||||||
|
if servicePort.Type == intstr.Int && int(port.ContainerPort) == servicePort.IntValue() ||
|
||||||
|
servicePort.Type == intstr.String && port.Name == servicePort.String() {
|
||||||
|
if container.ReadinessProbe != nil {
|
||||||
|
if container.ReadinessProbe.HTTPGet == nil || container.ReadinessProbe.HTTPGet.Scheme != "HTTP" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return container.ReadinessProbe, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ic *GenericController) getUpstreamCheckForProbe(probe *api.Probe) (*ingress.UpstreamCheck, error) {
|
||||||
|
var headers http.Header = make(http.Header)
|
||||||
|
for _, header := range probe.HTTPGet.HTTPHeaders {
|
||||||
|
headers.Add(header.Name, header.Value)
|
||||||
|
}
|
||||||
|
headersWriter := new(bytes.Buffer)
|
||||||
|
headers.Write(headersWriter)
|
||||||
|
|
||||||
|
httpSend := httpSendReplacer.Replace(
|
||||||
|
fmt.Sprintf("GET %s HTTP/1.0\r\n%s\r\n", probe.HTTPGet.Path, string(headersWriter.Bytes())))
|
||||||
|
|
||||||
|
return &ingress.UpstreamCheck{
|
||||||
|
HttpSend: httpSend,
|
||||||
|
Port: probe.HTTPGet.Port.IntValue(),
|
||||||
|
Rise: probe.SuccessThreshold,
|
||||||
|
Fall: probe.FailureThreshold,
|
||||||
|
TimeoutMillis: probe.TimeoutSeconds * 1000,
|
||||||
|
IntervalMillis: probe.PeriodSeconds * 1000,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// serviceEndpoints returns the upstream servers (endpoints) associated
|
// serviceEndpoints returns the upstream servers (endpoints) associated
|
||||||
// to a service.
|
// to a service.
|
||||||
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
||||||
|
@ -1018,6 +1109,7 @@ func (ic GenericController) Start() {
|
||||||
go ic.svcController.Run(ic.stopCh)
|
go ic.svcController.Run(ic.stopCh)
|
||||||
go ic.secrController.Run(ic.stopCh)
|
go ic.secrController.Run(ic.stopCh)
|
||||||
go ic.mapController.Run(ic.stopCh)
|
go ic.mapController.Run(ic.stopCh)
|
||||||
|
go ic.podController.Run(ic.stopCh)
|
||||||
|
|
||||||
go ic.secretQueue.Run(5*time.Second, ic.stopCh)
|
go ic.secretQueue.Run(5*time.Second, ic.stopCh)
|
||||||
go ic.syncQueue.Run(5*time.Second, ic.stopCh)
|
go ic.syncQueue.Run(5*time.Second, ic.stopCh)
|
||||||
|
|
|
@ -102,6 +102,16 @@ type BackendInfo struct {
|
||||||
Repository string `json:"repository"`
|
Repository string `json:"repository"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpstreamCheck is used to configure ingress health checks
|
||||||
|
type UpstreamCheck struct {
|
||||||
|
HttpSend string
|
||||||
|
Port int
|
||||||
|
IntervalMillis int32
|
||||||
|
Fall int32
|
||||||
|
Rise int32
|
||||||
|
TimeoutMillis int32
|
||||||
|
}
|
||||||
|
|
||||||
// Configuration holds the definition of all the parts required to describe all
|
// Configuration holds the definition of all the parts required to describe all
|
||||||
// ingresses reachable by the ingress controller (using a filter by namespace)
|
// ingresses reachable by the ingress controller (using a filter by namespace)
|
||||||
type Configuration struct {
|
type Configuration struct {
|
||||||
|
@ -134,6 +144,8 @@ type Backend struct {
|
||||||
Secure bool `json:"secure"`
|
Secure bool `json:"secure"`
|
||||||
// Endpoints contains the list of endpoints currently running
|
// Endpoints contains the list of endpoints currently running
|
||||||
Endpoints []Endpoint `json:"endpoints"`
|
Endpoints []Endpoint `json:"endpoints"`
|
||||||
|
|
||||||
|
UpstreamCheck *UpstreamCheck
|
||||||
}
|
}
|
||||||
|
|
||||||
// Endpoint describes a kubernetes endpoint in an backend
|
// Endpoint describes a kubernetes endpoint in an backend
|
||||||
|
|
|
@ -29,6 +29,7 @@ export LUA_UPSTREAM_VERSION=0.06
|
||||||
export MORE_HEADERS_VERSION=0.32
|
export MORE_HEADERS_VERSION=0.32
|
||||||
export NGINX_DIGEST_AUTH=7955af9c77598c697ac292811914ce1e2b3b824c
|
export NGINX_DIGEST_AUTH=7955af9c77598c697ac292811914ce1e2b3b824c
|
||||||
export NGINX_SUBSTITUTIONS=bc58cb11844bc42735bbaef7085ea86ace46d05b
|
export NGINX_SUBSTITUTIONS=bc58cb11844bc42735bbaef7085ea86ace46d05b
|
||||||
|
export UPSTREAM_CHECK_VERSION=d6341aeeb86911d4798fbceab35015c63178e66f
|
||||||
|
|
||||||
export BUILD_PATH=/tmp/build
|
export BUILD_PATH=/tmp/build
|
||||||
|
|
||||||
|
@ -105,6 +106,8 @@ get_src 9b1d0075df787338bb607f14925886249bda60b6b3156713923d5d59e99a708b \
|
||||||
get_src 8eabbcd5950fdcc718bb0ef9165206c2ed60f67cd9da553d7bc3e6fe4e338461 \
|
get_src 8eabbcd5950fdcc718bb0ef9165206c2ed60f67cd9da553d7bc3e6fe4e338461 \
|
||||||
"https://github.com/yaoweibin/ngx_http_substitutions_filter_module/archive/$NGINX_SUBSTITUTIONS.tar.gz"
|
"https://github.com/yaoweibin/ngx_http_substitutions_filter_module/archive/$NGINX_SUBSTITUTIONS.tar.gz"
|
||||||
|
|
||||||
|
get_src 35983b0b6ae812bee9fb4de37db6bf68cea68f7e82a9fc274ab29d574e321e98 \
|
||||||
|
"https://github.com/yaoweibin/nginx_upstream_check_module/archive/$UPSTREAM_CHECK_VERSION.tar.gz"
|
||||||
|
|
||||||
#https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/
|
#https://blog.cloudflare.com/optimizing-tls-over-tcp-to-reduce-latency/
|
||||||
curl -sSL -o nginx__dynamic_tls_records.patch https://raw.githubusercontent.com/cloudflare/sslconfig/master/patches/nginx__1.11.5_dynamic_tls_records.patch
|
curl -sSL -o nginx__dynamic_tls_records.patch https://raw.githubusercontent.com/cloudflare/sslconfig/master/patches/nginx__1.11.5_dynamic_tls_records.patch
|
||||||
|
@ -115,6 +118,9 @@ cd "$BUILD_PATH/nginx-$NGINX_VERSION"
|
||||||
echo "Applying tls nginx patches..."
|
echo "Applying tls nginx patches..."
|
||||||
patch -p1 < $BUILD_PATH/nginx__dynamic_tls_records.patch
|
patch -p1 < $BUILD_PATH/nginx__dynamic_tls_records.patch
|
||||||
|
|
||||||
|
echo "Applying nginx_upstream_check patch.."
|
||||||
|
patch -p0 < $BUILD_PATH/nginx_upstream_check_module-$UPSTREAM_CHECK_VERSION/check_1.11.5+.patch
|
||||||
|
|
||||||
./configure \
|
./configure \
|
||||||
--prefix=/usr/share/nginx \
|
--prefix=/usr/share/nginx \
|
||||||
--conf-path=/etc/nginx/nginx.conf \
|
--conf-path=/etc/nginx/nginx.conf \
|
||||||
|
@ -158,6 +164,7 @@ patch -p1 < $BUILD_PATH/nginx__dynamic_tls_records.patch
|
||||||
--add-module="$BUILD_PATH/nginx-goodies-nginx-sticky-module-ng-$STICKY_SESSIONS_VERSION" \
|
--add-module="$BUILD_PATH/nginx-goodies-nginx-sticky-module-ng-$STICKY_SESSIONS_VERSION" \
|
||||||
--add-module="$BUILD_PATH/nginx-http-auth-digest-$NGINX_DIGEST_AUTH" \
|
--add-module="$BUILD_PATH/nginx-http-auth-digest-$NGINX_DIGEST_AUTH" \
|
||||||
--add-module="$BUILD_PATH/ngx_http_substitutions_filter_module-$NGINX_SUBSTITUTIONS" \
|
--add-module="$BUILD_PATH/ngx_http_substitutions_filter_module-$NGINX_SUBSTITUTIONS" \
|
||||||
|
--add-module="$BUILD_PATH/nginx_upstream_check_module-$UPSTREAM_CHECK_VERSION" \
|
||||||
--add-module="$BUILD_PATH/lua-upstream-nginx-module-$LUA_UPSTREAM_VERSION" || exit 1 \
|
--add-module="$BUILD_PATH/lua-upstream-nginx-module-$LUA_UPSTREAM_VERSION" || exit 1 \
|
||||||
&& make || exit 1 \
|
&& make || exit 1 \
|
||||||
&& make install || exit 1
|
&& make install || exit 1
|
||||||
|
|
Loading…
Reference in a new issue