Skip to content

Commit e5fc5ff

Browse files
committed
sync comment & config & variable name
1 parent c99ee5d commit e5fc5ff

File tree

4 files changed

+20
-23
lines changed

4 files changed

+20
-23
lines changed

modules/queue/workerqueue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,11 @@ func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQu
178178
case "redis":
179179
return t, newBaseRedisGeneric
180180
default: // level(leveldb,levelqueue,persistable-channel)
181-
return "levelqueue", newBaseLevelQueueGeneric
181+
return "level", newBaseLevelQueueGeneric
182182
}
183183
}
184184

185-
func NewWorkerPoolQueueByIniConfig[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
185+
func NewWorkerPoolQueueBySetting[T any](name string, queueSetting setting.QueueSettings, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
186186
if handler == nil {
187187
log.Debug("Use dummy queue for %q because handler is nil and caller doesn't want to process the queue items", name)
188188
queueSetting.Type = "dummy"
@@ -234,7 +234,7 @@ func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvide
234234
log.Error("Failed to get queue settings for %q: %v", name, err)
235235
return nil
236236
}
237-
w := NewWorkerPoolQueueByIniConfig(name, queueSetting, handler, unique)
237+
w := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
238238
GetManager().AddManagedQueue(w)
239239
return w
240240
}

modules/queue/workerqueue_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
3939

4040
mu := sync.Mutex{}
4141

42-
test := func(iniCfg setting.QueueSettings) {
43-
iniCfg.Length = 100
44-
iniCfg.Datadir = t.TempDir() + "/test-queue"
42+
test := func(queueSetting setting.QueueSettings) {
43+
queueSetting.Length = 100
44+
queueSetting.Datadir = t.TempDir() + "/test-queue"
4545
m := map[int]int{}
4646
// odds are handled once, evens are handled twice
4747
handler := func(items ...int) (unhandled []int) {
@@ -56,15 +56,15 @@ func TestWorkerPoolQueueUnhandled(t *testing.T) {
5656
return unhandled
5757
}
5858

59-
q := NewWorkerPoolQueueByIniConfig("test-workpoolqueue", iniCfg, handler, false)
59+
q := NewWorkerPoolQueueBySetting("test-workpoolqueue", queueSetting, handler, false)
6060
stop := runWorkerPoolQueue(q)
61-
for i := 0; i < iniCfg.Length; i++ {
61+
for i := 0; i < queueSetting.Length; i++ {
6262
assert.NoError(t, q.Push(i))
6363
}
6464
assert.NoError(t, q.FlushWithContext(context.Background(), 0))
6565
stop()
6666

67-
for i := 0; i < iniCfg.Length; i++ {
67+
for i := 0; i < queueSetting.Length; i++ {
6868
assert.EqualValues(t, i%2, m[i]%2)
6969
}
7070
}
@@ -106,9 +106,9 @@ func TestWorkerPoolQueuePersistence(t *testing.T) {
106106
})
107107
}
108108

109-
func testWorkerPoolQueuePersistence(t *testing.T, iniCfg setting.QueueSettings) {
110-
testCount := iniCfg.Length
111-
iniCfg.Datadir = t.TempDir() + "/test-queue"
109+
func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSettings) {
110+
testCount := queueSetting.Length
111+
queueSetting.Datadir = t.TempDir() + "/test-queue"
112112

113113
mu := sync.Mutex{}
114114

@@ -132,7 +132,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, iniCfg setting.QueueSettings)
132132
return nil
133133
}
134134

135-
q := NewWorkerPoolQueueByIniConfig("pr_patch_checker_test", iniCfg, testHandler, true)
135+
q := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
136136
stop := runWorkerPoolQueue(q)
137137
for i := 0; i < testCount; i++ {
138138
_ = q.Push("task-" + strconv.Itoa(i))
@@ -156,7 +156,7 @@ func testWorkerPoolQueuePersistence(t *testing.T, iniCfg setting.QueueSettings)
156156
return nil
157157
}
158158

159-
q := NewWorkerPoolQueueByIniConfig("pr_patch_checker_test", iniCfg, testHandler, true)
159+
q := NewWorkerPoolQueueBySetting("pr_patch_checker_test", queueSetting, testHandler, true)
160160
stop := runWorkerPoolQueue(q)
161161
assert.NoError(t, q.FlushWithContext(context.Background(), 0))
162162
stop()

modules/setting/queue.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ type QueueSettings struct {
1616
Type string
1717
Datadir string
1818
ConnStr string // for levelqueue or redis
19-
Length int // queue length
19+
Length int // max queue length before blocking
2020

21-
QueueName, SetName string
21+
QueueName, SetName string // the name suffix for storage (db key, redis key), "set" is for unique queue
2222

2323
BatchLength int
2424
MaxWorkers int
@@ -29,14 +29,15 @@ var queueSettingsDefault = QueueSettings{
2929
Datadir: "queues/", // relative to AppDataPath
3030
Length: 20, // queue length before a channel queue will block
3131

32-
QueueName: "_queue", // the suffix of the default redis/disk queue name
33-
SetName: "_unique", // the suffix of the default redis/disk unique queue set name (for unique queue)
32+
QueueName: "_queue",
33+
SetName: "_unique",
3434
BatchLength: 20,
3535
MaxWorkers: 10,
3636
}
3737

3838
func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
3939
cfg := queueSettingsDefault
40+
cfg.Name = name
4041
if sec, err := rootCfg.GetSection("queue"); err == nil {
4142
if err = sec.MapTo(&cfg); err != nil {
4243
log.Error("Failed to map queue common config for %q: %v", name, err)
@@ -78,8 +79,6 @@ func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error
7879
return cfg, nil
7980
}
8081

81-
// LoadQueueSettings sets up the default settings for Queues
82-
// This is exported for tests to be able to use the queue
8382
func LoadQueueSettings() {
8483
loadQueueFrom(CfgProvider)
8584
}
@@ -95,8 +94,6 @@ func loadQueueFrom(rootCfg ConfigProvider) {
9594
handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
9695
}
9796

98-
// handleOldLengthConfiguration allows fallback to older configuration. `[queue.name]` `LENGTH` will override this configuration, but
99-
// if that is left unset then we should fall back to the older configuration. (Except where the new length woul be <=0)
10097
func handleOldLengthConfiguration(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
10198
if rootCfg.Section(oldSection).HasKey(oldKey) {
10299
log.Fatal("Removed queue option:`[%s].%s`. Use new options in`[queue.%s]`", oldSection, oldKey, newQueueName)

services/pull/check_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
2929
return nil
3030
}
3131
iniCfg := queue.IniConfig{Length: 10, BatchLength: 1, Workers: 1}
32-
prPatchCheckerQueue = queue.NewWorkerPoolQueueByIniConfig("pr_patch_checker", iniCfg, testHandler, true)
32+
prPatchCheckerQueue = queue.NewWorkerPoolQueueBySetting("pr_patch_checker", iniCfg, testHandler, true)
3333

3434
var queueShutdown []func()
3535
var queueTerminate []func()

0 commit comments

Comments
 (0)