From 371f898d5320f6391da7d998a99c81e455ef6df7 Mon Sep 17 00:00:00 2001 From: hzxuzhonghu Date: Wed, 13 Sep 2017 12:07:30 +0800 Subject: [PATCH] refactor controllers.go --- core/pkg/ingress/controller/controller.go | 86 +++++++++++------------ 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index 818d3e8bd..554c96f0d 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -277,7 +277,7 @@ func (ic *GenericController) syncIngress(key interface{}) error { } } - pcfg := ingress.Configuration{ + cfg := ingress.Configuration{ Backends: upstreams, Servers: servers, TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, apiv1.ProtocolTCP), @@ -285,14 +285,14 @@ func (ic *GenericController) syncIngress(key interface{}) error { PassthroughBackends: passUpstreams, } - if !ic.isForceReload() && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) { + if !ic.isForceReload() && ic.runningConfig != nil && ic.runningConfig.Equal(&cfg) { glog.V(3).Infof("skipping backend reload (no changes detected)") return nil } glog.Infof("backend reload required") - err := ic.cfg.Backend.OnUpdate(pcfg) + err := ic.cfg.Backend.OnUpdate(cfg) if err != nil { incReloadErrorCount() glog.Errorf("unexpected failure restarting the backend: \n%v", err) @@ -303,7 +303,7 @@ func (ic *GenericController) syncIngress(key interface{}) error { incReloadCount() setSSLExpireTime(servers) - ic.runningConfig = &pcfg + ic.runningConfig = &cfg ic.setForceReload(false) return nil @@ -719,25 +719,7 @@ func (ic *GenericController) createUpstreams(data []interface{}, du *ingress.Bac upstreams[defBackend] = newUpstream(defBackend) svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName) - // Add the service cluster endpoint as the upstream instead of individual endpoints - // if the serviceUpstream annotation is enabled - if serviceUpstream { - endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend) - if err != nil { - glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err) - } else { - upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint} - } - } - - if len(upstreams[defBackend].Endpoints) == 0 { - endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz) - upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...) - if err != nil { - glog.Warningf("error creating upstream %v: %v", defBackend, err) - } - } - + upstreams = ic.createUpstreamEndpoint(defBackend, svcKey, serviceUpstream, ing.Spec.Backend, upstreams, hz) } for _, rule := range ing.Spec.Rules { @@ -769,25 +751,7 @@ func (ic *GenericController) createUpstreams(data []interface{}, du *ingress.Bac svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) - // Add the service cluster endpoint as the upstream instead of individual endpoints - // if the serviceUpstream annotation is enabled - if serviceUpstream { - endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend) - if err != nil { - glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err) - } else { - upstreams[name].Endpoints = []ingress.Endpoint{endpoint} - } - } - - if len(upstreams[name].Endpoints) == 0 { - endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz) - if err != nil { - glog.Warningf("error obtaining service endpoints: %v", err) - continue - } - upstreams[name].Endpoints = endp - } + upstreams = ic.createUpstreamEndpoint(name, svcKey, serviceUpstream, &path.Backend, upstreams, hz) s, err := ic.listers.Service.GetByName(svcKey) if err != nil { @@ -871,6 +835,36 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, return upstreams, nil } +// createUpstreamEndpoint returns the upstream servers (endpoints) associated +// to a ingress. +func (ic *GenericController) createUpstreamEndpoint( + upstreamName, svcKey string, + serviceUpstream bool, + backend *extensions.IngressBackend, + upstreams map[string]*ingress.Backend, + hz *healthcheck.Upstream, +) map[string]*ingress.Backend { + // Add the service cluster endpoint as the upstream instead of individual endpoints + // if the serviceUpstream annotation is enabled + if serviceUpstream { + endpoint, err := ic.getServiceClusterEndpoint(svcKey, backend) + if err != nil { + glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err) + } else { + upstreams[upstreamName].Endpoints = []ingress.Endpoint{endpoint} + } + } + + if len(upstreams[upstreamName].Endpoints) == 0 { + endps, err := ic.serviceEndpoints(svcKey, backend.ServicePort.String(), hz) + upstreams[upstreamName].Endpoints = append(upstreams[upstreamName].Endpoints, endps...) + if err != nil { + glog.Warningf("error creating upstream %v: %v", upstreamName, err) + } + } + return upstreams +} + // createServers initializes a map that contains information about the list of // FDQN referenced by ingress rules and the common name field in the referenced // SSL certificates. Each server is configured with location / using a default @@ -920,7 +914,8 @@ func (ic *GenericController) createServers(data []interface{}, Proxy: ngxProxy, Service: du.Service, }, - }} + }, + } // initialize all the servers for _, ingIf := range data { @@ -937,7 +932,10 @@ func (ic *GenericController) createServers(data []interface{}, if ing.Spec.Backend != nil { // replace default backend - defUpstream := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String()) + defUpstream := fmt.Sprintf("%v-%v-%v", + ing.GetNamespace(), + ing.Spec.Backend.ServiceName, + ing.Spec.Backend.ServicePort.String()) if backendUpstream, ok := upstreams[defUpstream]; ok { un = backendUpstream.Name