Merge pull request #1350 from kubernetes/revert-1345-example

Revert "refactor controllers.go"
This commit is contained in:
Manuel Alejandro de Brito Fontes 2017-09-13 12:16:29 -07:00 committed by GitHub
commit 6cca4c5cdd

View file

@ -461,7 +461,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
} }
} }
cfg := ingress.Configuration{ pcfg := ingress.Configuration{
Backends: upstreams, Backends: upstreams,
Servers: servers, Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP), TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
@ -469,14 +469,14 @@ func (ic *GenericController) syncIngress(key interface{}) error {
PassthroughBackends: passUpstreams, PassthroughBackends: passUpstreams,
} }
if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&cfg) { if !ic.forceReload && ic.runningConfig != nil && ic.runningConfig.Equal(&pcfg) {
glog.V(3).Infof("skipping backend reload (no changes detected)") glog.V(3).Infof("skipping backend reload (no changes detected)")
return nil return nil
} }
glog.Infof("backend reload required") glog.Infof("backend reload required")
err := ic.cfg.Backend.OnUpdate(cfg) err := ic.cfg.Backend.OnUpdate(pcfg)
if err != nil { if err != nil {
incReloadErrorCount() incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", err) glog.Errorf("unexpected failure restarting the backend: \n%v", err)
@ -487,7 +487,7 @@ func (ic *GenericController) syncIngress(key interface{}) error {
incReloadCount() incReloadCount()
setSSLExpireTime(servers) setSSLExpireTime(servers)
ic.runningConfig = &cfg ic.runningConfig = &pcfg
ic.forceReload = false ic.forceReload = false
return nil return nil
@ -913,7 +913,25 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
upstreams[defBackend] = newUpstream(defBackend) upstreams[defBackend] = newUpstream(defBackend)
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName) svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName)
upstreams = ic.createUpstreamEndpoint(defBackend, svcKey, serviceUpstream, ing, upstreams, hz) // Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[defBackend].Endpoints = []ingress.Endpoint{endpoint}
}
}
if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", defBackend, err)
}
}
} }
for _, rule := range ing.Spec.Rules { for _, rule := range ing.Spec.Rules {
@ -945,7 +963,25 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
upstreams = ic.createUpstreamEndpoint(name, svcKey, serviceUpstream, ing, upstreams, hz) // Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, &path.Backend)
if err != nil {
glog.Errorf("failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[name].Endpoints = []ingress.Endpoint{endpoint}
}
}
if len(upstreams[name].Endpoints) == 0 {
endp, err := ic.serviceEndpoints(svcKey, path.Backend.ServicePort.String(), hz)
if err != nil {
glog.Warningf("error obtaining service endpoints: %v", err)
continue
}
upstreams[name].Endpoints = endp
}
s, exists, err := ic.svcLister.Store.GetByKey(svcKey) s, exists, err := ic.svcLister.Store.GetByKey(svcKey)
if err != nil { if err != nil {
@ -986,11 +1022,11 @@ func (ic *GenericController) getServiceClusterEndpoint(svcKey string, backend *e
// serviceEndpoints returns the upstream servers (endpoints) associated // serviceEndpoints returns the upstream servers (endpoints) associated
// to a service. // to a service.
func (ic *GenericController) serviceEndpoints( func (ic *GenericController) serviceEndpoints(svcKey, backendPort string,
svcKey, backendPort string,
hz *healthcheck.Upstream) ([]ingress.Endpoint, error) { hz *healthcheck.Upstream) ([]ingress.Endpoint, error) {
var upstreams []ingress.Endpoint
svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey) svcObj, svcExists, err := ic.svcLister.Store.GetByKey(svcKey)
var upstreams []ingress.Endpoint
if err != nil { if err != nil {
return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err) return upstreams, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err)
} }
@ -1032,54 +1068,24 @@ func (ic *GenericController) serviceEndpoints(
return upstreams, nil return upstreams, nil
} }
// createUpstreamEndpoint returns the upstream servers (endpoints) associated
// to a ingress.
func (ic *GenericController) createUpstreamEndpoint(
upstreamName, svcKey string,
serviceUpstream bool,
ing *extensions.Ingress,
upstreams map[string]*ingress.Backend,
hz *healthcheck.Upstream,
) map[string]*ingress.Backend {
// Add the service cluster endpoint as the upstream instead of individual endpoints
// if the serviceUpstream annotation is enabled
if serviceUpstream {
endpoint, err := ic.getServiceClusterEndpoint(svcKey, ing.Spec.Backend)
if err != nil {
glog.Errorf("Failed to get service cluster endpoint for service %s: %v", svcKey, err)
} else {
upstreams[upstreamName].Endpoints = []ingress.Endpoint{endpoint}
}
}
if len(upstreams[upstreamName].Endpoints) == 0 {
endps, err := ic.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String(), hz)
upstreams[upstreamName].Endpoints = append(upstreams[upstreamName].Endpoints, endps...)
if err != nil {
glog.Warningf("error creating upstream %v: %v", upstreamName, err)
}
}
return upstreams
}
// createServers initializes a map that contains information about the list of // createServers initializes a map that contains information about the list of
// FDQN referenced by ingress rules and the common name field in the referenced // FDQN referenced by ingress rules and the common name field in the referenced
// SSL certificates. Each server is configured with location / using a default // SSL certificates. Each server is configured with location / using a default
// backend specified by the user or the one inside the ingress spec. // backend specified by the user or the one inside the ingress spec.
func (ic *GenericController) createServers( func (ic *GenericController) createServers(data []interface{},
data []interface{}, upstreams map[string]*ingress.Backend) map[string]*ingress.Server {
upstreams map[string]*ingress.Backend, servers := make(map[string]*ingress.Server)
) map[string]*ingress.Server {
defBackend := ic.GetDefaultBackend() bdef := ic.GetDefaultBackend()
ngxProxy := proxy.Configuration{ ngxProxy := proxy.Configuration{
BodySize: defBackend.ProxyBodySize, BodySize: bdef.ProxyBodySize,
ConnectTimeout: defBackend.ProxyConnectTimeout, ConnectTimeout: bdef.ProxyConnectTimeout,
SendTimeout: defBackend.ProxySendTimeout, SendTimeout: bdef.ProxySendTimeout,
ReadTimeout: defBackend.ProxyReadTimeout, ReadTimeout: bdef.ProxyReadTimeout,
BufferSize: defBackend.ProxyBufferSize, BufferSize: bdef.ProxyBufferSize,
CookieDomain: defBackend.ProxyCookieDomain, CookieDomain: bdef.ProxyCookieDomain,
CookiePath: defBackend.ProxyCookiePath, CookiePath: bdef.ProxyCookiePath,
NextUpstream: defBackend.ProxyNextUpstream, NextUpstream: bdef.ProxyNextUpstream,
} }
defaultPemFileName := fakeCertificatePath defaultPemFileName := fakeCertificatePath
@ -1093,9 +1099,7 @@ func (ic *GenericController) createServers(
} }
// initialize the default server // initialize the default server
du := upstreams[defUpstreamName] du := ic.getDefaultUpstream()
servers := make(map[string]*ingress.Server)
servers[defServerName] = &ingress.Server{ servers[defServerName] = &ingress.Server{
Hostname: defServerName, Hostname: defServerName,
SSLCertificate: defaultPemFileName, SSLCertificate: defaultPemFileName,
@ -1108,8 +1112,7 @@ func (ic *GenericController) createServers(
Proxy: ngxProxy, Proxy: ngxProxy,
Service: du.Service, Service: du.Service,
}, },
}, }}
}
// initialize all the servers // initialize all the servers
for _, ingIf := range data { for _, ingIf := range data {
@ -1120,13 +1123,11 @@ func (ic *GenericController) createServers(
// check if ssl passthrough is configured // check if ssl passthrough is configured
sslpt := ic.annotations.SSLPassthrough(ing) sslpt := ic.annotations.SSLPassthrough(ing)
du := ic.getDefaultUpstream()
un := du.Name un := du.Name
if ing.Spec.Backend != nil { if ing.Spec.Backend != nil {
// replace default backend // replace default backend
defUpstream := fmt.Sprintf("%v-%v-%v", defUpstream := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort.String())
ing.GetNamespace(),
ing.Spec.Backend.ServiceName,
ing.Spec.Backend.ServicePort.String())
if backendUpstream, ok := upstreams[defUpstream]; ok { if backendUpstream, ok := upstreams[defUpstream]; ok {
un = backendUpstream.Name un = backendUpstream.Name
} }
@ -1152,9 +1153,7 @@ func (ic *GenericController) createServers(
Proxy: ngxProxy, Proxy: ngxProxy,
Service: &api.Service{}, Service: &api.Service{},
}, },
}, }, SSLPassthrough: sslpt}
SSLPassthrough: sslpt,
}
} }
} }
@ -1242,8 +1241,7 @@ func (ic *GenericController) getEndpoints(
s *api.Service, s *api.Service,
servicePort *api.ServicePort, servicePort *api.ServicePort,
proto api.Protocol, proto api.Protocol,
hz *healthcheck.Upstream, hz *healthcheck.Upstream) []ingress.Endpoint {
) []ingress.Endpoint {
upsServers := []ingress.Endpoint{} upsServers := []ingress.Endpoint{}