Skip to content

Commit b453c15

Browse files
committed
Move flushall to queue.Manager and add to testlogger
1 parent a883a9a commit b453c15

File tree

3 files changed

+68
-70
lines changed

3 files changed

+68
-70
lines changed

integrations/testlogger.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package integrations
66

77
import (
8+
"context"
89
"encoding/json"
910
"fmt"
1011
"os"
@@ -100,13 +101,8 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
100101
}
101102
writerCloser.setT(&t)
102103
return func() {
103-
for i := 0; i < 5; i++ {
104-
qs := queue.GetManager().ManagedQueues()
105-
for _, q := range qs {
106-
if err := q.Flush(10 * time.Second); err != nil {
107-
t.Errorf("Flushing queue %s failed with error %v", q.Name, err)
108-
}
109-
}
104+
if err := queue.GetManager().FlushAll(context.Background(), 20*time.Second); err != nil {
105+
t.Errorf("Flushing queues failed with error %v", err)
110106
}
111107
_ = writerCloser.Close()
112108
}

modules/queue/manager.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ type ManagedQueue struct {
4444

4545
// Flushable represents a pool or queue that is flushable
4646
type Flushable interface {
47-
// Flush will add a flush worker to the pool
47+
// Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
4848
Flush(time.Duration) error
49+
// FlushWithContext is very similar to Flush
50+
// NB: The worker will not be registered with the manager.
51+
FlushWithContext(ctx context.Context) error
4952
// IsEmpty will return if the managed pool is empty and has no work
5053
IsEmpty() bool
5154
}
@@ -147,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
147150
return m.Queues[qid]
148151
}
149152

153+
// FlushAll flushes all the flushable queues attached to this manager
154+
func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
155+
var ctx context.Context
156+
var cancel context.CancelFunc
157+
start := time.Now()
158+
end := start
159+
hasTimeout := false
160+
if timeout > 0 {
161+
ctx, cancel = context.WithTimeout(baseCtx, timeout)
162+
end = start.Add(timeout)
163+
hasTimeout = true
164+
} else {
165+
ctx, cancel = context.WithCancel(baseCtx)
166+
}
167+
defer cancel()
168+
169+
for {
170+
select {
171+
case <-ctx.Done():
172+
return ctx.Err()
173+
default:
174+
}
175+
mqs := m.ManagedQueues()
176+
wg := sync.WaitGroup{}
177+
wg.Add(len(mqs))
178+
allEmpty := true
179+
for _, mq := range mqs {
180+
if mq.IsEmpty() {
181+
wg.Done()
182+
continue
183+
}
184+
allEmpty = false
185+
if flushable, ok := mq.Managed.(Flushable); ok {
186+
go func() {
187+
localCtx, localCancel := context.WithCancel(ctx)
188+
pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
189+
err := flushable.FlushWithContext(localCtx)
190+
if err != nil && err != ctx.Err() {
191+
cancel()
192+
}
193+
mq.CancelWorkers(pid)
194+
localCancel()
195+
wg.Done()
196+
}()
197+
} else {
198+
wg.Done()
199+
}
200+
201+
}
202+
if allEmpty {
203+
break
204+
}
205+
wg.Wait()
206+
}
207+
return nil
208+
209+
}
210+
150211
// ManagedQueues returns the managed queues
151212
func (m *Manager) ManagedQueues() []*ManagedQueue {
152213
m.mutex.Lock()

routers/private/manager.go

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
package private
66

77
import (
8-
"context"
98
"net/http"
10-
"sync"
11-
"time"
129

1310
"code.gitea.io/gitea/modules/graceful"
1411
"code.gitea.io/gitea/modules/log"
@@ -20,28 +17,15 @@ import (
2017

2118
// FlushQueues flushes all the Queues
2219
func FlushQueues(ctx *macaron.Context, opts private.FlushOptions) {
23-
var flushCtx context.Context
24-
var cancel context.CancelFunc
25-
start := time.Now()
26-
end := start
27-
hasTimeout := false
2820

2921
baseCtx := ctx.Req.Request.Context()
3022
if opts.NonBlocking {
3123
baseCtx = graceful.GetManager().HammerContext()
3224
}
3325

34-
if opts.Timeout > 0 {
35-
flushCtx, cancel = context.WithTimeout(baseCtx, opts.Timeout)
36-
end = start.Add(opts.Timeout)
37-
hasTimeout = true
38-
} else {
39-
flushCtx, cancel = context.WithCancel(baseCtx)
40-
}
41-
4226
if opts.NonBlocking {
4327
go func() {
44-
err := doFlush(flushCtx, cancel, start, end, hasTimeout)
28+
err := queue.GetManager().FlushAll(baseCtx, opts.Timeout)
4529
if err != nil {
4630
log.Error("Flushing request timed-out with error: %v", err)
4731
}
@@ -51,54 +35,11 @@ func FlushQueues(ctx *macaron.Context, opts private.FlushOptions) {
5135
})
5236
return
5337
}
54-
err := doFlush(flushCtx, cancel, start, end, hasTimeout)
38+
err := queue.GetManager().FlushAll(baseCtx, opts.Timeout)
5539
if err != nil {
5640
ctx.JSON(http.StatusRequestTimeout, map[string]interface{}{
57-
"err": flushCtx.Err().Error(),
41+
"err": err,
5842
})
5943
}
6044
ctx.PlainText(http.StatusOK, []byte("success"))
6145
}
62-
63-
func doFlush(ctx context.Context, cancel context.CancelFunc, start, end time.Time, hasTimeout bool) error {
64-
defer cancel()
65-
for {
66-
select {
67-
case <-ctx.Done():
68-
return ctx.Err()
69-
default:
70-
}
71-
mqs := queue.GetManager().ManagedQueues()
72-
wg := sync.WaitGroup{}
73-
wg.Add(len(mqs))
74-
allEmpty := true
75-
for _, mq := range mqs {
76-
if mq.IsEmpty() {
77-
wg.Done()
78-
continue
79-
}
80-
allEmpty = false
81-
if pool, ok := mq.Managed.(queue.WorkerPool); ok {
82-
go func() {
83-
localCtx, localCancel := context.WithCancel(ctx)
84-
pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
85-
err := pool.FlushWithContext(localCtx)
86-
if err != nil && err != ctx.Err() {
87-
cancel()
88-
}
89-
mq.CancelWorkers(pid)
90-
localCancel()
91-
wg.Done()
92-
}()
93-
} else {
94-
wg.Done()
95-
}
96-
97-
}
98-
if allEmpty {
99-
break
100-
}
101-
wg.Wait()
102-
}
103-
return nil
104-
}

0 commit comments

Comments
 (0)