Skip to content

Commit bd066ca

Browse files
committed
Add doctor commands for looking at the level DB
Signed-off-by: Andrew Thornton <[email protected]>
1 parent db1931c commit bd066ca

File tree

1 file changed

+216
-0
lines changed

1 file changed

+216
-0
lines changed

modules/doctor/queue.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright 2022 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package doctor
6+
7+
import (
8+
"context"
9+
10+
"code.gitea.io/gitea/modules/log"
11+
"code.gitea.io/gitea/modules/nosql"
12+
"code.gitea.io/gitea/modules/queue"
13+
"code.gitea.io/gitea/modules/setting"
14+
"gitea.com/lunny/levelqueue"
15+
)
16+
17+
var uniqueQueueNames = []string{
18+
"code_indexer",
19+
"repo_stats_update",
20+
"mirror",
21+
"pr_patch_checker",
22+
"repo-archive",
23+
}
24+
25+
var queueNames = []string{
26+
"issue_indexer",
27+
"notification-service",
28+
"mail",
29+
"push_update",
30+
"task",
31+
}
32+
33+
var levelqueueTypes = []string{
34+
string(queue.PersistableChannelQueueType),
35+
string(queue.PersistableChannelUniqueQueueType),
36+
string(queue.LevelQueueType),
37+
string(queue.LevelUniqueQueueType),
38+
}
39+
40+
func checkUniqueQueues(ctx context.Context, logger log.Logger, autofix bool) error {
41+
for _, name := range uniqueQueueNames {
42+
q := setting.GetQueueSettings(name)
43+
if q.Type == "" {
44+
q.Type = string(queue.PersistableChannelQueueType)
45+
}
46+
found := false
47+
for _, typ := range levelqueueTypes {
48+
if typ == q.Type {
49+
found = true
50+
break
51+
}
52+
}
53+
if !found {
54+
logger.Info("Queue: %s\nType: %s\nNo LevelDB", q.Name, q.Type)
55+
continue
56+
}
57+
58+
connection := q.ConnectionString
59+
if connection == "" {
60+
connection = q.DataDir
61+
}
62+
63+
db, err := nosql.GetManager().GetLevelDB(connection)
64+
if err != nil {
65+
logger.Error("Queue: %s\nUnable to open DB connection %s: %v", q.Name, connection, err)
66+
return err
67+
}
68+
defer db.Close()
69+
70+
prefix := q.Name
71+
72+
iQueue, err := levelqueue.NewQueue(db, []byte(prefix), false)
73+
if err != nil {
74+
logger.Error("Queue: %s\nUnable to open Queue component: %v", q.Name, err)
75+
return err
76+
}
77+
78+
iSet, err := levelqueue.NewSet(db, []byte(prefix+"-unique"), false)
79+
if err != nil {
80+
logger.Error("Queue: %s\nUnable to open Set component: %v", q.Name, err)
81+
return err
82+
}
83+
84+
qLen := iQueue.Len()
85+
sMembers, err := iSet.Members()
86+
if err != nil {
87+
logger.Error("Queue: %s\nUnable to get members of Set component: %v", q.Name, err)
88+
return err
89+
}
90+
sLen := len(sMembers)
91+
92+
if int(qLen) == sLen {
93+
if qLen == 0 {
94+
logger.Info("Queue: %s\nType: %s\nLevelDB: %s", q.Name, q.Type, "empty")
95+
} else {
96+
logger.Info("Queue: %s\nType: %s\nLevelDB contains: %d entries", q.Name, q.Type, qLen)
97+
}
98+
continue
99+
}
100+
logger.Warn("Queue: %s\nType: %s\nContains different numbers of elements in Queue component %d to Set component %d", q.Name, q.Type, qLen, sLen)
101+
if !autofix {
102+
continue
103+
}
104+
105+
// Empty out the old set members
106+
for _, member := range sMembers {
107+
_, err := iSet.Remove(member)
108+
if err != nil {
109+
logger.Error("Queue: %s\nUnable to remove Set member %s: %v", q.Name, string(member), err)
110+
return err
111+
}
112+
}
113+
114+
// Now iterate across the queue
115+
for i := int64(0); i < qLen; i++ {
116+
// Pop from the left
117+
qData, err := iQueue.LPop()
118+
if err != nil {
119+
logger.Error("Queue: %s\nUnable to LPop out: %v", q.Name, err)
120+
return err
121+
}
122+
// And add to the right
123+
err = iQueue.RPush(qData)
124+
if err != nil {
125+
logger.Error("Queue: %s\nUnable to RPush back: %v", q.Name, err)
126+
return err
127+
}
128+
// And add back to the set
129+
_, err = iSet.Add(qData)
130+
if err != nil {
131+
logger.Error("Queue: %s\nUnable to add back in to Set: %v", q.Name, err)
132+
return err
133+
}
134+
}
135+
}
136+
return nil
137+
}
138+
139+
func queueListDB(ctx context.Context, logger log.Logger, autofix bool) error {
140+
connections := []string{}
141+
142+
for _, name := range append(uniqueQueueNames, queueNames...) {
143+
q := setting.GetQueueSettings(name)
144+
if q.Type == "" {
145+
q.Type = string(queue.PersistableChannelQueueType)
146+
}
147+
found := false
148+
for _, typ := range levelqueueTypes {
149+
if typ == q.Type {
150+
found = true
151+
break
152+
}
153+
}
154+
if !found {
155+
continue
156+
}
157+
if q.ConnectionString != "" {
158+
found := false
159+
for _, connection := range connections {
160+
if connection == q.ConnectionString {
161+
found = true
162+
}
163+
}
164+
if !found {
165+
connections = append(connections, q.ConnectionString)
166+
}
167+
continue
168+
}
169+
found = false
170+
for _, connection := range connections {
171+
if connection == q.DataDir {
172+
found = true
173+
}
174+
}
175+
if !found {
176+
connections = append(connections, q.DataDir)
177+
}
178+
}
179+
180+
for _, connection := range connections {
181+
logger.Info("LevelDB: %s", connection)
182+
db, err := nosql.GetManager().GetLevelDB(connection)
183+
if err != nil {
184+
logger.Error("Connection: %s Unable to open DB: %v", connection, err)
185+
return err
186+
}
187+
defer db.Close()
188+
iter := db.NewIterator(nil, nil)
189+
for iter.Next() {
190+
logger.Info("%s\n%s", log.NewColoredIDValue(string(iter.Key())), string(iter.Value()))
191+
}
192+
iter.Release()
193+
}
194+
return nil
195+
}
196+
197+
func init() {
198+
Register(&Check{
199+
Title: "Check if there are corrupt level uniquequeues",
200+
Name: "uniquequeues-corrupt",
201+
IsDefault: false,
202+
Run: checkUniqueQueues,
203+
AbortIfFailed: false,
204+
SkipDatabaseInitialization: false,
205+
Priority: 1,
206+
})
207+
Register(&Check{
208+
Title: "List all entries in leveldb",
209+
Name: "queues-listdb",
210+
IsDefault: false,
211+
Run: queueListDB,
212+
AbortIfFailed: false,
213+
SkipDatabaseInitialization: false,
214+
Priority: 1,
215+
})
216+
}

0 commit comments

Comments
 (0)