Skip to content

Commit 22572ec

Browse files
committed
Implement Pausable and PushBack for the bytefifos
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 4fec9f3 commit 22572ec

File tree

6 files changed

+150
-28
lines changed

6 files changed

+150
-28
lines changed

modules/queue/bytefifo.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type ByteFIFO interface {
1616
Pop(ctx context.Context) ([]byte, error)
1717
// Close this fifo
1818
Close() error
19+
// PushBack pushes data back to the top of the fifo
20+
PushBack(ctx context.Context, data []byte) error
1921
}
2022

2123
// UniqueByteFIFO defines a FIFO that Uniques its contents
@@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 {
5052
return 0
5153
}
5254

55+
// PushBack pushes data back to the top of the fifo
56+
func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error {
57+
return nil
58+
}
59+
5360
var _ UniqueByteFIFO = &DummyUniqueByteFIFO{}
5461

5562
// DummyUniqueByteFIFO represents a dummy unique fifo

modules/queue/queue_bytefifo.go

Lines changed: 115 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"fmt"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"code.gitea.io/gitea/modules/log"
@@ -52,8 +53,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
5253
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
5354
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
5455

55-
return &ByteFIFOQueue{
56-
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
56+
q := &ByteFIFOQueue{
5757
byteFIFO: byteFIFO,
5858
typ: typ,
5959
shutdownCtx: shutdownCtx,
@@ -65,7 +65,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
6565
name: config.Name,
6666
waitOnEmpty: config.WaitOnEmpty,
6767
pushed: make(chan struct{}, 1),
68-
}, nil
68+
}
69+
q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
70+
for _, unhandled := range handle(data...) {
71+
if fail := q.PushBack(unhandled); fail != nil {
72+
failed = append(failed, fail)
73+
}
74+
}
75+
return
76+
}, config.WorkerPoolConfiguration)
77+
78+
return q, nil
6979
}
7080

7181
// Name returns the name of this queue
@@ -78,6 +88,25 @@ func (q *ByteFIFOQueue) Push(data Data) error {
7888
return q.PushFunc(data, nil)
7989
}
8090

91+
// PushBack pushes data to the fifo
92+
func (q *ByteFIFOQueue) PushBack(data Data) error {
93+
if !assignableTo(data, q.exemplar) {
94+
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
95+
}
96+
json := jsoniter.ConfigCompatibleWithStandardLibrary
97+
bs, err := json.Marshal(data)
98+
if err != nil {
99+
return err
100+
}
101+
defer func() {
102+
select {
103+
case q.pushed <- struct{}{}:
104+
default:
105+
}
106+
}()
107+
return q.byteFIFO.PushBack(q.terminateCtx, bs)
108+
}
109+
81110
// PushFunc pushes data to the fifo
82111
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
83112
if !assignableTo(data, q.exemplar) {
@@ -88,14 +117,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
88117
if err != nil {
89118
return err
90119
}
91-
if q.waitOnEmpty {
92-
defer func() {
93-
select {
94-
case q.pushed <- struct{}{}:
95-
default:
96-
}
97-
}()
98-
}
120+
defer func() {
121+
select {
122+
case q.pushed <- struct{}{}:
123+
default:
124+
}
125+
}()
99126
return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
100127
}
101128

@@ -109,6 +136,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
109136
return q.byteFIFO.Len(q.terminateCtx) == 0
110137
}
111138

139+
// Flush flushes the ByteFIFOQueue
140+
func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
141+
select {
142+
case q.pushed <- struct{}{}:
143+
default:
144+
}
145+
return q.WorkerPool.Flush(timeout)
146+
}
147+
112148
// Run runs the bytefifo queue
113149
func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
114150
atShutdown(q.Shutdown)
@@ -143,47 +179,89 @@ func (q *ByteFIFOQueue) readToChan() {
143179

144180
// Default backoff values
145181
backOffTime := time.Millisecond * 100
182+
backOffTimer := time.NewTimer(0)
183+
if !backOffTimer.Stop() {
184+
select {
185+
case <-backOffTimer.C:
186+
default:
187+
}
188+
}
146189

147190
loop:
148191
for {
149-
err := q.doPop()
150-
if err == errQueueEmpty {
151-
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
192+
paused, resumed := q.IsPausedIsResumed()
193+
if paused {
194+
log.Trace("Queue %s pausing", q.name)
152195
select {
153-
case <-q.pushed:
154-
// reset backOffTime
155-
backOffTime = 100 * time.Millisecond
156-
continue loop
196+
case <-resumed:
197+
log.Trace("Queue %s resuming", q.name)
157198
case <-q.shutdownCtx.Done():
158-
// Oops we've been shutdown whilst waiting
159-
// Make sure the worker pool is shutdown too
199+
// tell the pool to shutdown.
160200
q.baseCtxCancel()
161201
return
202+
case data := <-q.dataChan:
203+
if err := q.PushBack(data); err != nil {
204+
log.Error("Unable to push back data into queue %s", q.name)
205+
}
206+
atomic.AddInt64(&q.numInQueue, -1)
162207
}
163208
}
164209

165-
// Reset the backOffTime if there is no error or an unmarshalError
166-
if err == nil || err == errUnmarshal {
167-
backOffTime = 100 * time.Millisecond
210+
// empty the pushed channel
211+
select {
212+
case <-q.pushed:
213+
default:
214+
}
215+
216+
err := q.doPop()
217+
218+
if !backOffTimer.Stop() {
219+
select {
220+
case <-backOffTimer.C:
221+
default:
222+
}
168223
}
169224

170225
if err != nil {
226+
if err == errQueueEmpty && q.waitOnEmpty {
227+
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
228+
229+
// reset the backoff time but don't set the timer
230+
backOffTime = 100 * time.Millisecond
231+
} else if err == errUnmarshal {
232+
// reset the timer and backoff
233+
backOffTime = 100 * time.Millisecond
234+
backOffTimer.Reset(backOffTime)
235+
} else {
236+
// backoff
237+
backOffTimer.Reset(backOffTime)
238+
}
239+
171240
// Need to Backoff
172241
select {
173242
case <-q.shutdownCtx.Done():
174243
// Oops we've been shutdown whilst backing off
175244
// Make sure the worker pool is shutdown too
176245
q.baseCtxCancel()
177246
return
178-
case <-time.After(backOffTime):
179-
// OK we've waited - so backoff a bit
247+
case <-q.pushed:
248+
// Data has been pushed to the fifo (or flush has been called)
249+
// reset the backoff time
250+
backOffTime = 100 * time.Millisecond
251+
continue loop
252+
case <-backOffTimer.C:
253+
// Calculate the next backoff time
180254
backOffTime += backOffTime / 2
181255
if backOffTime > maxBackOffTime {
182256
backOffTime = maxBackOffTime
183257
}
184258
continue loop
185259
}
186260
}
261+
262+
// Reset the backoff time
263+
backOffTime = 100 * time.Millisecond
264+
187265
select {
188266
case <-q.shutdownCtx.Done():
189267
// Oops we've been shutdown
@@ -288,9 +366,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
288366
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
289367
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
290368

291-
return &ByteFIFOUniqueQueue{
369+
q := &ByteFIFOUniqueQueue{
292370
ByteFIFOQueue: ByteFIFOQueue{
293-
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
294371
byteFIFO: byteFIFO,
295372
typ: typ,
296373
shutdownCtx: shutdownCtx,
@@ -301,7 +378,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
301378
workers: config.Workers,
302379
name: config.Name,
303380
},
304-
}, nil
381+
}
382+
q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
383+
for _, unhandled := range handle(data...) {
384+
if fail := q.PushBack(unhandled); fail != nil {
385+
failed = append(failed, fail)
386+
}
387+
}
388+
return
389+
}, config.WorkerPoolConfiguration)
390+
391+
return q, nil
305392
}
306393

307394
// Has checks if the provided data is in the queue

modules/queue/queue_disk.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn fu
9494
return fifo.internal.LPush(data)
9595
}
9696

97+
// PushBack pushes data to the top of the fifo
98+
func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
99+
return fifo.internal.RPush(data)
100+
}
101+
97102
// Pop pops data from the start of the fifo
98103
func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
99104
data, err := fifo.internal.RPop()

modules/queue/queue_redis.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
5757

5858
type redisClient interface {
5959
RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
60+
LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
6061
LPop(ctx context.Context, key string) *redis.StringCmd
6162
LLen(ctx context.Context, key string) *redis.IntCmd
6263
SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
@@ -103,6 +104,11 @@ func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func()
103104
return fifo.client.RPush(ctx, fifo.queueName, data).Err()
104105
}
105106

107+
// PushBack pushes data to the top of the fifo
108+
func (fifo *RedisByteFIFO) PushBack(ctx context.Context, data []byte) error {
109+
return fifo.client.LPush(ctx, fifo.queueName, data).Err()
110+
}
111+
106112
// Pop pops data from the start of the fifo
107113
func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
108114
data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()

modules/queue/unique_queue_disk.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte,
9393
return fifo.internal.LPushFunc(data, fn)
9494
}
9595

96+
// PushBack pushes data to the top of the fifo
97+
func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
98+
return fifo.internal.RPush(data)
99+
}
100+
96101
// Pop pops data from the start of the fifo
97102
func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
98103
data, err := fifo.internal.RPop()

modules/queue/unique_queue_redis.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f
105105
return fifo.client.RPush(ctx, fifo.queueName, data).Err()
106106
}
107107

108+
// PushBack pushes data to the top of the fifo
109+
func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error {
110+
added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
111+
if err != nil {
112+
return err
113+
}
114+
if added == 0 {
115+
return ErrAlreadyInQueue
116+
}
117+
return fifo.client.LPush(ctx, fifo.queueName, data).Err()
118+
}
119+
108120
// Pop pops data from the start of the fifo
109121
func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
110122
data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()

0 commit comments

Comments
 (0)