Merge pull request #837 from aledbf/cleanup-interface

Cleanup interface
This commit is contained in:
Manuel Alejandro de Brito Fontes 2017-06-12 15:41:46 -04:00 committed by GitHub
commit 72f484e72f
4 changed files with 41 additions and 67 deletions

View file

@ -235,23 +235,6 @@ func (n *NGINXController) start(cmd *exec.Cmd, done chan error) {
}() }()
} }
// Reload checks if the running configuration file is different
// to the specified and reload nginx if required
func (n NGINXController) Reload(data []byte) ([]byte, bool, error) {
if !n.isReloadRequired(data) {
return []byte("Reload not required"), false, nil
}
err := ioutil.WriteFile(cfgPath, data, 0644)
if err != nil {
return nil, false, err
}
o, e := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput()
return o, true, e
}
// BackendDefaults returns the nginx defaults // BackendDefaults returns the nginx defaults
func (n NGINXController) BackendDefaults() defaults.Backend { func (n NGINXController) BackendDefaults() defaults.Backend {
if n.configmap == nil { if n.configmap == nil {
@ -262,36 +245,35 @@ func (n NGINXController) BackendDefaults() defaults.Backend {
return ngx_template.ReadConfig(n.configmap.Data).Backend return ngx_template.ReadConfig(n.configmap.Data).Backend
} }
// isReloadRequired check if the new configuration file is different // printDiff returns the difference between the running configuration
// from the current one. // and the new one
func (n NGINXController) isReloadRequired(data []byte) bool { func (n NGINXController) printDiff(data []byte) {
in, err := os.Open(cfgPath) in, err := os.Open(cfgPath)
if err != nil { if err != nil {
return false return
} }
src, err := ioutil.ReadAll(in) src, err := ioutil.ReadAll(in)
in.Close() in.Close()
if err != nil { if err != nil {
return false return
} }
if !bytes.Equal(src, data) { if !bytes.Equal(src, data) {
tmpfile, err := ioutil.TempFile("", "nginx-cfg-diff") tmpfile, err := ioutil.TempFile("", "nginx-cfg-diff")
if err != nil { if err != nil {
glog.Errorf("error creating temporal file: %s", err) glog.Errorf("error creating temporal file: %s", err)
return false return
} }
defer tmpfile.Close() defer tmpfile.Close()
err = ioutil.WriteFile(tmpfile.Name(), data, 0644) err = ioutil.WriteFile(tmpfile.Name(), data, 0644)
if err != nil { if err != nil {
return false return
} }
diffOutput, err := diff(src, data) diffOutput, err := diff(src, data)
if err != nil { if err != nil {
glog.Errorf("error computing diff: %s", err) glog.Errorf("error computing diff: %s", err)
return true return
} }
if glog.V(2) { if glog.V(2) {
@ -299,9 +281,7 @@ func (n NGINXController) isReloadRequired(data []byte) bool {
glog.Infof("%v", string(diffOutput)) glog.Infof("%v", string(diffOutput))
} }
os.Remove(tmpfile.Name()) os.Remove(tmpfile.Name())
return len(diffOutput) > 0
} }
return false
} }
// Info return build information // Info return build information
@ -404,7 +384,7 @@ func (n *NGINXController) SetListers(lister ingress.StoreLister) {
// write the configuration file // write the configuration file
// returning nill implies the backend will be reloaded. // returning nill implies the backend will be reloaded.
// if an error is returned means requeue the update // if an error is returned means requeue the update
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) ([]byte, error) { func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
var longestName int var longestName int
var serverNameBytes int var serverNameBytes int
for _, srv := range ingressCfg.Servers { for _, srv := range ingressCfg.Servers {
@ -540,15 +520,29 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) ([]byte, er
Cfg: cfg, Cfg: cfg,
IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6, IsIPV6Enabled: n.isIPV6Enabled && !cfg.DisableIpv6,
}) })
if err != nil { if err != nil {
return nil, err return err
} }
if err := n.testTemplate(content); err != nil { err = n.testTemplate(content)
return nil, err if err != nil {
return err
} }
return content, nil n.printDiff(content)
err = ioutil.WriteFile(cfgPath, content, 0644)
if err != nil {
return err
}
o, e := exec.Command(n.binary, "-s", "reload", "-c", cfgPath).CombinedOutput()
if err != nil {
return fmt.Errorf("%v\n%v", e, string(o))
}
return nil
} }
// nginxHashBucketSize computes the correct nginx hash_bucket_size for a hash with the given longest key // nginxHashBucketSize computes the correct nginx hash_bucket_size for a hash with the given longest key

View file

@ -153,7 +153,7 @@ func newIngressController(config *Configuration) *GenericController {
cfg: config, cfg: config,
stopLock: &sync.Mutex{}, stopLock: &sync.Mutex{},
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.5, 1), syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(0.3, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{ recorder: eventBroadcaster.NewRecorder(scheme.Scheme, api.EventSource{
Component: "ingress-controller", Component: "ingress-controller",
}), }),
@ -400,27 +400,23 @@ func (ic *GenericController) syncIngress(key interface{}) error {
} }
} }
data, err := ic.cfg.Backend.OnUpdate(ingress.Configuration{ err := ic.cfg.Backend.OnUpdate(ingress.Configuration{
Backends: upstreams, Backends: upstreams,
Servers: servers, Servers: servers,
TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP), TCPEndpoints: ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP), UDPEndpoints: ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
PassthroughBackends: passUpstreams, PassthroughBackends: passUpstreams,
}) })
if err != nil { if err != nil {
incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", err)
return err return err
} }
out, reloaded, err := ic.cfg.Backend.Reload(data) glog.Infof("ingress backend successfully reloaded...")
if err != nil { incReloadCount()
incReloadErrorCount()
glog.Errorf("unexpected failure restarting the backend: \n%v", string(out))
return err
}
if reloaded {
glog.Infof("ingress backend successfully reloaded...")
incReloadCount()
}
return nil return nil
} }

View file

@ -50,14 +50,6 @@ type Controller interface {
// controller status // controller status
healthz.HealthzChecker healthz.HealthzChecker
// Reload takes a byte array representing the new loadbalancer configuration,
// and returns a byte array containing any output/errors from the backend and
// if a reload was required.
// Before returning the backend must load the configuration in the given array
// into the loadbalancer and restart it, or fail with an error and message string.
// If reloading fails, there should be not change in the running configuration or
// the given byte array.
Reload(data []byte) ([]byte, bool, error)
// OnUpdate callback invoked from the sync queue https://k8s.io/ingress/core/blob/master/pkg/ingress/controller/controller.go#L387 // OnUpdate callback invoked from the sync queue https://k8s.io/ingress/core/blob/master/pkg/ingress/controller/controller.go#L387
// when an update occurs. This is executed frequently because Ingress // when an update occurs. This is executed frequently because Ingress
// controllers watches changes in: // controllers watches changes in:
@ -78,12 +70,11 @@ type Controller interface {
// servers (FQDN) and all the locations inside each server. Each // servers (FQDN) and all the locations inside each server. Each
// location contains information about all the annotations were configured // location contains information about all the annotations were configured
// https://k8s.io/ingress/core/blob/master/pkg/ingress/types.go#L83 // https://k8s.io/ingress/core/blob/master/pkg/ingress/types.go#L83
// The backend returns the contents of the configuration file or an error // The backend returns an error if was not possible to update the configuration.
// with the reason why was not possible to generate the file.
// //
// The returned configuration is then passed to test, and then to reload // The returned configuration is then passed to test, and then to reload
// if there is no errors. // if there is no errors.
OnUpdate(Configuration) ([]byte, error) OnUpdate(Configuration) error
// ConfigMap content of --configmap // ConfigMap content of --configmap
SetConfig(*api.ConfigMap) SetConfig(*api.ConfigMap)
// SetListers allows the access of store listers present in the generic controller // SetListers allows the access of store listers present in the generic controller

View file

@ -52,20 +52,11 @@ func (dc DummyController) SetConfig(cfgMap *api.ConfigMap) {
log.Printf("Config map %+v", cfgMap) log.Printf("Config map %+v", cfgMap)
} }
func (dc DummyController) Reload(data []byte) ([]byte, bool, error) {
out, err := exec.Command("echo", string(data)).CombinedOutput()
if err != nil {
return out, false, err
}
log.Printf("Reloaded new config %s", out)
return out, true, err
}
func (dc DummyController) Test(file string) *exec.Cmd { func (dc DummyController) Test(file string) *exec.Cmd {
return exec.Command("echo", file) return exec.Command("echo", file)
} }
func (dc DummyController) OnUpdate(updatePayload ingress.Configuration) ([]byte, error) { func (dc DummyController) OnUpdate(updatePayload ingress.Configuration) error {
log.Printf("Received OnUpdate notification") log.Printf("Received OnUpdate notification")
for _, b := range updatePayload.Backends { for _, b := range updatePayload.Backends {
eps := []string{} eps := []string{}
@ -74,7 +65,9 @@ func (dc DummyController) OnUpdate(updatePayload ingress.Configuration) ([]byte,
} }
log.Printf("%v: %v", b.Name, strings.Join(eps, ", ")) log.Printf("%v: %v", b.Name, strings.Join(eps, ", "))
} }
return []byte(`<string containing a configuration file>`), nil
log.Printf("Reloaded new config")
return nil
} }
func (dc DummyController) BackendDefaults() defaults.Backend { func (dc DummyController) BackendDefaults() defaults.Backend {