Skip to content

Commit c14a7c4

Browse files
committed
fine tune queue flush
1 parent 150f47f commit c14a7c4

File tree

4 files changed

+33
-14
lines changed

4 files changed

+33
-14
lines changed

modules/queue/manager.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ package queue
55

66
import (
77
"context"
8+
"errors"
89
"sync"
910
"time"
1011

1112
"code.gitea.io/gitea/modules/log"
1213
"code.gitea.io/gitea/modules/setting"
13-
14-
"golang.org/x/sync/errgroup"
1514
)
1615

1716
// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
@@ -34,6 +33,7 @@ type ManagedWorkerPoolQueue interface {
3433

3534
// FlushWithContext tries to make the handler process all items in the queue synchronously.
3635
// It is for testing purpose only. It's not designed to be used in a cluster.
36+
// Negative timeout means discarding all items in the queue.
3737
FlushWithContext(ctx context.Context, timeout time.Duration) error
3838

3939
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
@@ -78,15 +78,16 @@ func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
7878

7979
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
8080
// It is for testing purpose only. It's not designed to be used in a cluster.
81+
// Negative timeout means discarding all items in the queue.
8182
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
82-
g := errgroup.Group{}
83+
var finalErrors []error
8384
qs := m.ManagedQueues()
8485
for _, q := range qs {
85-
g.Go(func() error {
86-
return q.FlushWithContext(ctx, timeout)
87-
})
86+
if err := q.FlushWithContext(ctx, timeout); err != nil {
87+
finalErrors = append(finalErrors, err)
88+
}
8889
}
89-
return g.Wait()
90+
return errors.Join(finalErrors...)
9091
}
9192

9293
// CreateSimpleQueue creates a simple queue from global setting config provider by name

modules/queue/workergroup.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,30 @@ func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
197197
defer log.Debug("Queue %q finishes flushing", q.GetName())
198198

199199
// stop all workers, and prepare a new worker context to start new workers
200-
201200
wg.ctxWorkerCancel()
202201
wg.wg.Wait()
203202

204203
defer func() {
205-
close(flush)
204+
close(flush.c)
206205
wg.doPrepareWorkerContext()
207206
}()
208207

208+
if flush.timeout < 0 {
209+
// discard everything
210+
wg.batchBuffer = nil
211+
for {
212+
select {
213+
case _ = <-wg.popItemChan:
214+
case _ = <-wg.popItemErr:
215+
case _ = <-q.batchChan:
216+
case <-q.ctxRun.Done():
217+
return
218+
default:
219+
return
220+
}
221+
}
222+
}
223+
209224
// drain the batch channel first
210225
loop:
211226
for {

modules/queue/workerqueue.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ type WorkerPoolQueue[T any] struct {
4242
workerNumMu sync.Mutex
4343
}
4444

45-
type flushType chan struct{}
45+
type flushType struct {
46+
timeout time.Duration
47+
c chan struct{}
48+
}
4649

4750
var _ ManagedWorkerPoolQueue = (*WorkerPoolQueue[any])(nil)
4851

@@ -104,12 +107,12 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
104107
if timeout > 0 {
105108
after = time.After(timeout)
106109
}
107-
c := make(flushType)
110+
flush := flushType{timeout: timeout, c: make(chan struct{})}
108111

109112
// send flush request
110113
// if it blocks, it means that there is a flush in progress or the queue hasn't been started yet
111114
select {
112-
case q.flushChan <- c:
115+
case q.flushChan <- flush:
113116
case <-ctx.Done():
114117
return ctx.Err()
115118
case <-q.ctxRun.Done():
@@ -120,7 +123,7 @@ func (q *WorkerPoolQueue[T]) FlushWithContext(ctx context.Context, timeout time.
120123

121124
// wait for flush to finish
122125
select {
123-
case <-c:
126+
case <-flush.c:
124127
return nil
125128
case <-ctx.Done():
126129
return ctx.Err()

modules/testlogger/testlogger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
118118
_, _ = fmt.Fprintf(os.Stdout, "+++ %s ... still flushing after %v ...\n", t.Name(), SlowFlush)
119119
}
120120
})
121-
if err := queue.GetManager().FlushAll(context.Background(), time.Minute); err != nil {
121+
if err := queue.GetManager().FlushAll(context.Background(), -1); err != nil {
122122
t.Errorf("Flushing queues failed with error %v", err)
123123
}
124124
timer.Stop()

0 commit comments

Comments
 (0)