Add health checking from NGINX to individual endpoints.

This commit is contained in:
Chris Moos 2017-01-19 22:16:39 -07:00
parent fbcedc02dc
commit 5a3a3730a4
6 changed files with 120 additions and 2 deletions

View file

@ -314,6 +314,7 @@ The default mime type list to compress is: `application/atom+xml application/jav
**use-http2:** Enables or disables [HTTP/2](http://nginx.org/en/docs/http/ngx_http_v2_module.html) support in secure connections.
**use-upstream-health-checks:** Enables or disables the use of upstream health checks provided by the [nginx_upstream_check_module](https://github.com/yaoweibin/nginx_upstream_check_module) module. If enabled, NGINX will do health checking based on the `readinessProbe` in the pod definition.
**use-proxy-protocol:** Enables or disables the [PROXY protocol](https://www.nginx.com/resources/admin-guide/proxy-protocol/) to receive client connection (real IP address) information passed through proxy servers and load balancers such as HAProxy and Amazon Elastic Load Balancer (ELB).

View file

@ -220,6 +220,11 @@ type Configuration struct {
// Defines the number of worker processes. By default auto means number of available CPU cores
// http://nginx.org/en/docs/ngx_core_module.html#worker_processes
WorkerProcesses int `json:"worker-processes,omitempty"`
// Enables or disables the use of upstream health checks provided by the
// nginx_upstream_check_module module. If enabled, NGINX will do health checking
// based on the readinessProbe in the pod definition.
UseUpstreamHealthChecks bool `json:"use-upstream-health-checks"`
}
// NewDefault returns the default nginx configuration
@ -252,6 +257,7 @@ func NewDefault() Configuration {
WorkerProcesses: runtime.NumCPU(),
VtsStatusZoneSize: "10m",
UseHTTP2: true,
UseUpstreamHealthChecks: false,
Backend: defaults.Backend{
ProxyConnectTimeout: 5,
ProxyReadTimeout: 60,

View file

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/google_containers/nginx-slim:0.12
FROM gcr.io/google_containers/nginx-slim:0.13
RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y \
diffutils \

View file

@ -187,6 +187,13 @@ http {
{{ end }}
{{ range $server := $upstream.Endpoints }}server {{ $server.Address }}:{{ $server.Port }} max_fails={{ $server.MaxFails }} fail_timeout={{ $server.FailTimeout }};
{{ end }}
{{ if $cfg.UseUpstreamHealthChecks -}}
{{ if $upstream.UpstreamCheck -}}
check interval={{ $upstream.UpstreamCheck.IntervalMillis }} rise={{ $upstream.UpstreamCheck.Rise }} fall={{ $upstream.UpstreamCheck.Fall }} timeout={{ $upstream.UpstreamCheck.TimeoutMillis }} port={{ $upstream.UpstreamCheck.Port }} type=http;
check_http_send "{{ $upstream.UpstreamCheck.HttpSend }}";
{{- end }}
{{- end }}
}
{{ end }}
@ -358,6 +365,12 @@ http {
access_log off;
return 200;
}
{{ if $cfg.UseUpstreamHealthChecks }}
location /upstream_status {
check_status html;
}
{{ end }}
location /nginx_status {
{{ if $cfg.EnableVtsStatus }}

View file

@ -17,7 +17,9 @@ limitations under the License.
package controller
import (
"bytes"
"fmt"
"net/http"
"reflect"
"sort"
"strconv"
@ -34,6 +36,7 @@ import (
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/intstr"
@ -64,7 +67,8 @@ const (
var (
// list of ports that cannot be used by TCP or UDP services
reservedPorts = []string{"80", "443", "8181", "18080"}
reservedPorts = []string{"80", "443", "8181", "18080"}
httpSendReplacer = strings.NewReplacer("\r", "\\r", "\n", "\\n", "\"", "\\\"")
)
// GenericController holds the boilerplate code required to build an Ingress controlller.
@ -76,12 +80,14 @@ type GenericController struct {
svcController *cache.Controller
secrController *cache.Controller
mapController *cache.Controller
podController *cache.Controller
ingLister cache_store.StoreToIngressLister
svcLister cache.StoreToServiceLister
endpLister cache.StoreToEndpointsLister
secrLister cache_store.StoreToSecretsLister
mapLister cache_store.StoreToConfigmapLister
podLister cache.StoreToPodLister
annotations annotationExtractor
@ -257,6 +263,13 @@ func newIngressController(config *Configuration) *GenericController {
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
ic.podLister.Indexer, ic.podController = cache.NewIndexerInformer(
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "pods", ic.cfg.Namespace, fields.Everything()),
&api.Pod{},
ic.cfg.ResyncPeriod,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
ic.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: ic.cfg.PublishService,
@ -273,6 +286,7 @@ func (ic *GenericController) controllersInSync() bool {
ic.svcController.HasSynced() &&
ic.endpController.HasSynced() &&
ic.secrController.HasSynced() &&
ic.podController.HasSynced() &&
ic.mapController.HasSynced()
}
@ -739,6 +753,20 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
continue
}
upstreams[name].Endpoints = endp
probe, err := ic.findProbeForService(svcKey, &path.Backend.ServicePort)
if err != nil {
glog.Errorf("Failed to check for readinessProbe for %v: %v", name, err)
}
if probe != nil {
check, err := ic.getUpstreamCheckForProbe(probe)
if err != nil {
glog.Errorf("Failed to create health check for probe: %v", err)
} else {
upstreams[name].UpstreamCheck = check
}
}
}
}
}
@ -746,6 +774,63 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
return upstreams
}
func (ic *GenericController) findProbeForService(svcKey string, servicePort *intstr.IntOrString) (*api.Probe, error) {
svcObj, svcExists, err := ic.svcLister.Indexer.GetByKey(svcKey)
if err != nil {
return nil, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
}
if !svcExists {
err = fmt.Errorf("service %v does not exists", svcKey)
return nil, err
}
svc := svcObj.(*api.Service)
selector := labels.SelectorFromSet(svc.Spec.Selector)
pods, err := ic.podLister.List(selector)
if err != nil {
return nil, fmt.Errorf("Failed to get pod listing: %v", err)
}
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if servicePort.Type == intstr.Int && int(port.ContainerPort) == servicePort.IntValue() ||
servicePort.Type == intstr.String && port.Name == servicePort.String() {
if container.ReadinessProbe != nil {
if container.ReadinessProbe.HTTPGet == nil || container.ReadinessProbe.HTTPGet.Scheme != "HTTP" {
continue
}
return container.ReadinessProbe, nil
}
}
}
}
}
return nil, nil
}
func (ic *GenericController) getUpstreamCheckForProbe(probe *api.Probe) (*ingress.UpstreamCheck, error) {
var headers http.Header = make(http.Header)
for _, header := range probe.HTTPGet.HTTPHeaders {
headers.Add(header.Name, header.Value)
}
headersWriter := new(bytes.Buffer)
headers.Write(headersWriter)
httpSend := httpSendReplacer.Replace(
fmt.Sprintf("GET %s HTTP/1.0\r\n%s\r\n", probe.HTTPGet.Path, string(headersWriter.Bytes())))
return &ingress.UpstreamCheck{
HttpSend: httpSend,
Port: probe.HTTPGet.Port.IntValue(),
Rise: probe.SuccessThreshold,
Fall: probe.FailureThreshold,
TimeoutMillis: probe.TimeoutSeconds * 1000,
IntervalMillis: probe.PeriodSeconds * 1000,
}, nil
}
// serviceEndpoints returns the upstream servers (endpoints) associated
// to a service.
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
@ -985,6 +1070,7 @@ func (ic GenericController) Start() {
go ic.svcController.Run(ic.stopCh)
go ic.secrController.Run(ic.stopCh)
go ic.mapController.Run(ic.stopCh)
go ic.podController.Run(ic.stopCh)
go ic.secretQueue.Run(5*time.Second, ic.stopCh)
go ic.syncQueue.Run(5*time.Second, ic.stopCh)

View file

@ -101,6 +101,16 @@ type BackendInfo struct {
Repository string `json:"repository"`
}
// UpstreamCheck is used to configure ingress health checks
type UpstreamCheck struct {
HttpSend string
Port int
IntervalMillis int32
Fall int32
Rise int32
TimeoutMillis int32
}
// Configuration holds the definition of all the parts required to describe all
// ingresses reachable by the ingress controller (using a filter by namespace)
type Configuration struct {
@ -133,6 +143,8 @@ type Backend struct {
Secure bool `json:"secure"`
// Endpoints contains the list of endpoints currently running
Endpoints []Endpoint `json:"endpoints"`
UpstreamCheck *UpstreamCheck
}
// Endpoint describes a kubernetes endpoint in an backend