Skip to content

Commit 9a2f037

Browse files
committed
use correct context to cancel list and watch & wait for all informers to complete
Signed-off-by: Tim Ramlot <[email protected]>
1 parent b756161 commit 9a2f037

File tree

1 file changed

+52
-24
lines changed

1 file changed

+52
-24
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ type InformersMap struct {
117117
// paramCodec is used by list and watch
118118
paramCodec runtime.ParameterCodec
119119

120-
// stop is the stop channel to stop informers
121-
stop <-chan struct{}
122-
123120
// resync is the base frequency the informers are resynced
124121
// a 10 percent jitter will be added to the resync period between informers
125122
// so that all informers will not send list requests simultaneously.
@@ -128,13 +125,22 @@ type InformersMap struct {
128125
// mu guards access to the map
129126
mu sync.RWMutex
130127

131-
// start is true if the informers have been started
128+
// started is true if the informers have been started
132129
started bool
133130

134131
// startWait is a channel that is closed after the
135132
// informer has been started.
136133
startWait chan struct{}
137134

135+
// waitGroup is the wait group that is used to wait for all informers to stop
136+
waitGroup sync.WaitGroup
137+
138+
// stopped is true if the informers have been stopped
139+
stopped bool
140+
141+
// ctx is the context to stop informers
142+
ctx context.Context
143+
138144
// namespace is the namespace that all ListWatches are restricted to
139145
// default or empty string means all namespaces
140146
namespace string
@@ -158,24 +164,42 @@ func (ip *InformersMap) Start(ctx context.Context) error {
158164
defer ip.mu.Unlock()
159165

160166
// Set the stop channel so it can be passed to informers that are added later
161-
ip.stop = ctx.Done()
167+
ip.ctx = ctx
168+
169+
ip.waitGroup.Add(len(ip.informers.Structured) + len(ip.informers.Unstructured) + len(ip.informers.Metadata))
162170

163171
// Start each informer
164172
for _, i := range ip.informers.Structured {
165-
go i.Informer.Run(ctx.Done())
173+
i := i
174+
go func() {
175+
defer ip.waitGroup.Done()
176+
i.Informer.Run(ctx.Done())
177+
}()
166178
}
167179
for _, i := range ip.informers.Unstructured {
168-
go i.Informer.Run(ctx.Done())
180+
i := i
181+
go func() {
182+
defer ip.waitGroup.Done()
183+
i.Informer.Run(ctx.Done())
184+
}()
169185
}
170186
for _, i := range ip.informers.Metadata {
171-
go i.Informer.Run(ctx.Done())
187+
i := i
188+
go func() {
189+
defer ip.waitGroup.Done()
190+
i.Informer.Run(ctx.Done())
191+
}()
172192
}
173193

174194
// Set started to true so we immediately start any informers added later.
175195
ip.started = true
176196
close(ip.startWait)
177197
}()
178-
<-ctx.Done()
198+
<-ctx.Done() // Block until the context is done
199+
ip.mu.Lock()
200+
ip.stopped = true // Set stopped to true so we don't start any new informers
201+
ip.mu.Unlock()
202+
ip.waitGroup.Wait() // Block until all informers have stopped
179203
return nil
180204
}
181205

@@ -260,6 +284,10 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
260284
ip.mu.Lock()
261285
defer ip.mu.Unlock()
262286

287+
if ip.stopped {
288+
return nil, false, fmt.Errorf("cannot add informer to stopped InformersMap (GVK: %v, obj: %T)", gvk, obj)
289+
}
290+
263291
// Check the cache to see if we already have an Informer. If we do, return the Informer.
264292
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
265293
// so neither returned early, but the first one created it.
@@ -311,16 +339,16 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
311339
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
312340
// can you add eventhandlers?
313341
if ip.started {
314-
go i.Informer.Run(ip.stop)
342+
ip.waitGroup.Add(1)
343+
go func() {
344+
defer ip.waitGroup.Done()
345+
i.Informer.Run(ip.ctx.Done())
346+
}()
315347
}
316348
return i, ip.started, nil
317349
}
318350

319351
func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
320-
// TODO(vincepri): Wire the context in here and don't use TODO().
321-
// Can we use the context from the Get call?
322-
ctx := context.TODO()
323-
324352
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
325353
// groupVersionKind to the Resource API we will use.
326354
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -351,16 +379,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
351379
return &cache.ListWatch{
352380
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
353381
if namespace != "" {
354-
return resources.Namespace(namespace).List(ctx, opts)
382+
return resources.Namespace(namespace).List(ip.ctx, opts)
355383
}
356-
return resources.List(ctx, opts)
384+
return resources.List(ip.ctx, opts)
357385
},
358386
// Setup the watch function
359387
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
360388
if namespace != "" {
361-
return resources.Namespace(namespace).Watch(ctx, opts)
389+
return resources.Namespace(namespace).Watch(ip.ctx, opts)
362390
}
363-
return resources.Watch(ctx, opts)
391+
return resources.Watch(ip.ctx, opts)
364392
},
365393
}, nil
366394
//
@@ -386,9 +414,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
386414
err error
387415
)
388416
if namespace != "" {
389-
list, err = resources.Namespace(namespace).List(ctx, opts)
417+
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
390418
} else {
391-
list, err = resources.List(ctx, opts)
419+
list, err = resources.List(ip.ctx, opts)
392420
}
393421
if list != nil {
394422
for i := range list.Items {
@@ -400,9 +428,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
400428
// Setup the watch function
401429
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
402430
if namespace != "" {
403-
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
431+
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
404432
} else {
405-
watcher, err = resources.Watch(ctx, opts)
433+
watcher, err = resources.Watch(ip.ctx, opts)
406434
}
407435
if err != nil {
408436
return nil, err
@@ -433,7 +461,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
433461

434462
// Create the resulting object, and execute the request.
435463
res := listObj.DeepCopyObject()
436-
if err := req.Do(ctx).Into(res); err != nil {
464+
if err := req.Do(ip.ctx).Into(res); err != nil {
437465
return nil, err
438466
}
439467
return res, nil
@@ -446,7 +474,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
446474
req.Namespace(namespace)
447475
}
448476
// Call the watch.
449-
return req.Watch(ctx)
477+
return req.Watch(ip.ctx)
450478
},
451479
}, nil
452480
}

0 commit comments

Comments
 (0)