
Commit 543bae4

Only attempt to flush queue if the underlying worker pool is not finished (go-gitea#18593)
Backport go-gitea#18593

There is a possible race whereby a worker pool could be cancelled while the underlying queue is not yet empty. This will lead to flush-all cycling because it cannot empty the pool.

* On shutdown of Persistent Channel Queues close datachan and empty

  Partial Backport go-gitea#18415

  Although we attempt to empty the datachan in queues, due to races we are better off just closing the channel and forcibly emptying it in shutdown.

Fix go-gitea#18618

Signed-off-by: Andrew Thornton <[email protected]>
1 parent: 69b7776
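
The fix described above hinges on FlushAll skipping any pool whose base context is already done, using a non-blocking select. Below is a minimal, self-contained sketch of that pattern; the names pool and flushAll are illustrative stand-ins, not the actual Gitea types.

package main

import (
    "context"
    "fmt"
)

// pool is a stand-in for a managed worker pool; only the Done accessor matters here.
type pool struct {
    name    string
    baseCtx context.Context
}

// Done mirrors the accessor added in this commit: it exposes the base context's done channel.
func (p *pool) Done() <-chan struct{} {
    return p.baseCtx.Done()
}

// flushAll sketches the FlushAll skip logic: a non-blocking select means pools whose
// base context is already cancelled are skipped instead of being flushed forever.
func flushAll(pools []*pool) {
    for _, p := range pools {
        select {
        case <-p.Done():
            fmt.Printf("skipping %s: base context already done\n", p.name)
            continue
        default:
        }
        fmt.Printf("flushing %s\n", p.name)
    }
}

func main() {
    live := &pool{name: "live", baseCtx: context.Background()}

    cancelledCtx, cancel := context.WithCancel(context.Background())
    cancel() // simulate a worker pool whose base context has been cancelled
    dead := &pool{name: "dead", baseCtx: cancelledCtx}

    flushAll([]*pool{live, dead})
}

Because the cancelled pool is skipped, the flush-all loop can no longer cycle on a queue it will never be able to empty.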

3 files changed: +30 −13 lines

modules/queue/manager.go

Lines changed: 13 additions & 2 deletions
@@ -72,6 +72,8 @@ type ManagedPool interface {
     BoostWorkers() int
     // SetPoolSettings sets the user updatable settings for the pool
     SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
+    // Done returns a channel that will be closed when this Pool's baseCtx is closed
+    Done() <-chan struct{}
 }

 // ManagedQueueList implements the sort.Interface
@@ -141,7 +143,6 @@ func (m *Manager) Remove(qid int64) {
     delete(m.Queues, qid)
     m.mutex.Unlock()
     log.Trace("Queue Manager removed: QID: %d", qid)
-
 }

 // GetManagedQueue by qid
@@ -193,6 +194,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
             wg.Done()
             continue
         }
+
+        if pool, ok := mq.Managed.(ManagedPool); ok {
+            // no point flushing pools where their base ctx is already done
+            select {
+            case <-pool.Done():
+                wg.Done()
+                continue
+            default:
+            }
+        }
+
         allEmpty = false
         if flushable, ok := mq.Managed.(Flushable); ok {
             log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -225,7 +237,6 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
         wg.Wait()
     }
     return nil
-
 }

 // ManagedQueues returns the managed queues

modules/queue/queue_disk_channel.go

Lines changed: 7 additions & 9 deletions
@@ -173,7 +173,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
        q.internal.(*LevelQueue).Shutdown()
        GetManager().Remove(q.internal.(*LevelQueue).qid)
    }
-
 }

 // Flush flushes the queue and blocks till the queue is empty
@@ -252,14 +251,13 @@ func (q *PersistableChannelQueue) Shutdown() {
    q.channelQueue.Wait()
    q.internal.(*LevelQueue).Wait()
    // Redirect all remaining data in the chan to the internal channel
-   go func() {
-       log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
-       for data := range q.channelQueue.dataChan {
-           _ = q.internal.Push(data)
-           atomic.AddInt64(&q.channelQueue.numInQueue, -1)
-       }
-       log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
-   }()
+   close(q.channelQueue.dataChan)
+   log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
+   for data := range q.channelQueue.dataChan {
+       _ = q.internal.Push(data)
+       atomic.AddInt64(&q.channelQueue.numInQueue, -1)
+   }
+   log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

    log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
 }
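
For context on the Shutdown change above: closing dataChan before draining it means the range loop ends as soon as the buffered items are gone, rather than waiting for new sends. Below is a small self-contained sketch of the close-then-drain pattern; the channel and the persist function are stand-ins, not the real queue types.

package main

import "fmt"

// persist is a hypothetical stand-in for q.internal.Push in the real code.
func persist(item string) {
    fmt.Println("persisted:", item)
}

func main() {
    dataChan := make(chan string, 4) // stand-in for the queue's buffered dataChan
    dataChan <- "a"
    dataChan <- "b"

    // Closing first means the range below stops once the buffer is drained.
    // It also means any later send would panic, which is why the real Shutdown
    // only closes the channel after the Wait() calls above.
    close(dataChan)

    for item := range dataChan {
        persist(item)
    }
    fmt.Println("drained")
}

Doing the drain inline, instead of in the previous background goroutine, guarantees the redirect has finished before Shutdown returns.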

modules/queue/workerpool.go

Lines changed: 10 additions & 2 deletions
@@ -65,6 +65,11 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
     return pool
 }

+// Done returns when this worker pool's base context has been cancelled
+func (p *WorkerPool) Done() <-chan struct{} {
+    return p.baseCtx.Done()
+}
+
 // Push pushes the data to the internal channel
 func (p *WorkerPool) Push(data Data) {
     atomic.AddInt64(&p.numInQueue, 1)
@@ -326,7 +331,10 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
    log.Trace("WorkerPool: %d Flush", p.qid)
    for {
        select {
-       case data := <-p.dataChan:
+       case data, ok := <-p.dataChan:
+           if !ok {
+               return nil
+           }
            p.handle(data)
            atomic.AddInt64(&p.numInQueue, -1)
        case <-p.baseCtx.Done():
@@ -341,7 +349,7 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {

 func (p *WorkerPool) doWork(ctx context.Context) {
     delay := time.Millisecond * 300
-    var data = make([]Data, 0, p.batchLength)
+    data := make([]Data, 0, p.batchLength)
     for {
         select {
         case <-ctx.Done():
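
The FlushWithContext change in the second hunk above relies on the two-value channel receive to notice that dataChan has been closed during shutdown and to stop flushing cleanly. A minimal sketch of that idiom follows; the handler and channel here are illustrative, not the real WorkerPool fields.

package main

import (
    "context"
    "fmt"
)

// flush mirrors the shape of FlushWithContext: handle items until either the
// context is cancelled or the data channel has been closed and drained.
func flush(ctx context.Context, dataChan <-chan int) error {
    for {
        select {
        case data, ok := <-dataChan:
            if !ok {
                // Channel closed and drained: flushing is finished.
                return nil
            }
            fmt.Println("handled:", data)
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    dataChan := make(chan int, 3)
    dataChan <- 1
    dataChan <- 2
    close(dataChan) // what Shutdown now does to the persistable channel queue's dataChan
    fmt.Println(flush(context.Background(), dataChan))
}

The single-value receive it replaces could not tell a closed channel apart from real data; once Shutdown closes dataChan, a closed channel yields zero values immediately, so the ok check is what lets the flush return nil instead.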
