Skip to content

Commit cc2a25b

Browse files
authored
Merge pull request #2285 from maxsmythe/dynamic-informer-cache
🌱 Proposal for dynamic informer cache
2 parents 15d7928 + f4dbd14 commit cc2a25b

File tree

9 files changed

+329
-6
lines changed

9 files changed

+329
-6
lines changed

pkg/cache/cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type Informers interface {
8383
// of the underlying object.
8484
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
8585

86+
// RemoveInformer removes an informer entry and stops it if it was running.
87+
RemoveInformer(ctx context.Context, obj client.Object) error
88+
8689
// Start runs all the informers known to this cache until the context is closed.
8790
// It blocks.
8891
Start(ctx context.Context) error
@@ -121,6 +124,8 @@ type Informer interface {
121124

122125
// HasSynced return true if the informers underlying store has synced.
123126
HasSynced() bool
127+
// IsStopped returns true if the informer has been stopped.
128+
IsStopped() bool
124129
}
125130

126131
// AllNamespaces should be used as the map key to deliminate namespace settings

pkg/cache/cache_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1887,6 +1887,42 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
18871887
By("verifying the object is received on the channel")
18881888
Eventually(out).Should(Receive(Equal(pod)))
18891889
})
1890+
It("should be able to stop and restart informers", func() {
1891+
By("getting a shared index informer for a pod")
1892+
pod := &corev1.Pod{
1893+
ObjectMeta: metav1.ObjectMeta{
1894+
Name: "informer-obj",
1895+
Namespace: "default",
1896+
},
1897+
Spec: corev1.PodSpec{
1898+
Containers: []corev1.Container{
1899+
{
1900+
Name: "nginx",
1901+
Image: "nginx",
1902+
},
1903+
},
1904+
},
1905+
}
1906+
sii, err := informerCache.GetInformer(context.TODO(), pod)
1907+
Expect(err).NotTo(HaveOccurred())
1908+
Expect(sii).NotTo(BeNil())
1909+
Expect(sii.HasSynced()).To(BeTrue())
1910+
1911+
By("removing the existing informer")
1912+
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
1913+
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
1914+
1915+
By("recreating the informer")
1916+
1917+
sii2, err := informerCache.GetInformer(context.TODO(), pod)
1918+
Expect(err).NotTo(HaveOccurred())
1919+
Expect(sii2).NotTo(BeNil())
1920+
Expect(sii2.HasSynced()).To(BeTrue())
1921+
1922+
By("validating the two informers are in different states")
1923+
Expect(sii.IsStopped()).To(BeTrue())
1924+
Expect(sii2.IsStopped()).To(BeFalse())
1925+
})
18901926
It("should be able to get an informer by group/version/kind", func() {
18911927
By("getting an shared index informer for gvk = core/v1/pod")
18921928
gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
@@ -2116,6 +2152,48 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
21162152
Eventually(out).Should(Receive(Equal(pod)))
21172153
})
21182154

2155+
It("should be able to stop and restart informers", func() {
2156+
By("getting a shared index informer for a pod")
2157+
pod := &unstructured.Unstructured{
2158+
Object: map[string]interface{}{
2159+
"spec": map[string]interface{}{
2160+
"containers": []map[string]interface{}{
2161+
{
2162+
"name": "nginx",
2163+
"image": "nginx",
2164+
},
2165+
},
2166+
},
2167+
},
2168+
}
2169+
pod.SetName("informer-obj2")
2170+
pod.SetNamespace("default")
2171+
pod.SetGroupVersionKind(schema.GroupVersionKind{
2172+
Group: "",
2173+
Version: "v1",
2174+
Kind: "Pod",
2175+
})
2176+
sii, err := informerCache.GetInformer(context.TODO(), pod)
2177+
Expect(err).NotTo(HaveOccurred())
2178+
Expect(sii).NotTo(BeNil())
2179+
Expect(sii.HasSynced()).To(BeTrue())
2180+
2181+
By("removing the existing informer")
2182+
Expect(informerCache.RemoveInformer(context.TODO(), pod)).To(Succeed())
2183+
Eventually(sii.IsStopped).WithTimeout(5 * time.Second).Should(BeTrue())
2184+
2185+
By("recreating the informer")
2186+
2187+
sii2, err := informerCache.GetInformer(context.TODO(), pod)
2188+
Expect(err).NotTo(HaveOccurred())
2189+
Expect(sii2).NotTo(BeNil())
2190+
Expect(sii2.HasSynced()).To(BeTrue())
2191+
2192+
By("validating the two informers are in different states")
2193+
Expect(sii.IsStopped()).To(BeTrue())
2194+
Expect(sii2.IsStopped()).To(BeFalse())
2195+
})
2196+
21192197
It("should be able to index an object field then retrieve objects by that field", func() {
21202198
By("creating the cache")
21212199
informer, err := cache.New(cfg, cache.Options{})

pkg/cache/delegating_by_gvk_cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
5252
return cache.List(ctx, list, opts...)
5353
}
5454

55+
func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
56+
cache, err := dbt.cacheForObject(obj)
57+
if err != nil {
58+
return err
59+
}
60+
return cache.RemoveInformer(ctx, obj)
61+
}
62+
5563
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
5664
cache, err := dbt.cacheForObject(obj)
5765
if err != nil {

pkg/cache/informer_cache.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
190190
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
191191
}
192192

193+
// RemoveInformer deactivates and removes the informer from the cache.
194+
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
195+
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
196+
if err != nil {
197+
return err
198+
}
199+
200+
ic.Informers.Remove(gvk, obj)
201+
return nil
202+
}
203+
193204
// NeedLeaderElection implements the LeaderElectionRunnable interface
194205
// to indicate that this can be started without requiring the leader lock.
195206
func (ic *informerCache) NeedLeaderElection() bool {

pkg/cache/informertest/fake_cache.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts
7373
return c.informerFor(gvk, obj)
7474
}
7575

76+
// RemoveInformer implements Informers.
77+
func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) error {
78+
if c.Scheme == nil {
79+
c.Scheme = scheme.Scheme
80+
}
81+
gvks, _, err := c.Scheme.ObjectKinds(obj)
82+
if err != nil {
83+
return err
84+
}
85+
gvk := gvks[0]
86+
delete(c.InformersByGVK, gvk)
87+
return nil
88+
}
89+
7690
// WaitForCacheSync implements Informers.
7791
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
7892
if c.Synced == nil {

pkg/cache/internal/informers.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/client-go/rest"
3737
"k8s.io/client-go/tools/cache"
3838
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
39+
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
3940
)
4041

4142
// InformersOpts configures an InformerMap.
@@ -88,6 +89,20 @@ type Cache struct {
8889

8990
// CacheReader wraps Informer and implements the CacheReader interface for a single type
9091
Reader CacheReader
92+
93+
// Stop can be used to stop this individual informer.
94+
stop chan struct{}
95+
}
96+
97+
// Start starts the informer managed by a MapEntry.
98+
// Blocks until the informer stops. The informer can be stopped
99+
// either individually (via the entry's stop channel) or globally
100+
// via the provided stop argument.
101+
func (c *Cache) Start(stop <-chan struct{}) {
102+
// Stop on either the whole map stopping or just this informer being removed.
103+
internalStop, cancel := syncs.MergeChans(stop, c.stop)
104+
defer cancel()
105+
c.Informer.Run(internalStop)
91106
}
92107

93108
type tracker struct {
@@ -180,13 +195,13 @@ func (ip *Informers) Start(ctx context.Context) error {
180195

181196
// Start each informer
182197
for _, i := range ip.tracker.Structured {
183-
ip.startInformerLocked(i.Informer)
198+
ip.startInformerLocked(i)
184199
}
185200
for _, i := range ip.tracker.Unstructured {
186-
ip.startInformerLocked(i.Informer)
201+
ip.startInformerLocked(i)
187202
}
188203
for _, i := range ip.tracker.Metadata {
189-
ip.startInformerLocked(i.Informer)
204+
ip.startInformerLocked(i)
190205
}
191206

192207
// Set started to true so we immediately start any informers added later.
@@ -201,7 +216,7 @@ func (ip *Informers) Start(ctx context.Context) error {
201216
return nil
202217
}
203218

204-
func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
219+
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
205220
// Don't start the informer in case we are already waiting for the items in
206221
// the waitGroup to finish, since waitGroups don't support waiting and adding
207222
// at the same time.
@@ -212,7 +227,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
212227
ip.waitGroup.Add(1)
213228
go func() {
214229
defer ip.waitGroup.Done()
215-
informer.Run(ip.ctx.Done())
230+
cacheEntry.Start(ip.ctx.Done())
216231
}()
217232
}
218233

@@ -288,6 +303,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
288303
return started, i, nil
289304
}
290305

306+
// Remove removes an informer entry and stops it if it was running.
307+
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
308+
ip.mu.Lock()
309+
defer ip.mu.Unlock()
310+
311+
informerMap := ip.informersByType(obj)
312+
313+
entry, ok := informerMap[gvk]
314+
if !ok {
315+
return
316+
}
317+
close(entry.stop)
318+
delete(informerMap, gvk)
319+
}
320+
291321
func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
292322
switch obj.(type) {
293323
case runtime.Unstructured:
@@ -356,13 +386,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
356386
scopeName: mapping.Scope.Name(),
357387
disableDeepCopy: ip.unsafeDisableDeepCopy,
358388
},
389+
stop: make(chan struct{}),
359390
}
360391
ip.informersByType(obj)[gvk] = i
361392

362393
// Start the informer in case the InformersMap has started, otherwise it will be
363394
// started when the InformersMap starts.
364395
if ip.started {
365-
ip.startInformerLocked(i.Informer)
396+
ip.startInformerLocked(i)
366397
}
367398
return i, ip.started, nil
368399
}

pkg/cache/multi_namespace_cache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
109109
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
110110
}
111111

112+
func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
113+
// If the object is clusterscoped, get the informer from clusterCache,
114+
// if not use the namespaced caches.
115+
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
116+
if err != nil {
117+
return err
118+
}
119+
if !isNamespaced {
120+
return c.clusterCache.RemoveInformer(ctx, obj)
121+
}
122+
123+
for _, cache := range c.namespaceToCache {
124+
err := cache.RemoveInformer(ctx, obj)
125+
if err != nil {
126+
return err
127+
}
128+
}
129+
130+
return nil
131+
}
132+
112133
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
113134
// If the object is cluster scoped, get the informer from clusterCache,
114135
// if not use the namespaced caches.
@@ -391,3 +412,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
391412
}
392413
return true
393414
}
415+
416+
// IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
417+
func (i *multiNamespaceInformer) IsStopped() bool {
418+
for _, informer := range i.namespaceToInformer {
419+
if stopped := informer.IsStopped(); !stopped {
420+
return false
421+
}
422+
}
423+
return true
424+
}

pkg/internal/syncs/syncs.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package syncs
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"sync"
7+
)
8+
9+
// MergeChans returns a channel that is closed when any of the input channels are signaled.
10+
// The caller must call the returned CancelFunc to ensure no resources are leaked.
11+
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
12+
var once sync.Once
13+
out := make(chan T)
14+
cancel := make(chan T)
15+
cancelFunc := func() {
16+
once.Do(func() {
17+
close(cancel)
18+
})
19+
<-out
20+
}
21+
cases := make([]reflect.SelectCase, len(chans)+1)
22+
for i := range chans {
23+
cases[i] = reflect.SelectCase{
24+
Dir: reflect.SelectRecv,
25+
Chan: reflect.ValueOf(chans[i]),
26+
}
27+
}
28+
cases[len(cases)-1] = reflect.SelectCase{
29+
Dir: reflect.SelectRecv,
30+
Chan: reflect.ValueOf(cancel),
31+
}
32+
go func() {
33+
defer close(out)
34+
_, _, _ = reflect.Select(cases)
35+
}()
36+
37+
return out, cancelFunc
38+
}

0 commit comments

Comments
 (0)