Skip to content

Commit ca14f21

Browse files
committed
Add UniqueQueue interface and functions to create them
1 parent 8eb4ce3 commit ca14f21

File tree

5 files changed

+87
-0
lines changed

5 files changed

+87
-0
lines changed

docs/content/doc/advanced/config-cheat-sheet.en-us.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ relation to port exhaustion.
252252
- `BATCH_LENGTH`: **20**: Batch data before passing to the handler
253253
- `CONN_STR`: **addrs=127.0.0.1:6379 db=0**: Connection string for the redis queue type.
254254
- `QUEUE_NAME`: **_queue**: The suffix for default redis queue name. Individual queues will default to **`name`**`QUEUE_NAME` but can be overriden in the specific `queue.name` section.
255+
- `SET_NAME`: **_unique**: The suffix that will added to the default redis
256+
set name for unique queues. Individual queues will default to
257+
**`name`**`QUEUE_NAME`_`SET_NAME`_ but can be overridden in the specific
258+
`queue.name` section.
255259
- `WRAP_IF_NECESSARY`: **true**: Will wrap queues with a timeoutable queue if the selected queue is not ready to be created - (Only relevant for the level queue.)
256260
- `MAX_ATTEMPTS`: **10**: Maximum number of attempts to create the wrapped queue
257261
- `TIMEOUT`: **GRACEFUL_HAMMER_TIME + 30s**: Timeout the creation of the wrapped queue if it takes longer than this to create.

modules/queue/queue.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ func (b *DummyQueue) Push(Data) error {
8181
return nil
8282
}
8383

84+
// PushFunc fakes a push of data to the queue with a function. The function is never run.
85+
func (b *DummyQueue) PushFunc(Data, func() error) error {
86+
return nil
87+
}
88+
89+
// Has always returns false as this queue never does anything
90+
func (b *DummyQueue) Has(Data) (bool, error) {
91+
return false, nil
92+
}
93+
8494
// Flush always returns nil
8595
func (b *DummyQueue) Flush(time.Duration) error {
8696
return nil

modules/queue/setting.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package queue
77
import (
88
"encoding/json"
99
"fmt"
10+
"strings"
1011

1112
"code.gitea.io/gitea/modules/log"
1213
"code.gitea.io/gitea/modules/setting"
@@ -36,6 +37,7 @@ func getQueueSettings(name string) (setting.QueueSettings, []byte) {
3637
opts["Password"] = q.Password
3738
opts["DBIndex"] = q.DBIndex
3839
opts["QueueName"] = q.QueueName
40+
opts["SetName"] = q.SetName
3941
opts["Workers"] = q.Workers
4042
opts["MaxWorkers"] = q.MaxWorkers
4143
opts["BlockTimeout"] = q.BlockTimeout
@@ -81,3 +83,38 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
8183
}
8284
return returnable
8385
}
86+
87+
// CreateUniqueQueue for name with provided handler and exemplar
88+
func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue {
89+
q, cfg := getQueueSettings(name)
90+
if len(cfg) == 0 {
91+
return nil
92+
}
93+
94+
if len(q.Type) > 0 && q.Type != "dummy" && !strings.HasPrefix(q.Type, "unique-") {
95+
q.Type = "unique-" + q.Type
96+
}
97+
98+
typ, err := validType(q.Type)
99+
if err != nil || typ == PersistableChannelQueueType {
100+
typ = PersistableChannelUniqueQueueType
101+
}
102+
103+
returnable, err := NewQueue(typ, handle, cfg, exemplar)
104+
if q.WrapIfNecessary && err != nil {
105+
log.Warn("Unable to create unique queue for %s: %v", name, err)
106+
log.Warn("Attempting to create wrapped queue")
107+
returnable, err = NewQueue(WrappedUniqueQueueType, handle, WrappedUniqueQueueConfiguration{
108+
Underlying: typ,
109+
Timeout: q.Timeout,
110+
MaxAttempts: q.MaxAttempts,
111+
Config: cfg,
112+
QueueLength: q.Length,
113+
}, exemplar)
114+
}
115+
if err != nil {
116+
log.Error("Unable to create unique queue for %s: %v", name, err)
117+
return nil
118+
}
119+
return returnable.(UniqueQueue)
120+
}

modules/queue/unique_queue.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2020 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package queue
6+
7+
import (
8+
"context"
9+
"fmt"
10+
)
11+
12+
// UniqueQueue defines a queue which guarantees only one instance of same
13+
// data is in the queue. Instances with same identity will be
14+
// discarded if there is already one in the line.
15+
//
16+
// This queue is particularly useful for preventing duplicated task
17+
// of same purpose.
18+
//
19+
// Users of this queue should be careful to push only the identifier of the
20+
// data
21+
type UniqueQueue interface {
22+
Run(atShutdown, atTerminate func(context.Context, func()))
23+
Push(Data) error
24+
PushFunc(Data, func() error) error
25+
Has(Data) (bool, error)
26+
}
27+
28+
// ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue
29+
var ErrAlreadyInQueue = fmt.Errorf("already in queue")

modules/setting/queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type QueueSettings struct {
2626
Addresses string
2727
Password string
2828
QueueName string
29+
SetName string
2930
DBIndex int
3031
WrapIfNecessary bool
3132
MaxAttempts int
@@ -54,8 +55,13 @@ func GetQueueSettings(name string) QueueSettings {
5455
q.DataDir = key.MustString(q.DataDir)
5556
case "QUEUE_NAME":
5657
q.QueueName = key.MustString(q.QueueName)
58+
case "SET_NAME":
59+
q.SetName = key.MustString(q.SetName)
5760
}
5861
}
62+
if len(q.SetName) == 0 && len(Queue.SetName) > 0 {
63+
q.SetName = q.QueueName + Queue.SetName
64+
}
5965
if !filepath.IsAbs(q.DataDir) {
6066
q.DataDir = filepath.Join(AppDataPath, q.DataDir)
6167
}
@@ -100,6 +106,7 @@ func NewQueueService() {
100106
Queue.BoostTimeout = sec.Key("BOOST_TIMEOUT").MustDuration(5 * time.Minute)
101107
Queue.BoostWorkers = sec.Key("BOOST_WORKERS").MustInt(5)
102108
Queue.QueueName = sec.Key("QUEUE_NAME").MustString("_queue")
109+
Queue.SetName = sec.Key("SET_NAME").MustString("")
103110

104111
// Now handle the old issue_indexer configuration
105112
section := Cfg.Section("queue.issue_indexer")

0 commit comments

Comments
 (0)