Skip to content

Commit 7138650

Browse files
committed
Notification: move to use a queue
1 parent 5d8a52c commit 7138650

File tree

16 files changed

+2313
-119
lines changed

16 files changed

+2313
-119
lines changed

integrations/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ func initIntegrationTest() {
213213
defer db.Close()
214214
}
215215
routers.GlobalInit(graceful.GetManager().HammerContext())
216+
NotifierListenerInit()
216217
}
217218

218219
func prepareTestEnv(t testing.TB, skip ...int) func() {

integrations/mssql.ini.tmpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,10 @@ LEVEL = Debug
8282
INSTALL_LOCK = true
8383
SECRET_KEY = 9pCviYTWSb
8484
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ
85+
86+
[queue]
87+
TYPE=channel
88+
89+
[queue.test-notifier]
90+
BATCH_LENGTH=1
91+
LENGTH=20

integrations/mysql.ini.tmpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,10 @@ LEVEL = Debug
8484
INSTALL_LOCK = true
8585
SECRET_KEY = 9pCviYTWSb
8686
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ
87+
88+
[queue]
89+
TYPE=channel
90+
91+
[queue.test-notifier]
92+
BATCH_LENGTH=1
93+
LENGTH=20

integrations/mysql8.ini.tmpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,10 @@ LEVEL = Debug
8080
INSTALL_LOCK = true
8181
SECRET_KEY = 9pCviYTWSb
8282
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ
83+
84+
[queue]
85+
TYPE=channel
86+
87+
[queue.test-notifier]
88+
BATCH_LENGTH=1
89+
LENGTH=20
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright 2019 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package integrations
6+
7+
import (
8+
"encoding/json"
9+
"reflect"
10+
"sync"
11+
"testing"
12+
13+
"code.gitea.io/gitea/models"
14+
"code.gitea.io/gitea/modules/log"
15+
"code.gitea.io/gitea/modules/notification"
16+
"code.gitea.io/gitea/modules/notification/base"
17+
"code.gitea.io/gitea/modules/queue"
18+
)
19+
20+
var notifierListener *NotifierListener
21+
22+
var once = sync.Once{}
23+
24+
type NotifierListener struct {
25+
lock sync.RWMutex
26+
callbacks map[string][]*func(string, [][]byte)
27+
notifier base.Notifier
28+
}
29+
30+
func NotifierListenerInit() {
31+
once.Do(func() {
32+
notifierListener = &NotifierListener{
33+
callbacks: map[string][]*func(string, [][]byte){},
34+
}
35+
notifierListener.notifier = base.NewQueueNotifierWithHandle("test-notifier", notifierListener.handle)
36+
notification.RegisterNotifier(notifierListener.notifier)
37+
})
38+
}
39+
40+
// Register will register a callback with the provided notifier function
41+
func (n *NotifierListener) Register(functionName string, callback *func(string, [][]byte)) {
42+
n.lock.Lock()
43+
n.callbacks[functionName] = append(n.callbacks[functionName], callback)
44+
n.lock.Unlock()
45+
}
46+
47+
// Deregister will remove the provided callback from the provided notifier function
48+
func (n *NotifierListener) Deregister(functionName string, callback *func(string, [][]byte)) {
49+
n.lock.Lock()
50+
found := -1
51+
for i, callbackPtr := range n.callbacks[functionName] {
52+
if callbackPtr == callback {
53+
found = i
54+
break
55+
}
56+
}
57+
if found > -1 {
58+
n.callbacks[functionName] = append(n.callbacks[functionName][0:found], n.callbacks[functionName][found+1:]...)
59+
}
60+
n.lock.Unlock()
61+
}
62+
63+
// RegisterChannel will return a registered channel with function name and return a function to deregister it and close the channel at the end
64+
func (n *NotifierListener) RegisterChannel(name string, argNumber int, exemplar interface{}) (<-chan interface{}, func()) {
65+
t := reflect.TypeOf(exemplar)
66+
channel := make(chan interface{}, 10)
67+
callback := func(_ string, args [][]byte) {
68+
n := reflect.New(t).Elem()
69+
err := json.Unmarshal(args[argNumber], n.Addr().Interface())
70+
if err != nil {
71+
log.Error("Wrong Argument passed to register channel: %v ", err)
72+
}
73+
channel <- n.Interface()
74+
}
75+
n.Register(name, &callback)
76+
77+
return channel, func() {
78+
n.Deregister(name, &callback)
79+
close(channel)
80+
}
81+
}
82+
83+
func (n *NotifierListener) handle(data ...queue.Data) {
84+
n.lock.RLock()
85+
defer n.lock.RUnlock()
86+
for _, datum := range data {
87+
call := datum.(*base.FunctionCall)
88+
callbacks, ok := n.callbacks[call.Name]
89+
if ok && len(callbacks) > 0 {
90+
for _, callback := range callbacks {
91+
(*callback)(call.Name, call.Args)
92+
}
93+
}
94+
}
95+
}
96+
97+
func TestNotifierListener(t *testing.T) {
98+
defer prepareTestEnv(t)()
99+
100+
createPullNotified, deregister := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})
101+
102+
bs, _ := json.Marshal(&models.PullRequest{})
103+
notifierListener.handle(&base.FunctionCall{
104+
Name: "NotifyNewPullRequest",
105+
Args: [][]byte{
106+
bs,
107+
},
108+
})
109+
<-createPullNotified
110+
111+
notifierListener.notifier.NotifyNewPullRequest(&models.PullRequest{})
112+
<-createPullNotified
113+
114+
notification.NotifyNewPullRequest(&models.PullRequest{})
115+
<-createPullNotified
116+
117+
deregister()
118+
119+
notification.NotifyNewPullRequest(&models.PullRequest{})
120+
// would panic if not deregistered
121+
}

integrations/pgsql.ini.tmpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,10 @@ LEVEL = Debug
8383
INSTALL_LOCK = true
8484
SECRET_KEY = 9pCviYTWSb
8585
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ
86+
87+
[queue]
88+
TYPE=channel
89+
90+
[queue.test-notifier]
91+
BATCH_LENGTH=1
92+
LENGTH=20

integrations/pull_merge_test.go

Lines changed: 72 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,31 +61,75 @@ func testPullCleanUp(t *testing.T, session *TestSession, user, repo, pullnum str
6161

6262
func TestPullMerge(t *testing.T) {
6363
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
64-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
65-
assert.NoError(t, err)
66-
hookTasksLenBefore := len(hookTasks)
64+
createPullNotified, deferableCreate := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})
65+
defer deferableCreate()
66+
67+
mergePullNotified, deferableMerge := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
68+
defer deferableMerge()
6769

6870
session := loginUser(t, "user1")
6971
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
7072
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")
7173

74+
var prInterface interface{}
75+
7276
resp := testPullCreate(t, session, "user1", "repo1", "master", "This is a pull title")
77+
select {
78+
case prInterface = <-createPullNotified:
79+
case <-time.After(500 * time.Millisecond):
80+
assert.Fail(t, "Took too long to notify!")
81+
}
82+
pr := prInterface.(*models.PullRequest)
83+
pr.LoadBaseRepo()
84+
pr.LoadHeadRepo()
85+
pr.BaseRepo.MustOwner()
86+
pr.HeadRepo.MustOwner()
87+
88+
assert.EqualValues(t, "user1", pr.HeadRepo.Owner.Name)
89+
assert.EqualValues(t, "repo1", pr.HeadRepo.Name)
90+
assert.EqualValues(t, "user2", pr.BaseRepo.Owner.Name)
91+
assert.EqualValues(t, "repo1", pr.BaseRepo.Name)
7392

7493
elem := strings.Split(test.RedirectURL(resp), "/")
7594
assert.EqualValues(t, "pulls", elem[3])
95+
7696
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleMerge)
7797

78-
hookTasks, err = models.HookTasks(1, 1)
79-
assert.NoError(t, err)
80-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
98+
select {
99+
case prInterface = <-mergePullNotified:
100+
case <-time.After(500 * time.Millisecond):
101+
assert.Fail(t, "Took too long to notify!")
102+
}
103+
104+
pr = prInterface.(*models.PullRequest)
105+
pr.LoadBaseRepo()
106+
pr.LoadHeadRepo()
107+
pr.BaseRepo.MustOwner()
108+
pr.HeadRepo.MustOwner()
109+
110+
assert.EqualValues(t, "user1", pr.HeadRepo.Owner.Name)
111+
assert.EqualValues(t, "repo1", pr.HeadRepo.Name)
112+
assert.EqualValues(t, "user2", pr.BaseRepo.Owner.Name)
113+
assert.EqualValues(t, "repo1", pr.BaseRepo.Name)
114+
115+
time.Sleep(100 * time.Millisecond)
116+
select {
117+
case prInterface = <-createPullNotified:
118+
assert.Fail(t, "Should only have one pull create notification: %v", prInterface)
119+
default:
120+
}
121+
select {
122+
case prInterface = <-mergePullNotified:
123+
assert.Fail(t, "Should only have one pull merge notification: %v", prInterface)
124+
default:
125+
}
81126
})
82127
}
83128

84129
func TestPullRebase(t *testing.T) {
85130
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
86-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
87-
assert.NoError(t, err)
88-
hookTasksLenBefore := len(hookTasks)
131+
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
132+
defer deferable()
89133

90134
session := loginUser(t, "user1")
91135
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -96,18 +140,18 @@ func TestPullRebase(t *testing.T) {
96140
elem := strings.Split(test.RedirectURL(resp), "/")
97141
assert.EqualValues(t, "pulls", elem[3])
98142
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebase)
99-
100-
hookTasks, err = models.HookTasks(1, 1)
101-
assert.NoError(t, err)
102-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
143+
select {
144+
case <-mergePullNotified:
145+
case <-time.After(500 * time.Millisecond):
146+
assert.Fail(t, "Took too long to notify!")
147+
}
103148
})
104149
}
105150

106151
func TestPullRebaseMerge(t *testing.T) {
107152
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
108-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
109-
assert.NoError(t, err)
110-
hookTasksLenBefore := len(hookTasks)
153+
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
154+
defer deferable()
111155

112156
session := loginUser(t, "user1")
113157
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -119,17 +163,18 @@ func TestPullRebaseMerge(t *testing.T) {
119163
assert.EqualValues(t, "pulls", elem[3])
120164
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebaseMerge)
121165

122-
hookTasks, err = models.HookTasks(1, 1)
123-
assert.NoError(t, err)
124-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
166+
select {
167+
case <-mergePullNotified:
168+
case <-time.After(500 * time.Millisecond):
169+
assert.Fail(t, "Took too long to notify!")
170+
}
125171
})
126172
}
127173

128174
func TestPullSquash(t *testing.T) {
129175
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
130-
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
131-
assert.NoError(t, err)
132-
hookTasksLenBefore := len(hookTasks)
176+
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
177+
defer deferable()
133178

134179
session := loginUser(t, "user1")
135180
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
@@ -142,9 +187,11 @@ func TestPullSquash(t *testing.T) {
142187
assert.EqualValues(t, "pulls", elem[3])
143188
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleSquash)
144189

145-
hookTasks, err = models.HookTasks(1, 1)
146-
assert.NoError(t, err)
147-
assert.Len(t, hookTasks, hookTasksLenBefore+1)
190+
select {
191+
case <-mergePullNotified:
192+
case <-time.After(500 * time.Millisecond):
193+
assert.Fail(t, "Took too long to notify!")
194+
}
148195
})
149196
}
150197

integrations/sqlite.ini

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,10 @@ INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTI3OTU5ODN9.O
8181
[oauth2]
8282
JWT_SECRET = KZb_QLUd4fYVyxetjxC4eZkrBgWM2SndOOWDNtgUUko
8383

84+
[queue]
85+
TYPE=channel
86+
87+
[queue.test-notifier]
88+
BATCH_LENGTH=1
89+
LENGTH=20
90+

0 commit comments

Comments
 (0)