Merge pull request #3505 from Shopify/watch-pod-lua
Update lua configuration_data when number of controller pod change
This commit is contained in:
commit
ee3a8fe581
10 changed files with 113 additions and 19 deletions
|
@ -163,6 +163,7 @@ func (n *NGINXController) syncIngress(interface{}) error {
|
||||||
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
|
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
|
||||||
PassthroughBackends: passUpstreams,
|
PassthroughBackends: passUpstreams,
|
||||||
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
|
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
|
||||||
|
ControllerPodsCount: n.store.GetRunningControllerPodsCount(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.runningConfig.Equal(pcfg) {
|
if n.runningConfig.Equal(pcfg) {
|
||||||
|
|
|
@ -754,6 +754,8 @@ func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configurati
|
||||||
|
|
||||||
copyOfRunningConfig.Backends = []*ingress.Backend{}
|
copyOfRunningConfig.Backends = []*ingress.Backend{}
|
||||||
copyOfPcfg.Backends = []*ingress.Backend{}
|
copyOfPcfg.Backends = []*ingress.Backend{}
|
||||||
|
copyOfRunningConfig.ControllerPodsCount = 0
|
||||||
|
copyOfPcfg.ControllerPodsCount = 0
|
||||||
|
|
||||||
if n.cfg.DynamicCertificatesEnabled {
|
if n.cfg.DynamicCertificatesEnabled {
|
||||||
clearCertificates(©OfRunningConfig)
|
clearCertificates(©OfRunningConfig)
|
||||||
|
@ -827,6 +829,14 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
url = fmt.Sprintf("http://localhost:%d/configuration/general", port)
|
||||||
|
err = post(url, &ingress.GeneralConfig{
|
||||||
|
ControllerPodsCount: pcfg.ControllerPodsCount,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if isDynamicCertificatesEnabled {
|
if isDynamicCertificatesEnabled {
|
||||||
err = configureCertificates(pcfg, port)
|
err = configureCertificates(pcfg, port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -207,6 +207,7 @@ func TestConfigureDynamically(t *testing.T) {
|
||||||
commonConfig := &ingress.Configuration{
|
commonConfig := &ingress.Configuration{
|
||||||
Backends: backends,
|
Backends: backends,
|
||||||
Servers: servers,
|
Servers: servers,
|
||||||
|
ControllerPodsCount: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -221,6 +222,10 @@ func TestConfigureDynamically(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
body := string(b)
|
body := string(b)
|
||||||
|
|
||||||
|
switch r.URL.Path {
|
||||||
|
case "/configuration/backends":
|
||||||
|
{
|
||||||
if strings.Contains(body, "target") {
|
if strings.Contains(body, "target") {
|
||||||
t.Errorf("unexpected target reference in JSON content: %v", body)
|
t.Errorf("unexpected target reference in JSON content: %v", body)
|
||||||
}
|
}
|
||||||
|
@ -228,6 +233,16 @@ func TestConfigureDynamically(t *testing.T) {
|
||||||
if !strings.Contains(body, "service") {
|
if !strings.Contains(body, "service") {
|
||||||
t.Errorf("service reference should be present in JSON content: %v", body)
|
t.Errorf("service reference should be present in JSON content: %v", body)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
case "/configuration/general":
|
||||||
|
{
|
||||||
|
if !strings.Contains(body, "controllerPodsCount") {
|
||||||
|
t.Errorf("controllerPodsCount should be present in JSON content: %v", body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Errorf("unknown request to %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
|
@ -76,8 +76,8 @@ type Storer interface {
|
||||||
// ListIngresses returns a list of all Ingresses in the store.
|
// ListIngresses returns a list of all Ingresses in the store.
|
||||||
ListIngresses() []*ingress.Ingress
|
ListIngresses() []*ingress.Ingress
|
||||||
|
|
||||||
// ListControllerPods returns a list of ingress-nginx controller Pods.
|
// GetRunningControllerPodsCount returns the number of Running ingress-nginx controller Pods.
|
||||||
ListControllerPods() []*corev1.Pod
|
GetRunningControllerPodsCount() int
|
||||||
|
|
||||||
// GetLocalSSLCert returns the local copy of a SSLCert
|
// GetLocalSSLCert returns the local copy of a SSLCert
|
||||||
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
|
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
|
||||||
|
@ -288,12 +288,10 @@ func New(checkOCSP bool,
|
||||||
store.informers.Pod = cache.NewSharedIndexInformer(
|
store.informers.Pod = cache.NewSharedIndexInformer(
|
||||||
&cache.ListWatch{
|
&cache.ListWatch{
|
||||||
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
|
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
|
||||||
|
|
||||||
options.LabelSelector = labelSelector.String()
|
options.LabelSelector = labelSelector.String()
|
||||||
return client.CoreV1().Pods(store.pod.Namespace).List(options)
|
return client.CoreV1().Pods(store.pod.Namespace).List(options)
|
||||||
},
|
},
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
|
||||||
options.LabelSelector = labelSelector.String()
|
options.LabelSelector = labelSelector.String()
|
||||||
return client.CoreV1().Pods(store.pod.Namespace).Watch(options)
|
return client.CoreV1().Pods(store.pod.Namespace).Watch(options)
|
||||||
},
|
},
|
||||||
|
@ -832,9 +830,9 @@ func (s *k8sStore) Run(stopCh chan struct{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListControllerPods returns a list of ingress-nginx controller Pods
|
// GetRunningControllerPodsCount returns the number of Running ingress-nginx controller Pods
|
||||||
func (s *k8sStore) ListControllerPods() []*corev1.Pod {
|
func (s k8sStore) GetRunningControllerPodsCount() int {
|
||||||
var pods []*corev1.Pod
|
count := 0
|
||||||
|
|
||||||
for _, i := range s.listers.Pod.List() {
|
for _, i := range s.listers.Pod.List() {
|
||||||
pod := i.(*corev1.Pod)
|
pod := i.(*corev1.Pod)
|
||||||
|
@ -843,8 +841,8 @@ func (s *k8sStore) ListControllerPods() []*corev1.Pod {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
pods = append(pods, pod)
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
return pods
|
return count
|
||||||
}
|
}
|
||||||
|
|
|
@ -1062,7 +1062,7 @@ func TestWriteSSLSessionTicketKey(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListControllerPods(t *testing.T) {
|
func TestGetRunningControllerPodsCount(t *testing.T) {
|
||||||
os.Setenv("POD_NAMESPACE", "testns")
|
os.Setenv("POD_NAMESPACE", "testns")
|
||||||
os.Setenv("POD_NAME", "ingress-1")
|
os.Setenv("POD_NAME", "ingress-1")
|
||||||
|
|
||||||
|
@ -1117,8 +1117,8 @@ func TestListControllerPods(t *testing.T) {
|
||||||
}
|
}
|
||||||
s.listers.Pod.Add(pod)
|
s.listers.Pod.Add(pod)
|
||||||
|
|
||||||
pods := s.ListControllerPods()
|
podsCount := s.GetRunningControllerPodsCount()
|
||||||
if s := len(pods); s != 2 {
|
if podsCount != 2 {
|
||||||
t.Errorf("Expected 1 controller Pods but got %v", s)
|
t.Errorf("Expected 1 controller Pods but got %v", s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,9 @@ type Configuration struct {
|
||||||
|
|
||||||
// ConfigurationChecksum contains the particular checksum of a Configuration object
|
// ConfigurationChecksum contains the particular checksum of a Configuration object
|
||||||
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
|
ConfigurationChecksum string `json:"configurationChecksum,omitempty"`
|
||||||
|
|
||||||
|
// ControllerPodsCount contains the list of running ingress controller Pod(s)
|
||||||
|
ControllerPodsCount int `json:"controllerPodsCount,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backend describes one or more remote server/s (endpoints) associated with a service
|
// Backend describes one or more remote server/s (endpoints) associated with a service
|
||||||
|
@ -338,3 +341,8 @@ type Ingress struct {
|
||||||
extensions.Ingress
|
extensions.Ingress
|
||||||
ParsedAnnotations *annotations.Ingress
|
ParsedAnnotations *annotations.Ingress
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GeneralConfig holds the definition of lua general configuration data
|
||||||
|
type GeneralConfig struct {
|
||||||
|
ControllerPodsCount int `json:"controllerPodsCount"`
|
||||||
|
}
|
||||||
|
|
|
@ -105,6 +105,10 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c1.ControllerPodsCount != c2.ControllerPodsCount {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,10 @@ function _M.get_backends_data()
|
||||||
return configuration_data:get("backends")
|
return configuration_data:get("backends")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function _M.get_general_data()
|
||||||
|
return configuration_data:get("general")
|
||||||
|
end
|
||||||
|
|
||||||
local function fetch_request_body()
|
local function fetch_request_body()
|
||||||
ngx.req.read_body()
|
ngx.req.read_body()
|
||||||
local body = ngx.req.get_body_data()
|
local body = ngx.req.get_body_data()
|
||||||
|
@ -80,6 +84,31 @@ local function handle_servers()
|
||||||
ngx.status = ngx.HTTP_CREATED
|
ngx.status = ngx.HTTP_CREATED
|
||||||
end
|
end
|
||||||
|
|
||||||
|
local function handle_general()
|
||||||
|
if ngx.var.request_method == "GET" then
|
||||||
|
ngx.status = ngx.HTTP_OK
|
||||||
|
ngx.print(_M.get_general_data())
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
if ngx.var.request_method ~= "POST" then
|
||||||
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
ngx.print("Only POST and GET requests are allowed!")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
local config = fetch_request_body()
|
||||||
|
|
||||||
|
local success, err = configuration_data:safe_set("general", config)
|
||||||
|
if not success then
|
||||||
|
ngx.status = ngx.HTTP_INTERNAL_SERVER_ERROR
|
||||||
|
ngx.log(ngx.ERR, "error setting general config: " .. tostring(err))
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
ngx.status = ngx.HTTP_CREATED
|
||||||
|
end
|
||||||
|
|
||||||
function _M.call()
|
function _M.call()
|
||||||
if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
|
if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
|
||||||
ngx.status = ngx.HTTP_BAD_REQUEST
|
ngx.status = ngx.HTTP_BAD_REQUEST
|
||||||
|
@ -92,6 +121,11 @@ function _M.call()
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if ngx.var.request_uri == "/configuration/general" then
|
||||||
|
handle_general()
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
if ngx.var.request_uri ~= "/configuration/backends" then
|
if ngx.var.request_uri ~= "/configuration/backends" then
|
||||||
ngx.status = ngx.HTTP_NOT_FOUND
|
ngx.status = ngx.HTTP_NOT_FOUND
|
||||||
ngx.print("Not found!")
|
ngx.print("Not found!")
|
||||||
|
|
|
@ -28,6 +28,16 @@ import (
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ExecIngressPod executes a command inside the first container in ingress controller running pod
|
||||||
|
func (f *Framework) ExecIngressPod(command string) (string, error) {
|
||||||
|
pod, err := getIngressNGINXPod(f.IngressController.Namespace, f.KubeClientSet)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.ExecCommand(pod, command)
|
||||||
|
}
|
||||||
|
|
||||||
// ExecCommand executes a command inside a the first container in a running pod
|
// ExecCommand executes a command inside a the first container in a running pod
|
||||||
func (f *Framework) ExecCommand(pod *v1.Pod, command string) (string, error) {
|
func (f *Framework) ExecCommand(pod *v1.Pod, command string) (string, error) {
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -149,6 +149,20 @@ var _ = framework.IngressNginxDescribe("Dynamic Configuration", func() {
|
||||||
})
|
})
|
||||||
Expect(nginxConfig).ShouldNot(Equal(newNginxConfig))
|
Expect(nginxConfig).ShouldNot(Equal(newNginxConfig))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("sets controllerPodsCount in Lua general configuration", func() {
|
||||||
|
output, err := f.ExecIngressPod("curl --fail --silent http://127.0.0.1:18080/configuration/general")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output).Should(Equal(`{"controllerPodsCount":1}`))
|
||||||
|
|
||||||
|
err = framework.UpdateDeployment(f.KubeClientSet, f.IngressController.Namespace, "nginx-ingress-controller", 3, nil)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
time.Sleep(waitForLuaSync)
|
||||||
|
|
||||||
|
output, err = f.ExecIngressPod("curl --fail --silent http://127.0.0.1:18080/configuration/general")
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(output).Should(Equal(`{"controllerPodsCount":3}`))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
func ensureIngress(f *framework.Framework, host string) *extensions.Ingress {
|
func ensureIngress(f *framework.Framework, host string) *extensions.Ingress {
|
||||||
|
|
Loading…
Reference in a new issue