Skip to content

Commit 22c4563

Browse files
committed
Add UniqueQueue implementations
1 parent ca14f21 commit 22c4563

File tree

6 files changed

+1093
-0
lines changed

6 files changed

+1093
-0
lines changed

modules/queue/queue_redis.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type redisClient interface {
2626
RPush(key string, args ...interface{}) *redis.IntCmd
2727
LPop(key string) *redis.StringCmd
2828
LLen(key string) *redis.IntCmd
29+
SAdd(key string, members ...interface{}) *redis.IntCmd
30+
SRem(key string, members ...interface{}) *redis.IntCmd
31+
SIsMember(key string, member interface{}) *redis.BoolCmd
2932
Ping() *redis.StatusCmd
3033
Close() error
3134
}

modules/queue/unique_queue_channel.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package queue
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"sync"
11+
12+
"code.gitea.io/gitea/modules/log"
13+
)
14+
15+
// ChannelUniqueQueueType is the type for channel queue
16+
const ChannelUniqueQueueType Type = "unique-channel"
17+
18+
// ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue
19+
type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
20+
21+
// ChannelUniqueQueue implements UniqueQueue
22+
//
23+
// It is basically a thin wrapper around a WorkerPool but keeps a store of
24+
// what has been pushed within a table
25+
type ChannelUniqueQueue struct {
26+
*WorkerPool
27+
lock sync.Mutex
28+
table map[Data]bool
29+
exemplar interface{}
30+
workers int
31+
name string
32+
}
33+
34+
// NewChannelUniqueQueue create a memory channel queue
35+
func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
36+
configInterface, err := toConfig(ChannelUniqueQueueConfiguration{}, cfg)
37+
if err != nil {
38+
return nil, err
39+
}
40+
config := configInterface.(ChannelUniqueQueueConfiguration)
41+
if config.BatchLength == 0 {
42+
config.BatchLength = 1
43+
}
44+
queue := &ChannelUniqueQueue{
45+
table: map[Data]bool{},
46+
exemplar: exemplar,
47+
workers: config.Workers,
48+
name: config.Name,
49+
}
50+
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
51+
for _, datum := range data {
52+
queue.lock.Lock()
53+
delete(queue.table, datum)
54+
queue.lock.Unlock()
55+
handle(datum)
56+
}
57+
}, config.WorkerPoolConfiguration)
58+
59+
queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
60+
return queue, nil
61+
}
62+
63+
// Run starts to run the queue
64+
func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
65+
atShutdown(context.Background(), func() {
66+
log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name)
67+
})
68+
atTerminate(context.Background(), func() {
69+
log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name)
70+
})
71+
log.Debug("ChannelUniqueQueue: %s Starting", q.name)
72+
go func() {
73+
_ = q.AddWorkers(q.workers, 0)
74+
}()
75+
}
76+
77+
// Push will push data into the queue if the data is not already in the queue
78+
func (q *ChannelUniqueQueue) Push(data Data) error {
79+
return q.PushFunc(data, nil)
80+
}
81+
82+
// PushFunc will push data into the queue
83+
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
84+
if !assignableTo(data, q.exemplar) {
85+
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
86+
}
87+
q.lock.Lock()
88+
locked := true
89+
defer func() {
90+
if locked {
91+
q.lock.Unlock()
92+
}
93+
}()
94+
if _, ok := q.table[data]; ok {
95+
return ErrAlreadyInQueue
96+
}
97+
// FIXME: We probably need to implement some sort of limit here
98+
// If the downstream queue blocks this table will grow without limit
99+
q.table[data] = true
100+
if fn != nil {
101+
err := fn()
102+
if err != nil {
103+
delete(q.table, data)
104+
return err
105+
}
106+
}
107+
locked = false
108+
q.lock.Unlock()
109+
q.WorkerPool.Push(data)
110+
return nil
111+
}
112+
113+
// Has checks if the data is in the queue
114+
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
115+
q.lock.Lock()
116+
defer q.lock.Unlock()
117+
_, has := q.table[data]
118+
return has, nil
119+
}
120+
121+
// Name returns the name of this queue
122+
func (q *ChannelUniqueQueue) Name() string {
123+
return q.name
124+
}
125+
126+
func init() {
127+
queuesMap[ChannelUniqueQueueType] = NewChannelUniqueQueue
128+
}

modules/queue/unique_queue_disk.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2019 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package queue
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"sync"
12+
"time"
13+
14+
"code.gitea.io/gitea/modules/log"
15+
16+
"gitea.com/lunny/levelqueue"
17+
)
18+
19+
// LevelUniqueQueueType is the type for level queue
20+
const LevelUniqueQueueType Type = "unique-level"
21+
22+
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
23+
type LevelUniqueQueueConfiguration struct {
24+
WorkerPoolConfiguration
25+
DataDir string
26+
Workers int
27+
Name string
28+
}
29+
30+
// LevelUniqueQueue implements a disk library queue
31+
type LevelUniqueQueue struct {
32+
*WorkerPool
33+
queue *levelqueue.UniqueQueue
34+
closed chan struct{}
35+
terminated chan struct{}
36+
lock sync.Mutex
37+
exemplar interface{}
38+
workers int
39+
name string
40+
}
41+
42+
// NewLevelUniqueQueue creates a ledis local queue
43+
func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
44+
configInterface, err := toConfig(LevelUniqueQueueConfiguration{}, cfg)
45+
if err != nil {
46+
return nil, err
47+
}
48+
config := configInterface.(LevelUniqueQueueConfiguration)
49+
50+
internal, err := levelqueue.OpenUnique(config.DataDir)
51+
if err != nil {
52+
return nil, err
53+
}
54+
55+
queue := &LevelUniqueQueue{
56+
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
57+
queue: internal,
58+
exemplar: exemplar,
59+
closed: make(chan struct{}),
60+
terminated: make(chan struct{}),
61+
workers: config.Workers,
62+
name: config.Name,
63+
}
64+
queue.qid = GetManager().Add(queue, LevelUniqueQueueType, config, exemplar)
65+
return queue, nil
66+
}
67+
68+
// Run starts to run the queue
69+
func (l *LevelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
70+
atShutdown(context.Background(), l.Shutdown)
71+
atTerminate(context.Background(), l.Terminate)
72+
log.Debug("LevelUniqueQueue: %s Starting", l.name)
73+
74+
go func() {
75+
_ = l.AddWorkers(l.workers, 0)
76+
}()
77+
78+
go l.readToChan()
79+
80+
log.Trace("LevelUniqueQueue: %s Waiting til closed", l.name)
81+
<-l.closed
82+
83+
log.Trace("LevelUniqueQueue: %s Waiting til done", l.name)
84+
l.Wait()
85+
86+
log.Trace("LevelUniqueQueue: %s Waiting til cleaned", l.name)
87+
ctx, cancel := context.WithCancel(context.Background())
88+
atTerminate(ctx, cancel)
89+
l.CleanUp(ctx)
90+
cancel()
91+
log.Trace("LevelUniqueQueue: %s Cleaned", l.name)
92+
93+
}
94+
95+
func (l *LevelUniqueQueue) readToChan() {
96+
for {
97+
select {
98+
case <-l.closed:
99+
// tell the pool to shutdown.
100+
l.cancel()
101+
return
102+
default:
103+
bs, err := l.queue.RPop()
104+
if err != nil {
105+
if err != levelqueue.ErrNotFound {
106+
log.Error("LevelUniqueQueue: %s Error on RPop: %v", l.name, err)
107+
}
108+
time.Sleep(time.Millisecond * 100)
109+
continue
110+
}
111+
112+
if len(bs) == 0 {
113+
time.Sleep(time.Millisecond * 100)
114+
continue
115+
}
116+
117+
data, err := unmarshalAs(bs, l.exemplar)
118+
if err != nil {
119+
log.Error("LevelUniqueQueue: %s Failed to unmarshal with error: %v", l.name, err)
120+
time.Sleep(time.Millisecond * 100)
121+
continue
122+
}
123+
124+
log.Trace("LevelUniqueQueue %s: Task found: %#v", l.name, data)
125+
l.WorkerPool.Push(data)
126+
127+
}
128+
}
129+
}
130+
131+
// Push will push the data to the queue
132+
func (l *LevelUniqueQueue) Push(data Data) error {
133+
return l.PushFunc(data, nil)
134+
}
135+
136+
// PushFunc will push the data to the queue running fn if the data will be added
137+
func (l *LevelUniqueQueue) PushFunc(data Data, fn func() error) error {
138+
if !assignableTo(data, l.exemplar) {
139+
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
140+
}
141+
bs, err := json.Marshal(data)
142+
if err != nil {
143+
return err
144+
}
145+
err = l.queue.LPushFunc(bs, fn)
146+
if err == levelqueue.ErrAlreadyInQueue {
147+
return ErrAlreadyInQueue
148+
}
149+
return err
150+
}
151+
152+
// Has checks if the provided data is in the queue already
153+
func (l *LevelUniqueQueue) Has(data Data) (bool, error) {
154+
if !assignableTo(data, l.exemplar) {
155+
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
156+
}
157+
bs, err := json.Marshal(data)
158+
if err != nil {
159+
return false, err
160+
}
161+
return l.queue.Has(bs)
162+
}
163+
164+
// Shutdown this queue and stop processing
165+
func (l *LevelUniqueQueue) Shutdown() {
166+
l.lock.Lock()
167+
defer l.lock.Unlock()
168+
log.Trace("LevelUniqueQueue: %s Shutting down", l.name)
169+
select {
170+
case <-l.closed:
171+
default:
172+
close(l.closed)
173+
}
174+
log.Debug("LevelUniqueQueue: %s Shutdown", l.name)
175+
176+
}
177+
178+
// Terminate this queue and close the queue
179+
func (l *LevelUniqueQueue) Terminate() {
180+
log.Trace("LevelUniqueQueue: %s Terminating", l.name)
181+
l.Shutdown()
182+
l.lock.Lock()
183+
select {
184+
case <-l.terminated:
185+
l.lock.Unlock()
186+
default:
187+
close(l.terminated)
188+
l.lock.Unlock()
189+
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
190+
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
191+
}
192+
}
193+
log.Debug("LevelUniqueQueue: %s Terminated", l.name)
194+
}
195+
196+
// Name returns the name of this queue
197+
func (l *LevelUniqueQueue) Name() string {
198+
return l.name
199+
}
200+
201+
func init() {
202+
queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
203+
}

0 commit comments

Comments
 (0)