@@ -163,32 +163,18 @@ func (ip *InformersMap) Start(ctx context.Context) error {
163
163
ip .mu .Lock ()
164
164
defer ip .mu .Unlock ()
165
165
166
- // Set the stop channel so it can be passed to informers that are added later
166
+ // Set the context so it can be passed to informers that are added later
167
167
ip .ctx = ctx
168
168
169
- ip .waitGroup .Add (len (ip .informers .Structured ) + len (ip .informers .Unstructured ) + len (ip .informers .Metadata ))
170
-
171
169
// Start each informer
172
170
for _ , i := range ip .informers .Structured {
173
- i := i
174
- go func () {
175
- defer ip .waitGroup .Done ()
176
- i .Informer .Run (ctx .Done ())
177
- }()
171
+ ip .startInformerLocked (i .Informer )
178
172
}
179
173
for _ , i := range ip .informers .Unstructured {
180
- i := i
181
- go func () {
182
- defer ip .waitGroup .Done ()
183
- i .Informer .Run (ctx .Done ())
184
- }()
174
+ ip .startInformerLocked (i .Informer )
185
175
}
186
176
for _ , i := range ip .informers .Metadata {
187
- i := i
188
- go func () {
189
- defer ip .waitGroup .Done ()
190
- i .Informer .Run (ctx .Done ())
191
- }()
177
+ ip .startInformerLocked (i .Informer )
192
178
}
193
179
194
180
// Set started to true so we immediately start any informers added later.
@@ -203,6 +189,21 @@ func (ip *InformersMap) Start(ctx context.Context) error {
203
189
return nil
204
190
}
205
191
192
+ func (ip * InformersMap ) startInformerLocked (informer cache.SharedIndexInformer ) {
193
+ // Don't start the informer in case we are already waiting for the items in
194
+ // the waitGroup to finish, since waitGroups don't support waiting and adding
195
+ // at the same time.
196
+ if ip .stopped {
197
+ return
198
+ }
199
+
200
+ ip .waitGroup .Add (1 )
201
+ go func () {
202
+ defer ip .waitGroup .Done ()
203
+ informer .Run (ip .ctx .Done ())
204
+ }()
205
+ }
206
+
206
207
func (ip * InformersMap ) waitForStarted (ctx context.Context ) bool {
207
208
select {
208
209
case <- ip .startWait :
@@ -331,15 +332,10 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
331
332
}
332
333
ip .informersByType (obj )[gvk ] = i
333
334
334
- // Start the Informer if need by
335
- // TODO(seans): write thorough tests and document what happens here - can you add indexers?
336
- // can you add eventhandlers?
337
- if ip .started && ! ip .stopped {
338
- ip .waitGroup .Add (1 )
339
- go func () {
340
- defer ip .waitGroup .Done ()
341
- i .Informer .Run (ip .ctx .Done ())
342
- }()
335
+ // Start the informer in case the InformersMap has started, otherwise it will be
336
+ // started when the InformersMap starts.
337
+ if ip .started {
338
+ ip .startInformerLocked (i .Informer )
343
339
}
344
340
return i , ip .started , nil
345
341
}
0 commit comments