Skip to content

Commit e42b9b4

Browse files
shomronkevindelgado
authored andcommitted
Allow removing individual informers from the cache (#935)
This change adds support for removing individual informers from the cache using the new Remove() method. This is allowed before or after the cache has been started. Informers are stopped at the time of removal - once stopped, they will no longer deliver events to registered event handlers, and registered watched will be aborted. Also adds non-blocking API for getting informers without waiting for their cache to sync - GetInformerNonBlocking(). Signed-off-by: Oren Shomron <[email protected]>
1 parent 6af4e7c commit e42b9b4

File tree

7 files changed

+246
-6
lines changed

7 files changed

+246
-6
lines changed

pkg/cache/cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
126126
// Construct a new Mapper if unset
127127
if opts.Mapper == nil {
128128
var err error
129-
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config)
129+
opts.Mapper, err = apiutil.NewDynamicRESTMapper(config)
130130
if err != nil {
131131
log.WithName("setup").Error(err, "Failed to get API Group-Resources")
132132
return opts, fmt.Errorf("could not create RESTMapper from config")

pkg/cache/informer_cache.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"reflect"
2323
"strings"
2424

25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2526
apimeta "k8s.io/apimachinery/pkg/api/meta"
2627
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2728
"k8s.io/apimachinery/pkg/runtime"
@@ -159,6 +160,24 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj runtime.Object) (I
159160
return i.Informer, err
160161
}
161162

163+
// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync.
164+
func (ip *informerCache) GetInformerNonBlocking(obj runtime.Object) (Informer, error) {
165+
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
// Use a cancelled context to signal non-blocking
171+
ctx, cancel := context.WithCancel(context.Background())
172+
cancel()
173+
174+
_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
175+
if err != nil && !apierrors.IsTimeout(err) {
176+
return nil, err
177+
}
178+
return i.Informer, nil
179+
}
180+
162181
// NeedLeaderElection implements the LeaderElectionRunnable interface
163182
// to indicate that this can be started without requiring the leader lock
164183
func (ip *informerCache) NeedLeaderElection() bool {
@@ -216,3 +235,14 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
216235

217236
return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
218237
}
238+
239+
// Remove removes an informer specified by the obj argument from the cache and stops it if it existed.
240+
func (ip *informerCache) Remove(obj runtime.Object) error {
241+
gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
242+
if err != nil {
243+
return err
244+
}
245+
246+
ip.InformersMap.Remove(gvk, obj)
247+
return nil
248+
}

pkg/cache/internal/deleg_map.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,20 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
9292
return m.structured.Get(ctx, gvk, obj)
9393
}
9494

95+
// Remove will remove an new Informer from the InformersMap and stop it if it exists.
96+
func (m *InformersMap) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
97+
_, isUnstructured := obj.(*unstructured.Unstructured)
98+
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
99+
isUnstructured = isUnstructured || isUnstructuredList
100+
101+
switch {
102+
case isUnstructured:
103+
m.unstructured.Remove(gvk)
104+
default:
105+
m.structured.Remove(gvk)
106+
}
107+
}
108+
95109
// newStructuredInformersMap creates a new InformersMap for structured objects.
96110
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
97111
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)

pkg/cache/internal/informers_map.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type MapEntry struct {
7070

7171
// CacheReader wraps Informer and implements the CacheReader interface for a single type
7272
Reader CacheReader
73+
74+
// Stop can be used to stop this individual informer without stopping the entire specificInformersMap.
75+
stop chan struct{}
7376
}
7477

7578
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
@@ -121,6 +124,17 @@ type specificInformersMap struct {
121124
namespace string
122125
}
123126

127+
// Start starts the informer managed by a MapEntry.
128+
// Blocks until the informer stops. The informer can be stopped
129+
// either individually (via the entry's stop channel) or globally
130+
// via the provided stop argument.
131+
func (e *MapEntry) Start(stop <-chan struct{}) {
132+
// Stop on either the whole map stopping or just this informer being removed.
133+
internalStop, cancel := anyOf(stop, e.stop)
134+
defer cancel()
135+
e.Informer.Run(internalStop)
136+
}
137+
124138
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
125139
// It doesn't return start because it can't return an error, and it's not a runnable directly.
126140
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
@@ -132,8 +146,8 @@ func (ip *specificInformersMap) Start(stop <-chan struct{}) {
132146
ip.stop = stop
133147

134148
// Start each informer
135-
for _, informer := range ip.informersByGVK {
136-
go informer.Informer.Run(stop)
149+
for _, entry := range ip.informersByGVK {
150+
go entry.Start(stop)
137151
}
138152

139153
// Set started to true so we immediately start any informers added later.
@@ -183,8 +197,12 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
183197

184198
if started && !i.Informer.HasSynced() {
185199
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
186-
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
187-
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
200+
// Cancel for context, informer stopping, or entire map stopping.
201+
syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.stop)
202+
defer cancel()
203+
if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) {
204+
// Return entry even on timeout - caller may have intended a non-blocking fetch.
205+
return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
188206
}
189207
}
190208

@@ -214,18 +232,32 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
214232
i := &MapEntry{
215233
Informer: ni,
216234
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
235+
stop: make(chan struct{}),
217236
}
218237
ip.informersByGVK[gvk] = i
219238

220239
// Start the Informer if need by
221240
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
222241
// can you add eventhandlers?
223242
if ip.started {
224-
go i.Informer.Run(ip.stop)
243+
go i.Start(ip.stop)
225244
}
226245
return i, ip.started, nil
227246
}
228247

248+
// Remove removes an informer entry and stops it if it was running.
249+
func (ip *specificInformersMap) Remove(gvk schema.GroupVersionKind) {
250+
ip.mu.Lock()
251+
defer ip.mu.Unlock()
252+
253+
entry, ok := ip.informersByGVK[gvk]
254+
if !ok {
255+
return
256+
}
257+
close(entry.stop)
258+
delete(ip.informersByGVK, gvk)
259+
}
260+
229261
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
230262
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
231263
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package internal
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
9+
)
10+
11+
func TestCacheInternal(t *testing.T) {
12+
RegisterFailHandler(Fail)
13+
RunSpecsWithDefaultAndCustomReporters(t, "Cache Internal Suite", []Reporter{printer.NewlineReporter{}})
14+
}

pkg/cache/internal/sync.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// anyOf returns a "done" channel that is closed when any of its input channels
9+
// are closed or when the retuned cancel function is called, whichever comes first.
10+
//
11+
// The cancel function should always be called by the caller to ensure
12+
// resources are properly released.
13+
func anyOf(ch ...<-chan struct{}) (<-chan struct{}, context.CancelFunc) {
14+
var once sync.Once
15+
cancel := make(chan struct{})
16+
cancelFunc := func() {
17+
once.Do(func() {
18+
close(cancel)
19+
})
20+
}
21+
return anyInternal(append(ch, cancel)...), cancelFunc
22+
}
23+
24+
func anyInternal(ch ...<-chan struct{}) <-chan struct{} {
25+
switch len(ch) {
26+
case 0:
27+
return nil
28+
case 1:
29+
return ch[0]
30+
}
31+
32+
done := make(chan struct{})
33+
go func() {
34+
defer close(done)
35+
36+
switch len(ch) {
37+
case 2:
38+
// This case saves a recursion + goroutine when there are exactly 2 channels.
39+
select {
40+
case <-ch[0]:
41+
case <-ch[1]:
42+
}
43+
default:
44+
// >=3 channels to merge
45+
select {
46+
case <-ch[0]:
47+
case <-ch[1]:
48+
case <-ch[2]:
49+
case <-anyInternal(append(ch[3:], done)...):
50+
}
51+
}
52+
}()
53+
54+
return done
55+
}
56+
57+
// mergeChan returns a channel that is closed when any of the input channels are signaled.
58+
// The caller must call the returned CancelFunc to ensure no resources are leaked.
59+
func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) {
60+
var once sync.Once
61+
out := make(chan struct{})
62+
cancel := make(chan struct{})
63+
cancelFunc := func() {
64+
once.Do(func() {
65+
close(cancel)
66+
})
67+
}
68+
go func() {
69+
defer close(out)
70+
select {
71+
case <-a:
72+
case <-b:
73+
case <-c:
74+
case <-cancel:
75+
}
76+
}()
77+
78+
return out, cancelFunc
79+
}

pkg/cache/internal/sync_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
. "github.com/onsi/ginkgo"
9+
)
10+
11+
var _ = Describe("anyOf", func() {
12+
// Generate contexts for different number of input channels
13+
for n := 0; n < 4; n++ {
14+
n := n
15+
Context(fmt.Sprintf("with %d channels", n), func() {
16+
var (
17+
channels []chan struct{}
18+
done <-chan struct{}
19+
cancel context.CancelFunc
20+
)
21+
BeforeEach(func() {
22+
channels = make([]chan struct{}, n)
23+
in := make([]<-chan struct{}, n)
24+
for i := 0; i < n; i++ {
25+
ch := make(chan struct{})
26+
channels[i] = ch
27+
in[i] = ch
28+
}
29+
done, cancel = anyOf(in...)
30+
})
31+
AfterEach(func() {
32+
cancel()
33+
})
34+
35+
It("isn't closed initially", func() {
36+
select {
37+
case <-done:
38+
Fail("done was closed before cancel")
39+
case <-time.After(5 * time.Millisecond):
40+
// Ok.
41+
}
42+
})
43+
44+
// Verify that done is closed when we call cancel explicitly.
45+
It("closes when cancelled", func() {
46+
cancel()
47+
select {
48+
case <-done:
49+
// Ok.
50+
case <-time.After(5 * time.Millisecond):
51+
Fail("timed out waiting for cancel")
52+
}
53+
})
54+
55+
// Generate test cases for closing each individual channel.
56+
// Verify that done is closed in response.
57+
for i := 0; i < n; i++ {
58+
i := i
59+
It(fmt.Sprintf("closes when channel %d is closed", i), func() {
60+
close(channels[i])
61+
select {
62+
case <-done:
63+
// Ok.
64+
case <-time.After(5 * time.Millisecond):
65+
Fail("timed out waiting for cancel")
66+
}
67+
})
68+
}
69+
})
70+
}
71+
})

0 commit comments

Comments
 (0)