Skip to content

Commit 7ba1b71

Browse files
zeripathGusted
andauthored
Only attempt to flush queue if the underlying worker pool is not finished (#18593)
* Only attempt to flush queue if the underlying worker pool is not finished There is a possible race whereby a worker pool could be cancelled but yet the underlying queue is not empty. This will lead to flush-all cycling because it cannot empty the pool. Signed-off-by: Andrew Thornton <[email protected]> * Apply suggestions from code review Co-authored-by: Gusted <[email protected]> Co-authored-by: Gusted <[email protected]>
1 parent a51d211 commit 7ba1b71

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

modules/queue/manager.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ type ManagedPool interface {
8484
BoostWorkers() int
8585
// SetPoolSettings sets the user updatable settings for the pool
8686
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
87+
// Done returns a channel that will be closed when the Pool's baseCtx is closed
88+
Done() <-chan struct{}
8789
}
8890

8991
// ManagedQueueList implements the sort.Interface
@@ -211,6 +213,15 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
211213
continue
212214
}
213215
}
216+
if pool, ok := mq.Managed.(ManagedPool); ok {
217+
// No point into flushing pools when their base's ctx is already done.
218+
select {
219+
case <-pool.Done():
220+
wg.Done()
221+
continue
222+
default:
223+
}
224+
}
214225

215226
allEmpty = false
216227
if flushable, ok := mq.Managed.(Flushable); ok {

modules/queue/workerpool.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
7474
return pool
7575
}
7676

77+
// Done returns when this worker pool's base context has been cancelled
78+
func (p *WorkerPool) Done() <-chan struct{} {
79+
return p.baseCtx.Done()
80+
}
81+
7782
// Push pushes the data to the internal channel
7883
func (p *WorkerPool) Push(data Data) {
7984
atomic.AddInt64(&p.numInQueue, 1)

0 commit comments

Comments
 (0)