Skip to content

Commit 4112380

Browse files
committed
Use queue instead of memory queue in webhook send service
1 parent 257cea6 commit 4112380

File tree

4 files changed

+59
-155
lines changed

4 files changed

+59
-155
lines changed

modules/sync/unique_queue.go

Lines changed: 0 additions & 104 deletions
This file was deleted.

routers/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func GlobalInitInstalled(ctx context.Context) {
145145
mustInit(stats_indexer.Init)
146146

147147
mirror_service.InitSyncMirrors()
148-
webhook.InitDeliverHooks()
148+
mustInit(webhook.Init)
149149
mustInit(pull_service.Init)
150150
mustInit(task.Init)
151151
mustInit(repo_migrations.Init)

services/webhook/deliver.go

Lines changed: 18 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"io"
1616
"net/http"
1717
"net/url"
18-
"strconv"
1918
"strings"
2019
"sync"
2120
"time"
@@ -26,6 +25,7 @@ import (
2625
"code.gitea.io/gitea/modules/log"
2726
"code.gitea.io/gitea/modules/process"
2827
"code.gitea.io/gitea/modules/proxy"
28+
"code.gitea.io/gitea/modules/queue"
2929
"code.gitea.io/gitea/modules/setting"
3030

3131
"github.com/gobwas/glob"
@@ -202,17 +202,19 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
202202
return nil
203203
}
204204

205-
// DeliverHooks checks and delivers undelivered hooks.
205+
// populateDeliverHooks checks and delivers undelivered hooks.
206206
// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue
207207
// or a full queue. Then more hooks could be sent at same time.
208-
func DeliverHooks(ctx context.Context) {
208+
func populateDeliverHooks(ctx context.Context) {
209209
select {
210210
case <-ctx.Done():
211211
return
212212
default:
213213
}
214+
214215
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: DeliverHooks", process.SystemProcessType, true)
215216
defer finished()
217+
216218
tasks, err := webhook_model.FindUndeliveredHookTasks()
217219
if err != nil {
218220
log.Error("DeliverHooks: %v", err)
@@ -226,42 +228,9 @@ func DeliverHooks(ctx context.Context) {
226228
return
227229
default:
228230
}
229-
if err = Deliver(ctx, t); err != nil {
230-
log.Error("deliver: %v", err)
231-
}
232-
}
233-
234-
// Start listening on new hook requests.
235-
for {
236-
select {
237-
case <-ctx.Done():
238-
hookQueue.Close()
239-
return
240-
case repoIDStr := <-hookQueue.Queue():
241-
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
242-
hookQueue.Remove(repoIDStr)
243-
244-
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
245-
if err != nil {
246-
log.Error("Invalid repo ID: %s", repoIDStr)
247-
continue
248-
}
249231

250-
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
251-
if err != nil {
252-
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
253-
continue
254-
}
255-
for _, t := range tasks {
256-
select {
257-
case <-ctx.Done():
258-
return
259-
default:
260-
}
261-
if err = Deliver(ctx, t); err != nil {
262-
log.Error("deliver: %v", err)
263-
}
264-
}
232+
if err := addToTask(t.RepoID); err != nil {
233+
log.Error("DeliverHook failed [%d]: %v", t.RepoID, err)
265234
}
266235
}
267236
}
@@ -297,8 +266,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) {
297266
}
298267
}
299268

300-
// InitDeliverHooks starts the hooks delivery thread
301-
func InitDeliverHooks() {
269+
// Init starts the hooks delivery thread
270+
func Init() error {
302271
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
303272

304273
allowedHostListValue := setting.Webhook.AllowedHostList
@@ -316,5 +285,13 @@ func InitDeliverHooks() {
316285
},
317286
}
318287

319-
go graceful.GetManager().RunWithShutdownContext(DeliverHooks)
288+
hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "")
289+
if hookQueue == nil {
290+
return fmt.Errorf("Unable to create webhook_sender Queue")
291+
}
292+
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
293+
294+
populateDeliverHooks(graceful.GetManager().HammerContext())
295+
296+
return nil
320297
}

services/webhook/webhook.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
repo_model "code.gitea.io/gitea/models/repo"
1313
webhook_model "code.gitea.io/gitea/models/webhook"
1414
"code.gitea.io/gitea/modules/git"
15+
"code.gitea.io/gitea/modules/graceful"
1516
"code.gitea.io/gitea/modules/log"
17+
"code.gitea.io/gitea/modules/queue"
1618
"code.gitea.io/gitea/modules/setting"
1719
api "code.gitea.io/gitea/modules/structs"
18-
"code.gitea.io/gitea/modules/sync"
1920
"code.gitea.io/gitea/modules/util"
2021

2122
"github.com/gobwas/glob"
@@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool {
8081
}
8182

8283
// hookQueue is a global queue of web hooks
83-
var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength)
84+
var hookQueue queue.UniqueQueue
8485

8586
// getPayloadBranch returns branch for hook event, if applicable.
8687
func getPayloadBranch(p api.Payloader) string {
@@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string {
101102
return ""
102103
}
103104

105+
// handle passed PR IDs and test the PRs
106+
func handle(data ...queue.Data) []queue.Data {
107+
for _, datum := range data {
108+
repoIDStr := datum.(string)
109+
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
110+
111+
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
112+
if err != nil {
113+
log.Error("Invalid repo ID: %s", repoIDStr)
114+
continue
115+
}
116+
117+
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
118+
if err != nil {
119+
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
120+
continue
121+
}
122+
for _, t := range tasks {
123+
if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil {
124+
log.Error("deliver: %v", err)
125+
}
126+
}
127+
}
128+
return nil
129+
}
130+
131+
func addToTask(repoID int64) error {
132+
err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil)
133+
if err != nil && err != queue.ErrAlreadyInQueue {
134+
return err
135+
}
136+
return nil
137+
}
138+
104139
// PrepareWebhook adds special webhook to task queue for given payload.
105140
func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
106141
if err := prepareWebhook(w, repo, event, p); err != nil {
107142
return err
108143
}
109144

110-
go hookQueue.Add(strconv.FormatInt(repo.ID, 10))
111-
return nil
145+
return addToTask(repo.ID)
112146
}
113147

114148
func checkBranch(w *webhook_model.Webhook, branch string) bool {
@@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT
188222
return err
189223
}
190224

191-
go hookQueue.Add(strconv.FormatInt(repo.ID, 10))
192-
return nil
225+
return addToTask(repo.ID)
193226
}
194227

195228
func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
@@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error {
240273
return err
241274
}
242275

243-
go hookQueue.Add(strconv.FormatInt(t.RepoID, 10))
244-
245-
return nil
276+
return addToTask(t.RepoID)
246277
}

0 commit comments

Comments
 (0)