refactor controllers.go
This commit is contained in:
parent
4481b2ee11
commit
a61017ae4e
1 changed files with 66 additions and 64 deletions
|
@ -461,7 +461,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pcfg := ingress.Configuration{
|
cfg := ingress.Configuration{
|
||||||
Backends: upstreams,
|
Backends: upstreams,
|
||||||
Servers: servers,
|
Servers: servers,
|
||||||
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
|
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
|
||||||
|
@ -469,14 +469,14 @@ func (ic *GenericController) syncIngress(key interface{}) error {
|
||||||
PassthroughBackends: passUpstreams,
|
PassthroughBackends: passUpstreams,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) {
|
if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&cfg) {
|
||||||
glog.V(3).Infof("skipping backend reload (no changes detected)")
|
glog.V(3).Infof("skipping backend reload (no changes detected)")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.Infof("backend reload required")
|
glog.Infof("backend reload required")
|
||||||
|
|
||||||
err := ic.cfg.Backend.OnUpdate(pcfg)
|
err := ic.cfg.Backend.OnUpdate(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
incReloadErrorCount()
|
incReloadErrorCount()
|
||||||
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
|
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
|
||||||
|
@ -487,7 +487,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
|
||||||
incReloadCount()
|
incReloadCount()
|
||||||
setSSLExpireTime(servers)
|
setSSLExpireTime(servers)
|
||||||
|
|
||||||
ic.runningConfig = &pcfg
|
ic.runningConfig = &cfg
|
||||||
ic.forceReload = false
|
ic.forceReload = false
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -913,25 +913,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
||||||
upstreams[defBackend] = newUpstream(defBackend)
|
upstreams[defBackend] = newUpstream(defBackend)
|
||||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
|
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
|
||||||
|
|
||||||
// Add the service cluster endpoint as the upstream instead of individual endpoints
|
upstreams = ic.createUpstreamEndpoint(defBackend, svcKey, serviceUpstream, ing, upstreams, hz)
|
||||||
// if the serviceUpstream annotation is enabled
|
|
||||||
if serviceUpstream {
|
|
||||||
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
|
|
||||||
} else {
|
|
||||||
upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(upstreams[defBackend].Endpoints) == 0 {
|
|
||||||
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
|
|
||||||
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("error creating upstream %v: %v", defBackend, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, rule := range ing.Spec.Rules {
|
for _, rule := range ing.Spec.Rules {
|
||||||
|
@ -963,25 +945,7 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
|
||||||
|
|
||||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
||||||
|
|
||||||
// Add the service cluster endpoint as the upstream instead of individual endpoints
|
upstreams = ic.createUpstreamEndpoint(name, svcKey, serviceUpstream, ing, upstreams, hz)
|
||||||
// if the serviceUpstream annotation is enabled
|
|
||||||
if serviceUpstream {
|
|
||||||
endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend)
|
|
||||||
if err != nil {
|
|
||||||
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
|
|
||||||
} else {
|
|
||||||
upstreams[name].Endpoints = []ingress.Endpoint{endpoint}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(upstreams[name].Endpoints) == 0 {
|
|
||||||
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("error obtaining service endpoints: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
upstreams[name].Endpoints = endp
|
|
||||||
}
|
|
||||||
|
|
||||||
s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
|
s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1022,11 +986,11 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
|
||||||
|
|
||||||
// serviceEndpoints returns the upstream servers (endpoints) associated
|
// serviceEndpoints returns the upstream servers (endpoints) associated
|
||||||
// to a service.
|
// to a service.
|
||||||
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
func (ic *GenericController) serviceEndpoints(
|
||||||
|
svcKey, backendPort string,
|
||||||
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
|
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
|
||||||
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
|
|
||||||
|
|
||||||
var upstreams []ingress.Endpoint
|
var upstreams []ingress.Endpoint
|
||||||
|
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
|
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
|
||||||
}
|
}
|
||||||
|
@ -1068,24 +1032,54 @@ func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
|
||||||
return upstreams, nil
|
return upstreams, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createUpstreamEndpoint returns the upstream servers (endpoints) associated
|
||||||
|
// to a ingress.
|
||||||
|
func (ic *GenericController) createUpstreamEndpoint(
|
||||||
|
upstreamName, svcKey string,
|
||||||
|
serviceUpstream bool,
|
||||||
|
ing *extensions.Ingress,
|
||||||
|
upstreams map[string]*ingress.Backend,
|
||||||
|
hz *healthcheck.Upstream,
|
||||||
|
) map[string]*ingress.Backend {
|
||||||
|
// Add the service cluster endpoint as the upstream instead of individual endpoints
|
||||||
|
// if the serviceUpstream annotation is enabled
|
||||||
|
if serviceUpstream {
|
||||||
|
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
|
||||||
|
} else {
|
||||||
|
upstreams[upstreamName].Endpoints = []ingress.Endpoint{endpoint}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(upstreams[upstreamName].Endpoints) == 0 {
|
||||||
|
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
|
||||||
|
upstreams[upstreamName].Endpoints = append(upstreams[upstreamName].Endpoints, endps...)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("error creating upstream %v: %v", upstreamName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return upstreams
|
||||||
|
}
|
||||||
|
|
||||||
// createServers initializes a map that contains information about the list of
|
// createServers initializes a map that contains information about the list of
|
||||||
// FDQN referenced by ingress rules and the common name field in the referenced
|
// FDQN referenced by ingress rules and the common name field in the referenced
|
||||||
// SSL certificates. Each server is configured with location / using a default
|
// SSL certificates. Each server is configured with location / using a default
|
||||||
// backend specified by the user or the one inside the ingress spec.
|
// backend specified by the user or the one inside the ingress spec.
|
||||||
func (ic *GenericController) createServers(data []interface{},
|
func (ic *GenericController) createServers(
|
||||||
upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
|
data []interface{},
|
||||||
servers := make(map[string]*ingress.Server)
|
upstreams map[string]*ingress.Backend,
|
||||||
|
) map[string]*ingress.Server {
|
||||||
bdef := ic.GetDefaultBackend()
|
defBackend := ic.GetDefaultBackend()
|
||||||
ngxProxy := proxy.Configuration{
|
ngxProxy := proxy.Configuration{
|
||||||
BodySize: bdef.ProxyBodySize,
|
BodySize: defBackend.ProxyBodySize,
|
||||||
ConnectTimeout: bdef.ProxyConnectTimeout,
|
ConnectTimeout: defBackend.ProxyConnectTimeout,
|
||||||
SendTimeout: bdef.ProxySendTimeout,
|
SendTimeout: defBackend.ProxySendTimeout,
|
||||||
ReadTimeout: bdef.ProxyReadTimeout,
|
ReadTimeout: defBackend.ProxyReadTimeout,
|
||||||
BufferSize: bdef.ProxyBufferSize,
|
BufferSize: defBackend.ProxyBufferSize,
|
||||||
CookieDomain: bdef.ProxyCookieDomain,
|
CookieDomain: defBackend.ProxyCookieDomain,
|
||||||
CookiePath: bdef.ProxyCookiePath,
|
CookiePath: defBackend.ProxyCookiePath,
|
||||||
NextUpstream: bdef.ProxyNextUpstream,
|
NextUpstream: defBackend.ProxyNextUpstream,
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultPemFileName := fakeCertificatePath
|
defaultPemFileName := fakeCertificatePath
|
||||||
|
@ -1099,7 +1093,9 @@ func (ic *GenericController) createServers(data []interface{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize the default server
|
// initialize the default server
|
||||||
du := ic.getDefaultUpstream()
|
du := upstreams[defUpstreamName]
|
||||||
|
servers := make(map[string]*ingress.Server)
|
||||||
|
|
||||||
servers[defServerName] = &ingress.Server{
|
servers[defServerName] = &ingress.Server{
|
||||||
Hostname: defServerName,
|
Hostname: defServerName,
|
||||||
SSLCertificate: defaultPemFileName,
|
SSLCertificate: defaultPemFileName,
|
||||||
|
@ -1112,7 +1108,8 @@ func (ic *GenericController) createServers(data []interface{},
|
||||||
Proxy: ngxProxy,
|
Proxy: ngxProxy,
|
||||||
Service: du.Service,
|
Service: du.Service,
|
||||||
},
|
},
|
||||||
}}
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// initialize all the servers
|
// initialize all the servers
|
||||||
for _, ingIf := range data {
|
for _, ingIf := range data {
|
||||||
|
@ -1123,11 +1120,13 @@ func (ic *GenericController) createServers(data []interface{},
|
||||||
|
|
||||||
// check if ssl passthrough is configured
|
// check if ssl passthrough is configured
|
||||||
sslpt := ic.annotations.SSLPassthrough(ing)
|
sslpt := ic.annotations.SSLPassthrough(ing)
|
||||||
du := ic.getDefaultUpstream()
|
|
||||||
un := du.Name
|
un := du.Name
|
||||||
if ing.Spec.Backend != nil {
|
if ing.Spec.Backend != nil {
|
||||||
// replace default backend
|
// replace default backend
|
||||||
defUpstream := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
|
defUpstream := fmt.Sprintf("%v-%v-%v",
|
||||||
|
ing.GetNamespace(),
|
||||||
|
ing.Spec.Backend.ServiceName,
|
||||||
|
ing.Spec.Backend.ServicePort.String())
|
||||||
if backendUpstream, ok := upstreams[defUpstream]; ok {
|
if backendUpstream, ok := upstreams[defUpstream]; ok {
|
||||||
un = backendUpstream.Name
|
un = backendUpstream.Name
|
||||||
}
|
}
|
||||||
|
@ -1153,7 +1152,9 @@ func (ic *GenericController) createServers(data []interface{},
|
||||||
Proxy: ngxProxy,
|
Proxy: ngxProxy,
|
||||||
Service: &api.Service{},
|
Service: &api.Service{},
|
||||||
},
|
},
|
||||||
}, SSLPassthrough: sslpt}
|
},
|
||||||
|
SSLPassthrough: sslpt,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1241,7 +1242,8 @@ func (ic *GenericController) getEndpoints(
|
||||||
s *api.Service,
|
s *api.Service,
|
||||||
servicePort *api.ServicePort,
|
servicePort *api.ServicePort,
|
||||||
proto api.Protocol,
|
proto api.Protocol,
|
||||||
hz *healthcheck.Upstream) []ingress.Endpoint {
|
hz *healthcheck.Upstream,
|
||||||
|
) []ingress.Endpoint {
|
||||||
|
|
||||||
upsServers := []ingress.Endpoint{}
|
upsServers := []ingress.Endpoint{}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue