Merge 1bdb178cb0
into de1a4c463c
This commit is contained in:
commit
2d8c563edb
3 changed files with 115 additions and 72 deletions
|
@ -18,51 +18,72 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
|
|
||||||
discoveryv1 "k8s.io/api/discovery/v1"
|
discoveryv1 "k8s.io/api/discovery/v1"
|
||||||
apiNames "k8s.io/apiserver/pkg/storage/names"
|
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type getEPSsForServiceFunc = func(key string) ([]*discoveryv1.EndpointSlice, error)
|
||||||
|
|
||||||
// EndpointSliceLister makes a Store that lists Endpoints.
|
// EndpointSliceLister makes a Store that lists Endpoints.
|
||||||
type EndpointSliceLister struct {
|
type EndpointSliceLister struct {
|
||||||
cache.Store
|
cache.Store
|
||||||
|
endpointSliceIndex getEPSsForServiceFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// MatchByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store.
|
// MatchByKey returns the EndpointsSlices of the Service matching key in the local Endpoint Store.
|
||||||
func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) {
|
func (s *EndpointSliceLister) MatchByKey(key string) ([]*discoveryv1.EndpointSlice, error) {
|
||||||
var eps []*discoveryv1.EndpointSlice
|
epss, err := s.endpointSliceIndex(key)
|
||||||
keyNsLen := strings.Index(key, "/")
|
if err != nil {
|
||||||
if keyNsLen < -1 {
|
return nil, err
|
||||||
keyNsLen = 0
|
|
||||||
} else {
|
|
||||||
// count '/' char
|
|
||||||
keyNsLen++
|
|
||||||
}
|
}
|
||||||
// filter endpointSlices owned by svc
|
|
||||||
for _, listKey := range s.ListKeys() {
|
if len(epss) == 0 {
|
||||||
if len(key) < (apiNames.MaxGeneratedNameLength+keyNsLen) && !strings.HasPrefix(listKey, key) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// generated endpointslices names has truncated svc name as prefix when svc name is too long, we compare only non truncated part
|
|
||||||
// https://github.com/kubernetes/ingress-nginx/issues/9240
|
|
||||||
if len(key) >= (apiNames.MaxGeneratedNameLength+keyNsLen) && !strings.HasPrefix(listKey, key[:apiNames.MaxGeneratedNameLength+keyNsLen-1]) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
epss, exists, err := s.GetByKey(listKey)
|
|
||||||
if exists && err == nil {
|
|
||||||
// check for svc owner label
|
|
||||||
if svcName, ok := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetLabels()[discoveryv1.LabelServiceName]; ok {
|
|
||||||
namespace := epss.(*discoveryv1.EndpointSlice).ObjectMeta.GetNamespace()
|
|
||||||
if key == fmt.Sprintf("%s/%s", namespace, svcName) {
|
|
||||||
eps = append(eps, epss.(*discoveryv1.EndpointSlice))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(eps) == 0 {
|
|
||||||
return nil, NotExistsError(key)
|
return nil, NotExistsError(key)
|
||||||
}
|
}
|
||||||
return eps, nil
|
return epss, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEPSsIndexer() cache.Indexers {
|
||||||
|
return cache.Indexers{
|
||||||
|
discoveryv1.LabelServiceName: func(obj interface{}) ([]string, error) {
|
||||||
|
eps, ok := obj.(*discoveryv1.EndpointSlice)
|
||||||
|
if !ok {
|
||||||
|
// Skip object as it is not an endpointslice
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
parentService, ok := eps.Labels[discoveryv1.LabelServiceName]
|
||||||
|
if !ok {
|
||||||
|
// There is no parent service and thus we cannot match this endpointslice to any service
|
||||||
|
// As far as i'm aware, this is only possible if you create epps objects by hand
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s/%s", eps.Namespace, parentService)
|
||||||
|
|
||||||
|
return []string{key}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEPSsForServiceFuncFromIndexer(indexer cache.Indexer) getEPSsForServiceFunc {
|
||||||
|
return func(key string) ([]*discoveryv1.EndpointSlice, error) {
|
||||||
|
objs, err := indexer.ByIndex(discoveryv1.LabelServiceName, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
epss := make([]*discoveryv1.EndpointSlice, 0, len(objs))
|
||||||
|
for _, obj := range objs {
|
||||||
|
eps, ok := obj.(*discoveryv1.EndpointSlice)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
epss = append(epss, eps)
|
||||||
|
}
|
||||||
|
|
||||||
|
return epss, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,15 +25,21 @@ import (
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newEndpointSliceLister(t *testing.T) *EndpointSliceLister {
|
func newEndpointSliceLister(t *testing.T) (*EndpointSliceLister, cache.Indexer) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
return &EndpointSliceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, newEPSsIndexer())
|
||||||
|
|
||||||
|
return &EndpointSliceLister{
|
||||||
|
Store: store,
|
||||||
|
endpointSliceIndex: getEPSsForServiceFuncFromIndexer(indexer),
|
||||||
|
}, indexer
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEndpointSliceLister(t *testing.T) {
|
func TestEndpointSliceLister(t *testing.T) {
|
||||||
t.Run("the key does not exist", func(t *testing.T) {
|
t.Run("the key does not exist", func(t *testing.T) {
|
||||||
el := newEndpointSliceLister(t)
|
el, _ := newEndpointSliceLister(t)
|
||||||
|
|
||||||
key := "namespace/svcname"
|
key := "namespace/svcname"
|
||||||
_, err := el.MatchByKey(key)
|
_, err := el.MatchByKey(key)
|
||||||
|
@ -47,10 +53,11 @@ func TestEndpointSliceLister(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("the key exists", func(t *testing.T) {
|
t.Run("the key exists", func(t *testing.T) {
|
||||||
el := newEndpointSliceLister(t)
|
el, indexer := newEndpointSliceLister(t)
|
||||||
|
|
||||||
key := "namespace/svcname"
|
key := "namespace/svcname"
|
||||||
endpointSlice := &discoveryv1.EndpointSlice{
|
epss := []*discoveryv1.EndpointSlice{
|
||||||
|
{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "namespace",
|
Namespace: "namespace",
|
||||||
Name: "anothername-foo",
|
Name: "anothername-foo",
|
||||||
|
@ -58,11 +65,8 @@ func TestEndpointSliceLister(t *testing.T) {
|
||||||
discoveryv1.LabelServiceName: "svcname",
|
discoveryv1.LabelServiceName: "svcname",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
},
|
||||||
if err := el.Add(endpointSlice); err != nil {
|
{
|
||||||
t.Errorf("unexpected error %v", err)
|
|
||||||
}
|
|
||||||
endpointSlice = &discoveryv1.EndpointSlice{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "namespace",
|
Namespace: "namespace",
|
||||||
Name: "svcname-bar",
|
Name: "svcname-bar",
|
||||||
|
@ -70,35 +74,39 @@ func TestEndpointSliceLister(t *testing.T) {
|
||||||
discoveryv1.LabelServiceName: "othersvc",
|
discoveryv1.LabelServiceName: "othersvc",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
},
|
||||||
if err := el.Add(endpointSlice); err != nil {
|
{
|
||||||
t.Errorf("unexpected error %v", err)
|
|
||||||
}
|
|
||||||
endpointSlice = &discoveryv1.EndpointSlice{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: "namespace",
|
Namespace: "namespace",
|
||||||
Name: "svcname-buz",
|
Name: "svcname-buz",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
discoveryv1.LabelServiceName: "svcname",
|
discoveryv1.LabelServiceName: "svcname2",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := el.Add(endpointSlice); err != nil {
|
for _, eps := range epss {
|
||||||
|
if err := el.Add(eps); err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
if err := indexer.Add(eps); err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
eps, err := el.MatchByKey(key)
|
eps, err := el.MatchByKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
if err == nil && len(eps) != 1 {
|
if err == nil && len(eps) != 1 {
|
||||||
t.Errorf("expected one slice %v, error, got %d slices", endpointSlice, len(eps))
|
t.Errorf("expected one slice %v, error, got %d slices, keys stored in indexer: %v, eps returned by storer: %v", epss[0], len(eps), indexer.ListKeys(), eps)
|
||||||
}
|
}
|
||||||
if len(eps) > 0 && eps[0].GetName() != endpointSlice.GetName() {
|
if len(eps) > 0 && eps[0].GetName() != epss[0].GetName() {
|
||||||
t.Errorf("expected %v, error, got %v", endpointSlice.GetName(), eps[0].GetName())
|
t.Errorf("expected %v, error, got %v", epss[0].GetName(), eps[0].GetName())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("svc long name", func(t *testing.T) {
|
t.Run("svc long name", func(t *testing.T) {
|
||||||
el := newEndpointSliceLister(t)
|
el, indexer := newEndpointSliceLister(t)
|
||||||
ns := "namespace"
|
ns := "namespace"
|
||||||
ns2 := "another-ns"
|
ns2 := "another-ns"
|
||||||
svcName := "test-backend-http-test-http-test-http-test-http-test-http-truncated"
|
svcName := "test-backend-http-test-http-test-http-test-http-test-http-truncated"
|
||||||
|
@ -116,6 +124,9 @@ func TestEndpointSliceLister(t *testing.T) {
|
||||||
if err := el.Add(endpointSlice); err != nil {
|
if err := el.Add(endpointSlice); err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
if err := indexer.Add(endpointSlice); err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
endpointSlice2 := &discoveryv1.EndpointSlice{
|
endpointSlice2 := &discoveryv1.EndpointSlice{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Namespace: ns2,
|
Namespace: ns2,
|
||||||
|
@ -128,6 +139,9 @@ func TestEndpointSliceLister(t *testing.T) {
|
||||||
if err := el.Add(endpointSlice2); err != nil {
|
if err := el.Add(endpointSlice2); err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
if err := indexer.Add(endpointSlice); err != nil {
|
||||||
|
t.Errorf("unexpected error %v", err)
|
||||||
|
}
|
||||||
eps, err := el.MatchByKey(key)
|
eps, err := el.MatchByKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
|
|
|
@ -341,7 +341,15 @@ func New(
|
||||||
}
|
}
|
||||||
|
|
||||||
store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer()
|
store.informers.EndpointSlice = infFactory.Discovery().V1().EndpointSlices().Informer()
|
||||||
|
// Add new endpointslices indexer to markup epps upfront for fast pinpoint retrieval
|
||||||
|
if err := store.informers.EndpointSlice.AddIndexers(newEPSsIndexer()); err != nil {
|
||||||
|
// This error only occurs due to errors in code, this panic is not possible in runtime
|
||||||
|
// if the underlying code is correct. Typically, this error signals conflicts in indexer
|
||||||
|
panic(fmt.Sprintf("failed to add new index for endpointslices: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore()
|
store.listers.EndpointSlice.Store = store.informers.EndpointSlice.GetStore()
|
||||||
|
store.listers.EndpointSlice.endpointSliceIndex = getEPSsForServiceFuncFromIndexer(store.informers.EndpointSlice.GetIndexer())
|
||||||
|
|
||||||
store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
|
store.informers.Secret = infFactorySecrets.Core().V1().Secrets().Informer()
|
||||||
store.listers.Secret.Store = store.informers.Secret.GetStore()
|
store.listers.Secret.Store = store.informers.Secret.GetStore()
|
||||||
|
|
Loading…
Reference in a new issue