Skip to content

Commit e466b38

Browse files
committed
use correct context to cancel list and watch & wait for all informers to complete
Signed-off-by: Tim Ramlot <[email protected]>
1 parent b756161 commit e466b38

File tree

1 file changed

+49
-25
lines changed

1 file changed

+49
-25
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ type InformersMap struct {
117117
// paramCodec is used by list and watch
118118
paramCodec runtime.ParameterCodec
119119

120-
// stop is the stop channel to stop informers
121-
stop <-chan struct{}
122-
123120
// resync is the base frequency the informers are resynced
124121
// a 10 percent jitter will be added to the resync period between informers
125122
// so that all informers will not send list requests simultaneously.
@@ -128,13 +125,22 @@ type InformersMap struct {
128125
// mu guards access to the map
129126
mu sync.RWMutex
130127

131-
// start is true if the informers have been started
128+
// started is true if the informers have been started
132129
started bool
133130

134131
// startWait is a channel that is closed after the
135132
// informer has been started.
136133
startWait chan struct{}
137134

135+
// waitGroup is the wait group that is used to wait for all informers to stop
136+
waitGroup sync.WaitGroup
137+
138+
// stopped is true if the informers have been stopped
139+
stopped bool
140+
141+
// ctx is the context to stop informers
142+
ctx context.Context
143+
138144
// namespace is the namespace that all ListWatches are restricted to
139145
// default or empty string means all namespaces
140146
namespace string
@@ -158,24 +164,42 @@ func (ip *InformersMap) Start(ctx context.Context) error {
158164
defer ip.mu.Unlock()
159165

160166
// Set the stop channel so it can be passed to informers that are added later
161-
ip.stop = ctx.Done()
167+
ip.ctx = ctx
168+
169+
ip.waitGroup.Add(len(ip.informers.Structured) + len(ip.informers.Unstructured) + len(ip.informers.Metadata))
162170

163171
// Start each informer
164172
for _, i := range ip.informers.Structured {
165-
go i.Informer.Run(ctx.Done())
173+
i := i
174+
go func() {
175+
defer ip.waitGroup.Done()
176+
i.Informer.Run(ctx.Done())
177+
}()
166178
}
167179
for _, i := range ip.informers.Unstructured {
168-
go i.Informer.Run(ctx.Done())
180+
i := i
181+
go func() {
182+
defer ip.waitGroup.Done()
183+
i.Informer.Run(ctx.Done())
184+
}()
169185
}
170186
for _, i := range ip.informers.Metadata {
171-
go i.Informer.Run(ctx.Done())
187+
i := i
188+
go func() {
189+
defer ip.waitGroup.Done()
190+
i.Informer.Run(ctx.Done())
191+
}()
172192
}
173193

174194
// Set started to true so we immediately start any informers added later.
175195
ip.started = true
176196
close(ip.startWait)
177197
}()
178-
<-ctx.Done()
198+
<-ctx.Done() // Block until the context is done
199+
ip.mu.Lock()
200+
ip.stopped = true // Set stopped to true so we don't start any new informers
201+
ip.mu.Unlock()
202+
ip.waitGroup.Wait() // Block until all informers have stopped
179203
return nil
180204
}
181205

@@ -310,17 +334,17 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
310334
// Start the Informer if need by
311335
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
312336
// can you add eventhandlers?
313-
if ip.started {
314-
go i.Informer.Run(ip.stop)
337+
if ip.started && !ip.stopped {
338+
ip.waitGroup.Add(1)
339+
go func() {
340+
defer ip.waitGroup.Done()
341+
i.Informer.Run(ip.ctx.Done())
342+
}()
315343
}
316344
return i, ip.started, nil
317345
}
318346

319347
func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
320-
// TODO(vincepri): Wire the context in here and don't use TODO().
321-
// Can we use the context from the Get call?
322-
ctx := context.TODO()
323-
324348
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
325349
// groupVersionKind to the Resource API we will use.
326350
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
@@ -351,16 +375,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
351375
return &cache.ListWatch{
352376
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
353377
if namespace != "" {
354-
return resources.Namespace(namespace).List(ctx, opts)
378+
return resources.Namespace(namespace).List(ip.ctx, opts)
355379
}
356-
return resources.List(ctx, opts)
380+
return resources.List(ip.ctx, opts)
357381
},
358382
// Setup the watch function
359383
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
360384
if namespace != "" {
361-
return resources.Namespace(namespace).Watch(ctx, opts)
385+
return resources.Namespace(namespace).Watch(ip.ctx, opts)
362386
}
363-
return resources.Watch(ctx, opts)
387+
return resources.Watch(ip.ctx, opts)
364388
},
365389
}, nil
366390
//
@@ -386,9 +410,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
386410
err error
387411
)
388412
if namespace != "" {
389-
list, err = resources.Namespace(namespace).List(ctx, opts)
413+
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
390414
} else {
391-
list, err = resources.List(ctx, opts)
415+
list, err = resources.List(ip.ctx, opts)
392416
}
393417
if list != nil {
394418
for i := range list.Items {
@@ -400,9 +424,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
400424
// Setup the watch function
401425
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
402426
if namespace != "" {
403-
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
427+
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
404428
} else {
405-
watcher, err = resources.Watch(ctx, opts)
429+
watcher, err = resources.Watch(ip.ctx, opts)
406430
}
407431
if err != nil {
408432
return nil, err
@@ -433,7 +457,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
433457

434458
// Create the resulting object, and execute the request.
435459
res := listObj.DeepCopyObject()
436-
if err := req.Do(ctx).Into(res); err != nil {
460+
if err := req.Do(ip.ctx).Into(res); err != nil {
437461
return nil, err
438462
}
439463
return res, nil
@@ -446,7 +470,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
446470
req.Namespace(namespace)
447471
}
448472
// Call the watch.
449-
return req.Watch(ctx)
473+
return req.Watch(ip.ctx)
450474
},
451475
}, nil
452476
}

0 commit comments

Comments
 (0)