@@ -222,51 +222,24 @@ func (r *runnableGroup) reconcile() {
222
222
223
223
// Start the runnable.
224
224
go func (rn * readyRunnable ) {
225
- // If we return, the runnable ended cleanly
226
- // or returned an error to the channel.
227
- //
228
- // We should always decrement the WaitGroup here.
229
- defer r .wg .Done ()
230
-
231
- // Track the ready check in the same WaitGroup to prevent goroutine leaks
232
- done := make (chan struct {})
233
-
234
- // Launch the ready check but make sure it doesn't outlive this goroutine
235
225
go func () {
236
- defer close (done )
237
226
if rn .Check (r .ctx ) {
238
227
if rn .signalReady {
239
- // Use non-blocking send to avoid leaking this goroutine if the channel is never read
240
- select {
241
- case r .startReadyCh <- rn :
242
- // Successfully sent
243
- case <- r .ctx .Done ():
244
- // Context canceled, exit without blocking
245
- }
228
+ r .startReadyCh <- rn
246
229
}
247
230
}
248
231
}()
249
232
250
- // Start the runnable.
251
- err := rn .Start (r .ctx )
252
-
253
- // Now that the runnable is done, clean up the ready check goroutine if still running
254
- select {
255
- case <- done :
256
- // Ready check already completed, nothing to do
257
- case <- r .ctx .Done ():
258
- // Context was canceled, ready check should exit soon
259
- }
233
+ // If we return, the runnable ended cleanly
234
+ // or returned an error to the channel.
235
+ //
236
+ // We should always decrement the WaitGroup here.
237
+ defer r .wg .Done ()
260
238
261
- // Send any error from the runnable
262
- if err != nil {
263
- select {
264
- case r .errChan <- err :
265
- // Error sent successfully
266
- default :
267
- // Channel full or closed, can't send the error
268
- }
269
- }
239
+ // Start the runnable.
240
+ if err := rn .Start (r .ctx ); err != nil {
241
+ r .errChan <- err
242
+ }
270
243
}(runnable )
271
244
}
272
245
}
@@ -324,13 +297,18 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
324
297
// StopAndWait waits for all the runnables to finish before returning.
325
298
func (r * runnableGroup ) StopAndWait (ctx context.Context ) {
326
299
r .stopOnce .Do (func () {
300
+ // Close the reconciler channel once we're done.
301
+ defer func () {
302
+ r .stop .Lock ()
303
+ close (r .ch )
304
+ r .stop .Unlock ()
305
+ }()
306
+
327
307
_ = r .Start (ctx )
328
308
r .stop .Lock ()
329
309
// Store the stopped variable so we don't accept any new
330
310
// runnables for the time being.
331
311
r .stopped = true
332
- // Close the channel to signal the reconcile goroutine to exit
333
- close (r .ch )
334
312
r .stop .Unlock ()
335
313
336
314
// Cancel the internal channel.
0 commit comments