Skip to content

Commit f30e11d

Browse files
pkg/{cache,client}: add options for cache miss policy (#2406)
This commit allows users to opt out of the "start informers in the background" behavior that the current cache implementation uses. Additionally, when opting out of this behavior, the client can be configured to do a live lookup on a cache miss. The default behaviors are: pkg/cache: backfill data on a miss (today's default, unchanged) pkg/client: live lookup when cache is configured to miss Signed-off-by: Steve Kuznetsov <[email protected]>
1 parent d781099 commit f30e11d

File tree

8 files changed

+176
-7
lines changed

8 files changed

+176
-7
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ generate: $(CONTROLLER_GEN) ## Runs controller-gen for internal types for config
112112

113113
.PHONY: clean
114114
clean: ## Cleanup.
115+
$(GOLANGCI_LINT) cache clean
115116
$(MAKE) clean-bin
116117

117118
.PHONY: clean-bin

pkg/cache/cache.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,15 @@ type Options struct {
145145
// instead of `reconcile.Result{}`.
146146
SyncPeriod *time.Duration
147147

148+
// ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user
149+
// requests, using Get() and List(), a resource the cache does not already have an informer for.
150+
//
151+
// This error is distinct from an errors.NotFound.
152+
//
153+
// Defaults to false, which means that the cache will start a new informer
154+
// for every new requested resource.
155+
ReaderFailOnMissingInformer bool
156+
148157
// DefaultNamespaces maps namespace names to cache configs. If set, only
149158
// the namespaces in here will be watched and it will by used to default
150159
// ByObject.Namespaces for all objects if that is nil.
@@ -329,6 +338,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
329338
Transform: config.Transform,
330339
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
331340
}),
341+
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
332342
}
333343
}
334344
}

pkg/cache/cache_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache_test
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"reflect"
2324
"sort"
@@ -117,6 +118,11 @@ func deletePod(pod client.Object) {
117118
var _ = Describe("Informer Cache", func() {
118119
CacheTest(cache.New, cache.Options{})
119120
})
121+
122+
var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() {
123+
CacheTestReaderFailOnMissingInformer(cache.New, cache.Options{ReaderFailOnMissingInformer: true})
124+
})
125+
120126
var _ = Describe("Multi-Namespace Informer Cache", func() {
121127
CacheTest(cache.New, cache.Options{
122128
DefaultNamespaces: map[string]cache.Config{
@@ -422,6 +428,85 @@ var _ = Describe("Cache with selectors", func() {
422428
})
423429
})
424430

431+
func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
432+
Describe("Cache test with ReaderFailOnMissingInformer = true", func() {
433+
var (
434+
informerCache cache.Cache
435+
informerCacheCtx context.Context
436+
informerCacheCancel context.CancelFunc
437+
errNotCached *cache.ErrResourceNotCached
438+
)
439+
440+
BeforeEach(func() {
441+
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
442+
Expect(cfg).NotTo(BeNil())
443+
444+
By("creating the informer cache")
445+
var err error
446+
informerCache, err = createCacheFunc(cfg, opts)
447+
Expect(err).NotTo(HaveOccurred())
448+
By("running the cache and waiting for it to sync")
449+
// pass as an arg so that we don't race between close and re-assign
450+
go func(ctx context.Context) {
451+
defer GinkgoRecover()
452+
Expect(informerCache.Start(ctx)).To(Succeed())
453+
}(informerCacheCtx)
454+
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
455+
})
456+
457+
AfterEach(func() {
458+
informerCacheCancel()
459+
})
460+
461+
Describe("as a Reader", func() {
462+
Context("with structured objects", func() {
463+
It("should not be able to list objects that haven't been watched previously", func() {
464+
By("listing all services in the cluster")
465+
listObj := &corev1.ServiceList{}
466+
Expect(errors.As(informerCache.List(context.Background(), listObj), &errNotCached)).To(BeTrue())
467+
})
468+
469+
It("should not be able to get objects that haven't been watched previously", func() {
470+
By("getting the Kubernetes service")
471+
svc := &corev1.Service{}
472+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
473+
Expect(errors.As(informerCache.Get(context.Background(), svcKey, svc), &errNotCached)).To(BeTrue())
474+
})
475+
476+
It("should be able to list objects that are configured to be watched", func() {
477+
By("indicating that we need to watch services")
478+
_, err := informerCache.GetInformer(context.Background(), &corev1.Service{})
479+
Expect(err).ToNot(HaveOccurred())
480+
481+
By("listing all services in the cluster")
482+
svcList := &corev1.ServiceList{}
483+
Expect(informerCache.List(context.Background(), svcList)).To(Succeed())
484+
485+
By("verifying that the returned service looks reasonable")
486+
Expect(svcList.Items).To(HaveLen(1))
487+
Expect(svcList.Items[0].Name).To(Equal("kubernetes"))
488+
Expect(svcList.Items[0].Namespace).To(Equal("default"))
489+
})
490+
491+
It("should be able to get objects that are configured to be watched", func() {
492+
By("indicating that we need to watch services")
493+
_, err := informerCache.GetInformer(context.Background(), &corev1.Service{})
494+
Expect(err).ToNot(HaveOccurred())
495+
496+
By("getting the Kubernetes service")
497+
svc := &corev1.Service{}
498+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
499+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
500+
501+
By("verifying that the returned service looks reasonable")
502+
Expect(svc.Name).To(Equal("kubernetes"))
503+
Expect(svc.Namespace).To(Equal("default"))
504+
})
505+
})
506+
})
507+
})
508+
}
509+
425510
func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
426511
Describe("Cache test", func() {
427512
var (

pkg/cache/informer_cache.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,28 @@ func (*ErrCacheNotStarted) Error() string {
4646
return "the cache is not started, can not read objects"
4747
}
4848

49+
var _ error = (*ErrCacheNotStarted)(nil)
50+
51+
// ErrResourceNotCached indicates that the resource type
52+
// the client asked the cache for is not cached, i.e. the
53+
// corresponding informer does not exist yet.
54+
type ErrResourceNotCached struct {
55+
GVK schema.GroupVersionKind
56+
}
57+
58+
// Error returns the error
59+
func (r ErrResourceNotCached) Error() string {
60+
return fmt.Sprintf("%s is not cached", r.GVK.String())
61+
}
62+
63+
var _ error = (*ErrResourceNotCached)(nil)
64+
4965
// informerCache is a Kubernetes Object cache populated from internal.Informers.
5066
// informerCache wraps internal.Informers.
5167
type informerCache struct {
5268
scheme *runtime.Scheme
5369
*internal.Informers
70+
readerFailOnMissingInformer bool
5471
}
5572

5673
// Get implements Reader.
@@ -60,7 +77,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
6077
return err
6178
}
6279

63-
started, cache, err := ic.Informers.Get(ctx, gvk, out)
80+
started, cache, err := ic.getInformerForKind(ctx, gvk, out)
6481
if err != nil {
6582
return err
6683
}
@@ -78,7 +95,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
7895
return err
7996
}
8097

81-
started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
98+
started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj)
8299
if err != nil {
83100
return err
84101
}
@@ -124,7 +141,7 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
124141
return &gvk, cacheTypeObj, nil
125142
}
126143

127-
// GetInformerForKind returns the informer for the GroupVersionKind.
144+
// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
128145
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
129146
// Map the gvk to an object
130147
obj, err := ic.scheme.New(gvk)
@@ -139,7 +156,7 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
139156
return i.Informer, nil
140157
}
141158

142-
// GetInformer returns the informer for the obj.
159+
// GetInformer returns the informer for the obj. If no informer exists, one will be started.
143160
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
144161
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
145162
if err != nil {
@@ -153,6 +170,18 @@ func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
153170
return i.Informer, nil
154171
}
155172

173+
func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) {
174+
if ic.readerFailOnMissingInformer {
175+
cache, started, ok := ic.Informers.Peek(gvk, obj)
176+
if !ok {
177+
return false, nil, &ErrResourceNotCached{GVK: gvk}
178+
}
179+
return started, cache, nil
180+
}
181+
182+
return ic.Informers.Get(ctx, gvk, obj)
183+
}
184+
156185
// NeedLeaderElection implements the LeaderElectionRunnable interface
157186
// to indicate that this can be started without requiring the leader lock.
158187
func (ic *informerCache) NeedLeaderElection() bool {

pkg/cache/internal/informers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
230230
return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
231231
}
232232

233-
func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
233+
// Peek attempts to get the informer for the GVK, but does not start one if one does not exist.
234+
func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
234235
ip.mu.RLock()
235236
defer ip.mu.RUnlock()
236237
i, ok := ip.informersByType(obj)[gvk]
@@ -241,7 +242,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *
241242
// the Informer from the map.
242243
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
243244
// Return the informer if it is found
244-
i, started, ok := ip.get(gvk, obj)
245+
i, started, ok := ip.Peek(gvk, obj)
245246
if !ok {
246247
var err error
247248
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {

pkg/client/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,12 @@ type CacheOptions struct {
7777
// Reader is a cache-backed reader that will be used to read objects from the cache.
7878
// +required
7979
Reader Reader
80-
// DisableFor is a list of objects that should not be read from the cache.
80+
// DisableFor is a list of objects that should never be read from the cache.
81+
// Objects configured here always result in a live lookup.
8182
DisableFor []Object
8283
// Unstructured is a flag that indicates whether the cache-backed client should
8384
// read unstructured objects or lists from the cache.
85+
// If false, unstructured objects will always result in a live lookup.
8486
Unstructured bool
8587
}
8688

@@ -342,9 +344,11 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
342344
if isUncached, err := c.shouldBypassCache(obj); err != nil {
343345
return err
344346
} else if !isUncached {
347+
// Attempt to get from the cache.
345348
return c.cache.Get(ctx, key, obj, opts...)
346349
}
347350

351+
// Perform a live lookup.
348352
switch obj.(type) {
349353
case runtime.Unstructured:
350354
return c.unstructuredClient.Get(ctx, key, obj, opts...)
@@ -362,9 +366,11 @@ func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) e
362366
if isUncached, err := c.shouldBypassCache(obj); err != nil {
363367
return err
364368
} else if !isUncached {
369+
// Attempt to get from the cache.
365370
return c.cache.List(ctx, obj, opts...)
366371
}
367372

373+
// Perform a live lookup.
368374
switch x := obj.(type) {
369375
case runtime.Unstructured:
370376
return c.unstructuredClient.List(ctx, obj, opts...)

pkg/client/client_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package client_test
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"reflect"
2425
"sync/atomic"
@@ -43,6 +44,7 @@ import (
4344
"k8s.io/utils/pointer"
4445

4546
"sigs.k8s.io/controller-runtime/examples/crd/pkg"
47+
"sigs.k8s.io/controller-runtime/pkg/cache"
4648
"sigs.k8s.io/controller-runtime/pkg/client"
4749
)
4850

@@ -143,6 +145,7 @@ var _ = Describe("Client", func() {
143145
var count uint64 = 0
144146
var replicaCount int32 = 2
145147
var ns = "default"
148+
var errNotCached *cache.ErrResourceNotCached
146149
ctx := context.TODO()
147150

148151
BeforeEach(func() {
@@ -278,6 +281,16 @@ U5wwSivyi7vmegHKmblOzNVKA5qPO8zWzqBC
278281
Expect(cache.Called).To(Equal(2))
279282
})
280283

284+
It("should propagate ErrResourceNotCached errors", func() {
285+
c := &fakeUncachedReader{}
286+
cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: c}})
287+
Expect(err).NotTo(HaveOccurred())
288+
Expect(cl).NotTo(BeNil())
289+
Expect(errors.As(cl.Get(ctx, client.ObjectKey{Name: "test"}, &appsv1.Deployment{}), &errNotCached)).To(BeTrue())
290+
Expect(errors.As(cl.List(ctx, &appsv1.DeploymentList{}), &errNotCached)).To(BeTrue())
291+
Expect(c.Called).To(Equal(2))
292+
})
293+
281294
It("should not use the provided reader cache if provided, on get and list for uncached GVKs", func() {
282295
cache := &fakeReader{}
283296
cl, err := client.New(cfg, client.Options{Cache: &client.CacheOptions{Reader: cache, DisableFor: []client.Object{&corev1.Namespace{}}}})
@@ -3938,6 +3951,20 @@ func (f *fakeReader) List(ctx context.Context, list client.ObjectList, opts ...c
39383951
return nil
39393952
}
39403953

3954+
type fakeUncachedReader struct {
3955+
Called int
3956+
}
3957+
3958+
func (f *fakeUncachedReader) Get(_ context.Context, _ client.ObjectKey, _ client.Object, opts ...client.GetOption) error {
3959+
f.Called++
3960+
return &cache.ErrResourceNotCached{}
3961+
}
3962+
3963+
func (f *fakeUncachedReader) List(_ context.Context, _ client.ObjectList, _ ...client.ListOption) error {
3964+
f.Called++
3965+
return &cache.ErrResourceNotCached{}
3966+
}
3967+
39413968
func ptr[T any](to T) *T {
39423969
return &to
39433970
}

pkg/manager/manager_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,16 @@ var _ = Describe("manger.Manager", func() {
600600
cancel()
601601
})
602602

603+
It("should be able to create a manager with a cache that fails on missing informer", func() {
604+
m, err := New(cfg, Options{
605+
Cache: cache.Options{
606+
ReaderFailOnMissingInformer: true,
607+
},
608+
})
609+
Expect(m).ToNot(BeNil())
610+
Expect(err).ToNot(HaveOccurred())
611+
})
612+
603613
It("should return an error if the metrics bind address is already in use", func() {
604614
ln, err := net.Listen("tcp", ":0") //nolint:gosec
605615
Expect(err).ShouldNot(HaveOccurred())

0 commit comments

Comments
 (0)