Skip to content

Commit bc35946

Browse files
committed
Proposal for dynamic informer cache
This PR shows how Gatekeeper has forked controller runtime to support the dynamic addition/removal of informers. Happy to flesh this out if people are interested. Not sure what the correct licensing actions are for moving code across CNCF projects. Fix lint error Signed-off-by: Max Smythe <[email protected]> Move mergeChan; add functional opts Signed-off-by: Max Smythe <[email protected]> Refactor functional parameters Signed-off-by: Max Smythe <[email protected]> Fix tests; refactor options type visibility Signed-off-by: Max Smythe <[email protected]> More interface changes Signed-off-by: Max Smythe <[email protected]> Fix lint Signed-off-by: Max Smythe <[email protected]> Add ginkgo so test flags are recognized Signed-off-by: Max Smythe <[email protected]> Split out non-blocking-get options; block on cancel Signed-off-by: Max Smythe <[email protected]> Add tests for removing informers Signed-off-by: Max Smythe <[email protected]> Fix missing newline Signed-off-by: Max Smythe <[email protected]> Oops, no newline Signed-off-by: Max Smythe <[email protected]>
1 parent c20ea14 commit bc35946

File tree

8 files changed

+321
-6
lines changed

8 files changed

+321
-6
lines changed

pkg/cache/cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ type Informers interface {
8181
// of the underlying object.
8282
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
8383

84+
// RemoveInformer removes an informer entry and stops it if it was running.
85+
RemoveInformer(ctx context.Context, obj client.Object) error
86+
8487
// Start runs all the informers known to this cache until the context is closed.
8588
// It blocks.
8689
Start(ctx context.Context) error
@@ -119,6 +122,8 @@ type Informer interface {
119122

120123
// HasSynced return true if the informers underlying store has synced.
121124
HasSynced() bool
125+
// IsStopped returns true if the informer has been stopped.
126+
IsStopped() bool
122127
}
123128

124129
// Options are the optional arguments for creating a new Cache object.

pkg/cache/cache_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,6 +1758,42 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
17581758
By("verifying the object is received on the channel")
17591759
Eventually(out).Should(Receive(Equal(pod)))
17601760
})
1761+
It("should be able to stop and restart informers", func() {
1762+
By("getting a shared index informer for a pod")
1763+
pod := &corev1.Pod{
1764+
ObjectMeta: metav1.ObjectMeta{
1765+
Name: "informer-obj",
1766+
Namespace: "default",
1767+
},
1768+
Spec: corev1.PodSpec{
1769+
Containers: []corev1.Container{
1770+
{
1771+
Name: "nginx",
1772+
Image: "nginx",
1773+
},
1774+
},
1775+
},
1776+
}
1777+
sii, err := informerCache.GetInformer(context.TODO(), pod)
1778+
Expect(err).NotTo(HaveOccurred())
1779+
Expect(sii).NotTo(BeNil())
1780+
Expect(sii.HasSynced()).To(BeTrue())
1781+
1782+
By("removing the existing informer")
1783+
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
1784+
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
1785+
1786+
By("recreating the informer")
1787+
1788+
sii2, err := informerCache.GetInformer(context.TODO(), pod)
1789+
Expect(err).NotTo(HaveOccurred())
1790+
Expect(sii2).NotTo(BeNil())
1791+
Expect(sii2.HasSynced()).To(BeTrue())
1792+
1793+
By("validating the two informers are in different states")
1794+
Expect(sii.IsStopped()).To(BeTrue())
1795+
Expect(sii2.IsStopped()).To(BeFalse())
1796+
})
17611797
It("should be able to get an informer by group/version/kind", func() {
17621798
By("getting an shared index informer for gvk = core/v1/pod")
17631799
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -1942,6 +1978,48 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
19421978
Eventually(out).Should(Receive(Equal(pod)))
19431979
})
19441980

1981+
It("should be able to stop and restart informers", func() {
1982+
By("getting a shared index informer for a pod")
1983+
pod := &unstructured.Unstructured{
1984+
Object: map[string]interface{}{
1985+
"spec": map[string]interface{}{
1986+
"containers": []map[string]interface{}{
1987+
{
1988+
"name": "nginx",
1989+
"image": "nginx",
1990+
},
1991+
},
1992+
},
1993+
},
1994+
}
1995+
pod.SetName("informer-obj2")
1996+
pod.SetNamespace("default")
1997+
pod.SetGroupVersionKind(schema.GroupVersionKind{
1998+
Group: "",
1999+
Version: "v1",
2000+
Kind: "Pod",
2001+
})
2002+
sii, err := informerCache.GetInformer(context.TODO(), pod)
2003+
Expect(err).NotTo(HaveOccurred())
2004+
Expect(sii).NotTo(BeNil())
2005+
Expect(sii.HasSynced()).To(BeTrue())
2006+
2007+
By("removing the existing informer")
2008+
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
2009+
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
2010+
2011+
By("recreating the informer")
2012+
2013+
sii2, err := informerCache.GetInformer(context.TODO(), pod)
2014+
Expect(err).NotTo(HaveOccurred())
2015+
Expect(sii2).NotTo(BeNil())
2016+
Expect(sii2.HasSynced()).To(BeTrue())
2017+
2018+
By("validating the two informers are in different states")
2019+
Expect(sii.IsStopped()).To(BeTrue())
2020+
Expect(sii2.IsStopped()).To(BeFalse())
2021+
})
2022+
19452023
It("should be able to index an object field then retrieve objects by that field", func() {
19462024
By("creating the cache")
19472025
informer, err := cache.New(cfg, cache.Options{})

pkg/cache/informer_cache.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
190190
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
191191
}
192192

193+
// RemoveInformer deactivates and removes the informer from the cache.
194+
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
195+
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
196+
if err != nil {
197+
return err
198+
}
199+
200+
ic.Informers.Remove(gvk, obj)
201+
return nil
202+
}
203+
193204
// NeedLeaderElection implements the LeaderElectionRunnable interface
194205
// to indicate that this can be started without requiring the leader lock.
195206
func (ic *informerCache) NeedLeaderElection() bool {

pkg/cache/informertest/fake_cache.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts
7373
return c.informerFor(gvk, obj)
7474
}
7575

76+
// RemoveInformer implements Informers.
77+
func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) error {
78+
if c.Scheme == nil {
79+
c.Scheme = scheme.Scheme
80+
}
81+
gvks, _, err := c.Scheme.ObjectKinds(obj)
82+
if err != nil {
83+
return err
84+
}
85+
gvk := gvks[0]
86+
delete(c.InformersByGVK, gvk)
87+
return nil
88+
}
89+
7690
// WaitForCacheSync implements Informers.
7791
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
7892
if c.Synced == nil {

pkg/cache/internal/informers.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/client-go/rest"
3737
"k8s.io/client-go/tools/cache"
3838
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
39+
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
3940
)
4041

4142
// InformersOpts configures an InformerMap.
@@ -86,6 +87,20 @@ type Cache struct {
8687

8788
// CacheReader wraps Informer and implements the CacheReader interface for a single type
8889
Reader CacheReader
90+
91+
// Stop can be used to stop this individual informer.
92+
stop chan struct{}
93+
}
94+
95+
// Start starts the informer managed by a MapEntry.
96+
// Blocks until the informer stops. The informer can be stopped
97+
// either individually (via the entry's stop channel) or globally
98+
// via the provided stop argument.
99+
func (c *Cache) Start(stop <-chan struct{}) {
100+
// Stop on either the whole map stopping or just this informer being removed.
101+
internalStop, cancel := syncs.MergeChans(stop, c.stop)
102+
defer cancel()
103+
c.Informer.Run(internalStop)
89104
}
90105

91106
type tracker struct {
@@ -173,13 +188,13 @@ func (ip *Informers) Start(ctx context.Context) error {
173188

174189
// Start each informer
175190
for _, i := range ip.tracker.Structured {
176-
ip.startInformerLocked(i.Informer)
191+
ip.startInformerLocked(i)
177192
}
178193
for _, i := range ip.tracker.Unstructured {
179-
ip.startInformerLocked(i.Informer)
194+
ip.startInformerLocked(i)
180195
}
181196
for _, i := range ip.tracker.Metadata {
182-
ip.startInformerLocked(i.Informer)
197+
ip.startInformerLocked(i)
183198
}
184199

185200
// Set started to true so we immediately start any informers added later.
@@ -194,7 +209,7 @@ func (ip *Informers) Start(ctx context.Context) error {
194209
return nil
195210
}
196211

197-
func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
212+
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
198213
// Don't start the informer in case we are already waiting for the items in
199214
// the waitGroup to finish, since waitGroups don't support waiting and adding
200215
// at the same time.
@@ -205,7 +220,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
205220
ip.waitGroup.Add(1)
206221
go func() {
207222
defer ip.waitGroup.Done()
208-
informer.Run(ip.ctx.Done())
223+
cacheEntry.Start(ip.ctx.Done())
209224
}()
210225
}
211226

@@ -281,6 +296,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
281296
return started, i, nil
282297
}
283298

299+
// Remove removes an informer entry and stops it if it was running.
300+
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
301+
ip.mu.Lock()
302+
defer ip.mu.Unlock()
303+
304+
informerMap := ip.informersByType(obj)
305+
306+
entry, ok := informerMap[gvk]
307+
if !ok {
308+
return
309+
}
310+
close(entry.stop)
311+
delete(informerMap, gvk)
312+
}
313+
284314
func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
285315
switch obj.(type) {
286316
case runtime.Unstructured:
@@ -342,13 +372,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
342372
scopeName: mapping.Scope.Name(),
343373
disableDeepCopy: ip.unsafeDisableDeepCopy,
344374
},
375+
stop: make(chan struct{}),
345376
}
346377
ip.informersByType(obj)[gvk] = i
347378

348379
// Start the informer in case the InformersMap has started, otherwise it will be
349380
// started when the InformersMap starts.
350381
if ip.started {
351-
ip.startInformerLocked(i.Informer)
382+
ip.startInformerLocked(i)
352383
}
353384
return i, ip.started, nil
354385
}

pkg/cache/multi_namespace_cache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
108108
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
109109
}
110110

111+
func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
112+
// If the object is clusterscoped, get the informer from clusterCache,
113+
// if not use the namespaced caches.
114+
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
115+
if err != nil {
116+
return err
117+
}
118+
if !isNamespaced {
119+
return c.clusterCache.RemoveInformer(ctx, obj)
120+
}
121+
122+
for _, cache := range c.namespaceToCache {
123+
err := cache.RemoveInformer(ctx, obj)
124+
if err != nil {
125+
return err
126+
}
127+
}
128+
129+
return nil
130+
}
131+
111132
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
112133
// If the object is cluster scoped, get the informer from clusterCache,
113134
// if not use the namespaced caches.
@@ -387,3 +408,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
387408
}
388409
return true
389410
}
411+
412+
// IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
413+
func (i *multiNamespaceInformer) IsStopped() bool {
414+
for _, informer := range i.namespaceToInformer {
415+
if stopped := informer.IsStopped(); !stopped {
416+
return false
417+
}
418+
}
419+
return true
420+
}

pkg/internal/syncs/syncs.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package syncs
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"sync"
7+
)
8+
9+
// MergeChans returns a channel that is closed when any of the input channels are signaled.
10+
// The caller must call the returned CancelFunc to ensure no resources are leaked.
11+
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
12+
var once sync.Once
13+
out := make(chan T)
14+
cancel := make(chan T)
15+
cancelFunc := func() {
16+
once.Do(func() {
17+
close(cancel)
18+
})
19+
<-out
20+
}
21+
cases := make([]reflect.SelectCase, len(chans)+1)
22+
for i := range chans {
23+
cases[i] = reflect.SelectCase{
24+
Dir: reflect.SelectRecv,
25+
Chan: reflect.ValueOf(chans[i]),
26+
}
27+
}
28+
cases[len(cases)-1] = reflect.SelectCase{
29+
Dir: reflect.SelectRecv,
30+
Chan: reflect.ValueOf(cancel),
31+
}
32+
go func() {
33+
defer close(out)
34+
_, _, _ = reflect.Select(cases)
35+
}()
36+
37+
return out, cancelFunc
38+
}

0 commit comments

Comments
 (0)