Cleaner Ingress processing code
This commit is contained in:
parent
6cb0e41737
commit
5ed462db09
4 changed files with 74 additions and 67 deletions
124
controllers/nginx-third-party/controller.go
vendored
124
controllers/nginx-third-party/controller.go
vendored
|
@ -242,9 +242,9 @@ func (lbc *loadBalancerController) sync() {
|
||||||
lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
|
lbc.nginx.CheckAndReload(ngxConfig, upstreams, servers, tcpServices)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]nginx.Upstream, []nginx.Server) {
|
func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*nginx.Upstream, []*nginx.Server) {
|
||||||
upstreams := make(map[string]nginx.Upstream)
|
upstreams := lbc.createUpstreams(data)
|
||||||
servers := make(map[string]nginx.Server)
|
servers := lbc.createServers(data)
|
||||||
|
|
||||||
for _, ingIf := range data {
|
for _, ingIf := range data {
|
||||||
ing := ingIf.(*extensions.Ingress)
|
ing := ingIf.(*extensions.Ingress)
|
||||||
|
@ -254,16 +254,12 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]ngi
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
server := servers[rule.Host]
|
||||||
|
var locations []nginx.Location
|
||||||
|
|
||||||
for _, path := range rule.HTTP.Paths {
|
for _, path := range rule.HTTP.Paths {
|
||||||
name := ing.GetNamespace() + "-" + path.Backend.ServiceName
|
upsName := ing.GetNamespace() + "-" + path.Backend.ServiceName
|
||||||
|
ups := upstreams[upsName]
|
||||||
var ups nginx.Upstream
|
|
||||||
|
|
||||||
if existent, ok := upstreams[name]; ok {
|
|
||||||
ups = existent
|
|
||||||
} else {
|
|
||||||
ups = nginx.NewUpstream(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
svcKey := ing.GetNamespace() + "/" + path.Backend.ServiceName
|
svcKey := ing.GetNamespace() + "/" + path.Backend.ServiceName
|
||||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
||||||
|
@ -291,58 +287,24 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]ngi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
upstreams[name] = ups
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pems := lbc.getPemsFromIngress(data)
|
|
||||||
|
|
||||||
for _, rule := range ing.Spec.Rules {
|
|
||||||
var server nginx.Server
|
|
||||||
if existent, ok := servers[rule.Host]; ok {
|
|
||||||
server = existent
|
|
||||||
} else {
|
|
||||||
server = nginx.Server{Name: rule.Host}
|
|
||||||
}
|
|
||||||
|
|
||||||
if pemFile, ok := pems[rule.Host]; ok {
|
|
||||||
server.SSL = true
|
|
||||||
server.SSLCertificate = pemFile
|
|
||||||
server.SSLCertificateKey = pemFile
|
|
||||||
}
|
|
||||||
|
|
||||||
var locations []nginx.Location
|
|
||||||
|
|
||||||
for _, path := range rule.HTTP.Paths {
|
|
||||||
loc := nginx.Location{Path: path.Path}
|
|
||||||
upsName := ing.GetNamespace() + "-" + path.Backend.ServiceName
|
|
||||||
|
|
||||||
svcKey := ing.GetNamespace() + "/" + path.Backend.ServiceName
|
|
||||||
_, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
|
||||||
if err != nil {
|
|
||||||
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !svcExists {
|
|
||||||
glog.Warningf("service %v does no exists. skipping Ingress rule", svcKey)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ups := range upstreams {
|
for _, ups := range upstreams {
|
||||||
if upsName == ups.Name {
|
if upsName == ups.Name {
|
||||||
loc.Upstream = ups
|
loc := nginx.Location{Path: path.Path}
|
||||||
}
|
loc.Upstream = *ups
|
||||||
}
|
|
||||||
locations = append(locations, loc)
|
locations = append(locations, loc)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
server.Locations = append(server.Locations, locations...)
|
server.Locations = append(server.Locations, locations...)
|
||||||
servers[rule.Host] = server
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
aUpstreams := make([]nginx.Upstream, 0, len(upstreams))
|
// TODO: find a way to make this more readable
|
||||||
|
// The structs must be ordered to always generate the same file
|
||||||
|
// if the content does not change.
|
||||||
|
aUpstreams := make([]*nginx.Upstream, 0, len(upstreams))
|
||||||
for _, value := range upstreams {
|
for _, value := range upstreams {
|
||||||
if len(value.Backends) == 0 {
|
if len(value.Backends) == 0 {
|
||||||
value.Backends = append(value.Backends, nginx.NewDefaultServer())
|
value.Backends = append(value.Backends, nginx.NewDefaultServer())
|
||||||
|
@ -352,7 +314,7 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]ngi
|
||||||
}
|
}
|
||||||
sort.Sort(nginx.UpstreamByNameServers(aUpstreams))
|
sort.Sort(nginx.UpstreamByNameServers(aUpstreams))
|
||||||
|
|
||||||
aServers := make([]nginx.Server, 0, len(servers))
|
aServers := make([]*nginx.Server, 0, len(servers))
|
||||||
for _, value := range servers {
|
for _, value := range servers {
|
||||||
sort.Sort(nginx.LocationByPath(value.Locations))
|
sort.Sort(nginx.LocationByPath(value.Locations))
|
||||||
aServers = append(aServers, value)
|
aServers = append(aServers, value)
|
||||||
|
@ -362,6 +324,54 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]ngi
|
||||||
return aUpstreams, aServers
|
return aUpstreams, aServers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[string]*nginx.Upstream {
|
||||||
|
upstreams := make(map[string]*nginx.Upstream)
|
||||||
|
|
||||||
|
for _, ingIf := range data {
|
||||||
|
ing := ingIf.(*extensions.Ingress)
|
||||||
|
|
||||||
|
for _, rule := range ing.Spec.Rules {
|
||||||
|
if rule.IngressRuleValue.HTTP == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, path := range rule.HTTP.Paths {
|
||||||
|
name := ing.GetNamespace() + "-" + path.Backend.ServiceName
|
||||||
|
if _, ok := upstreams[name]; !ok {
|
||||||
|
upstreams[name] = nginx.NewUpstream(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return upstreams
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lbc *loadBalancerController) createServers(data []interface{}) map[string]*nginx.Server {
|
||||||
|
servers := make(map[string]*nginx.Server)
|
||||||
|
|
||||||
|
pems := lbc.getPemsFromIngress(data)
|
||||||
|
|
||||||
|
for _, ingIf := range data {
|
||||||
|
ing := ingIf.(*extensions.Ingress)
|
||||||
|
|
||||||
|
for _, rule := range ing.Spec.Rules {
|
||||||
|
if _, ok := servers[rule.Host]; !ok {
|
||||||
|
servers[rule.Host] = &nginx.Server{Name: rule.Host}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pemFile, ok := pems[rule.Host]; ok {
|
||||||
|
server := servers[rule.Host]
|
||||||
|
server.SSL = true
|
||||||
|
server.SSLCertificate = pemFile
|
||||||
|
server.SSLCertificateKey = pemFile
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return servers
|
||||||
|
}
|
||||||
|
|
||||||
func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[string]string {
|
func (lbc *loadBalancerController) getPemsFromIngress(data []interface{}) map[string]string {
|
||||||
pems := make(map[string]string)
|
pems := make(map[string]string)
|
||||||
|
|
||||||
|
@ -462,7 +472,7 @@ func (lbc *loadBalancerController) Run() {
|
||||||
go lbc.svcController.Run(lbc.stopCh)
|
go lbc.svcController.Run(lbc.stopCh)
|
||||||
|
|
||||||
// periodic check for changes in configuration
|
// periodic check for changes in configuration
|
||||||
go wait.Until(lbc.sync, 10*time.Second, wait.NeverStop)
|
go wait.Until(lbc.sync, 5*time.Second, wait.NeverStop)
|
||||||
|
|
||||||
<-lbc.stopCh
|
<-lbc.stopCh
|
||||||
glog.Infof("shutting down NGINX loadbalancer controller")
|
glog.Infof("shutting down NGINX loadbalancer controller")
|
||||||
|
|
|
@ -54,7 +54,7 @@ func (ngx *NginxManager) Start() {
|
||||||
// shut down, stop accepting new connections and continue to service current requests
|
// shut down, stop accepting new connections and continue to service current requests
|
||||||
// until all such requests are serviced. After that, the old worker processes exit.
|
// until all such requests are serviced. After that, the old worker processes exit.
|
||||||
// http://nginx.org/en/docs/beginners_guide.html#control
|
// http://nginx.org/en/docs/beginners_guide.html#control
|
||||||
func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []Upstream, servers []Server, servicesL4 []Service) {
|
func (ngx *NginxManager) CheckAndReload(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, servicesL4 []Service) {
|
||||||
ngx.reloadLock.Lock()
|
ngx.reloadLock.Lock()
|
||||||
defer ngx.reloadLock.Unlock()
|
defer ngx.reloadLock.Unlock()
|
||||||
|
|
||||||
|
|
15
controllers/nginx-third-party/nginx/nginx.go
vendored
15
controllers/nginx-third-party/nginx/nginx.go
vendored
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
package nginx
|
package nginx
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
@ -35,7 +36,7 @@ type Upstream struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpstreamByNameServers Upstream sorter by name
|
// UpstreamByNameServers Upstream sorter by name
|
||||||
type UpstreamByNameServers []Upstream
|
type UpstreamByNameServers []*Upstream
|
||||||
|
|
||||||
func (c UpstreamByNameServers) Len() int { return len(c) }
|
func (c UpstreamByNameServers) Len() int { return len(c) }
|
||||||
func (c UpstreamByNameServers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
func (c UpstreamByNameServers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||||
|
@ -76,7 +77,7 @@ type Server struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerByName Server sorter by name
|
// ServerByName Server sorter by name
|
||||||
type ServerByName []Server
|
type ServerByName []*Server
|
||||||
|
|
||||||
func (c ServerByName) Len() int { return len(c) }
|
func (c ServerByName) Len() int { return len(c) }
|
||||||
func (c ServerByName) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
func (c ServerByName) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
||||||
|
@ -105,8 +106,8 @@ func NewDefaultServer() UpstreamServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUpstream creates an upstream without servers.
|
// NewUpstream creates an upstream without servers.
|
||||||
func NewUpstream(name string) Upstream {
|
func NewUpstream(name string) *Upstream {
|
||||||
return Upstream{
|
return &Upstream{
|
||||||
Name: name,
|
Name: name,
|
||||||
Backends: []UpstreamServer{},
|
Backends: []UpstreamServer{},
|
||||||
}
|
}
|
||||||
|
@ -122,11 +123,7 @@ func (nginx *NginxManager) AddOrUpdateCertAndKey(name string, cert string, key s
|
||||||
}
|
}
|
||||||
defer pem.Close()
|
defer pem.Close()
|
||||||
|
|
||||||
_, err = pem.WriteString(string(key))
|
_, err = pem.WriteString(fmt.Sprintf("%v\n%v", key, cert))
|
||||||
if err != nil {
|
|
||||||
glog.Fatalf("Couldn't write to pem file %v: %v", pemFileName, err)
|
|
||||||
}
|
|
||||||
_, err = pem.WriteString(string(cert))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Couldn't write to pem file %v: %v", pemFileName, err)
|
glog.Fatalf("Couldn't write to pem file %v: %v", pemFileName, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ func (ngx *NginxManager) loadTemplate() {
|
||||||
ngx.template = tmpl
|
ngx.template = tmpl
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []Upstream, servers []Server, servicesL4 []Service) (bool, error) {
|
func (ngx *NginxManager) writeCfg(cfg *nginxConfiguration, upstreams []*Upstream, servers []*Server, servicesL4 []Service) (bool, error) {
|
||||||
fromMap := structs.Map(cfg)
|
fromMap := structs.Map(cfg)
|
||||||
toMap := structs.Map(ngx.defCfg)
|
toMap := structs.Map(ngx.defCfg)
|
||||||
curNginxCfg := merge(toMap, fromMap)
|
curNginxCfg := merge(toMap, fromMap)
|
||||||
|
|
Loading…
Reference in a new issue