Merge pull request #3455 from Shopify/controller-pod

Watch controller Pods and make then available in k8sStore
This commit is contained in:
k8s-ci-robot 2018-11-27 03:11:18 -08:00 committed by GitHub
commit 8299652747
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 205 additions and 7 deletions

View file

@ -56,6 +56,7 @@ import (
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/metric"
"k8s.io/ingress-nginx/internal/ingress/status"
"k8s.io/ingress-nginx/internal/k8s"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
"k8s.io/ingress-nginx/internal/net/ssl"
@ -110,6 +111,11 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
metricCollector: mc,
}
pod, err := k8s.GetPodDetails(config.Client)
if err != nil {
glog.Fatalf("unexpected error obtaining pod information: %v", err)
}
n.store = store.New(
config.EnableSSLChainCompletion,
config.Namespace,
@ -121,7 +127,8 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
config.Client,
fs,
n.updateCh,
config.DynamicCertificatesEnabled)
config.DynamicCertificatesEnabled,
pod)
n.syncQueue = task.NewTaskQueue(n.syncIngress)

View file

@ -0,0 +1,26 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"k8s.io/client-go/tools/cache"
)
// PodLister makes a Store that lists Pods.
type PodLister struct {
cache.Store
}

View file

@ -30,8 +30,11 @@ import (
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -76,6 +79,9 @@ type Storer interface {
// ListIngresses returns a list of all Ingresses in the store.
ListIngresses() []*ingress.Ingress
// ListControllerPods returns a list of ingress-nginx controller Pods.
ListControllerPods() []*corev1.Pod
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
@ -121,6 +127,7 @@ type Informer struct {
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
Pod cache.SharedIndexInformer
}
// Lister contains object listers (stores).
@ -131,6 +138,7 @@ type Lister struct {
Secret SecretLister
ConfigMap ConfigMapLister
IngressAnnotation IngressAnnotationsLister
Pod PodLister
}
// NotExistsError is returned when an object does not exist in a local store.
@ -147,6 +155,7 @@ func (i *Informer) Run(stopCh chan struct{}) {
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)
go i.Pod.Run(stopCh)
// wait for all involved caches to be synced before processing items
// from the queue
@ -211,6 +220,8 @@ type k8sStore struct {
defaultSSLCertificate string
isDynamicCertificatesEnabled bool
pod *k8s.PodInfo
}
// New creates a new object store to be used in the ingress controller
@ -220,7 +231,8 @@ func New(checkOCSP bool,
client clientset.Interface,
fs file.Filesystem,
updateCh *channels.RingChannel,
isDynamicCertificatesEnabled bool) Storer {
isDynamicCertificatesEnabled bool,
pod *k8s.PodInfo) Storer {
store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
@ -234,6 +246,7 @@ func New(checkOCSP bool,
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
isDynamicCertificatesEnabled: isDynamicCertificatesEnabled,
pod: pod,
}
eventBroadcaster := record.NewBroadcaster()
@ -270,6 +283,26 @@ func New(checkOCSP bool,
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
labelSelector := labels.SelectorFromSet(store.pod.Labels)
store.informers.Pod = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector.String()
return client.CoreV1().Pods(store.pod.Namespace).Watch(options)
},
},
&corev1.Pod{},
resyncPeriod,
cache.Indexers{},
)
store.listers.Pod.Store = store.informers.Pod.GetStore()
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
@ -512,11 +545,40 @@ func New(checkOCSP bool,
},
}
podEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldPod := old.(*corev1.Pod)
curPod := cur.(*corev1.Pod)
if oldPod.Status.Phase == curPod.Status.Phase {
return
}
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
DeleteFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
}
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})
store.informers.Pod.AddEventHandler(podEventHandler)
// do not wait for informers to read the configmap configuration
ns, name, _ := k8s.ParseNameNS(configmap)
@ -773,3 +835,20 @@ func (s k8sStore) Run(stopCh chan struct{}) {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}
}
// ListControllerPods returns a list of ingress-nginx controller Pods
func (s k8sStore) ListControllerPods() []*corev1.Pod {
var pods []*corev1.Pod
for _, i := range s.listers.Pod.List() {
pod := i.(*corev1.Pod)
if pod.Status.Phase != corev1.PodRunning {
continue
}
pods = append(pods, pod)
}
return pods
}

View file

@ -18,6 +18,7 @@ package store
import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
@ -38,10 +39,19 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-nginx/internal/file"
"k8s.io/ingress-nginx/internal/ingress/annotations/parser"
"k8s.io/ingress-nginx/internal/k8s"
"k8s.io/ingress-nginx/test/e2e/framework"
)
func TestStore(t *testing.T) {
pod := &k8s.PodInfo{
Name: "testpod",
Namespace: v1.NamespaceDefault,
Labels: map[string]string{
"pod-template-hash": "1234",
},
}
clientSet := fake.NewSimpleClientset()
t.Run("should return an error searching for non existing objects", func(t *testing.T) {
@ -70,7 +80,8 @@ func TestStore(t *testing.T) {
clientSet,
fs,
updateCh,
false)
false,
pod)
storer.Run(stopCh)
@ -158,7 +169,8 @@ func TestStore(t *testing.T) {
clientSet,
fs,
updateCh,
false)
false,
pod)
storer.Run(stopCh)
@ -306,7 +318,8 @@ func TestStore(t *testing.T) {
clientSet,
fs,
updateCh,
false)
false,
pod)
storer.Run(stopCh)
@ -395,7 +408,8 @@ func TestStore(t *testing.T) {
clientSet,
fs,
updateCh,
false)
false,
pod)
storer.Run(stopCh)
@ -507,7 +521,8 @@ func TestStore(t *testing.T) {
clientSet,
fs,
updateCh,
false)
false,
pod)
storer.Run(stopCh)
@ -727,17 +742,27 @@ func newStore(t *testing.T) *k8sStore {
t.Fatalf("error: %v", err)
}
pod := &k8s.PodInfo{
Name: "ingress-1",
Namespace: v1.NamespaceDefault,
Labels: map[string]string{
"pod-template-hash": "1234",
},
}
return &k8sStore{
listers: &Lister{
// add more listers if needed
Ingress: IngressLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
IngressAnnotation: IngressAnnotationsLister{cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)},
Pod: PodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
},
sslStore: NewSSLCertTracker(),
filesystem: fs,
updateCh: channels.NewRingChannel(10),
mu: new(sync.Mutex),
secretIngressMap: NewObjectRefMap(),
pod: pod,
}
}
@ -943,3 +968,64 @@ func TestWriteSSLSessionTicketKey(t *testing.T) {
}
}
}
func TestListControllerPods(t *testing.T) {
os.Setenv("POD_NAMESPACE", "testns")
os.Setenv("POD_NAME", "ingress-1")
s := newStore(t)
s.pod = &k8s.PodInfo{
Name: "ingress-1",
Namespace: "testns",
Labels: map[string]string{
"pod-template-hash": "1234",
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-1",
Namespace: "testns",
Labels: map[string]string{
"pod-template-hash": "1234",
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
s.listers.Pod.Add(pod)
pod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-2",
Namespace: "testns",
Labels: map[string]string{
"pod-template-hash": "1234",
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
s.listers.Pod.Add(pod)
pod = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "ingress-3",
Namespace: "testns",
Labels: map[string]string{
"pod-template-hash": "1234",
},
},
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
}
s.listers.Pod.Add(pod)
pods := s.ListControllerPods()
if s := len(pods); s != 2 {
t.Errorf("Expected 1 controller Pods but got %v", s)
}
}