Skip to content

Commit b2dea35

Browse files
committed
Graceful: Make the cron tasks graceful
1 parent 00ddf85 commit b2dea35

File tree

8 files changed

+144
-24
lines changed

8 files changed

+144
-24
lines changed

integrations/auth_ldap_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package integrations
66

77
import (
8+
"context"
89
"net/http"
910
"os"
1011
"strings"
@@ -147,7 +148,7 @@ func TestLDAPUserSync(t *testing.T) {
147148
}
148149
defer prepareTestEnv(t)()
149150
addAuthSourceLDAP(t, "")
150-
models.SyncExternalUsers()
151+
models.SyncExternalUsers(context.Background())
151152

152153
session := loginUser(t, "user1")
153154
// Check if users exists
@@ -206,7 +207,8 @@ func TestLDAPUserSSHKeySync(t *testing.T) {
206207
}
207208
defer prepareTestEnv(t)()
208209
addAuthSourceLDAP(t, "sshPublicKey")
209-
models.SyncExternalUsers()
210+
211+
models.SyncExternalUsers(context.Background())
210212

211213
// Check if users has SSH keys synced
212214
for _, u := range gitLDAPUsers {

models/branches.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package models
66

77
import (
8+
"context"
89
"fmt"
910
"time"
1011

@@ -525,7 +526,8 @@ func (deletedBranch *DeletedBranch) LoadUser() {
525526
}
526527

527528
// RemoveOldDeletedBranches removes old deleted branches
528-
func RemoveOldDeletedBranches() {
529+
func RemoveOldDeletedBranches(ctx context.Context) {
530+
// Nothing to do for shutdown or terminate
529531
log.Trace("Doing: DeletedBranchesCleanup")
530532

531533
deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan)

models/repo.go

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package models
77

88
import (
99
"bytes"
10+
"context"
1011
"crypto/md5"
1112
"errors"
1213
"fmt"
@@ -2268,19 +2269,32 @@ func DeleteRepositoryArchives() error {
22682269
}
22692270

22702271
// DeleteOldRepositoryArchives deletes old repository archives.
2271-
func DeleteOldRepositoryArchives() {
2272+
func DeleteOldRepositoryArchives(ctx context.Context) {
22722273
log.Trace("Doing: ArchiveCleanup")
22732274

2274-
if err := x.Where("id > 0").Iterate(new(Repository), deleteOldRepositoryArchives); err != nil {
2275+
if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error {
2276+
return deleteOldRepositoryArchives(ctx, idx, bean)
2277+
}); err != nil {
22752278
log.Error("ArchiveClean: %v", err)
22762279
}
22772280
}
22782281

2279-
func deleteOldRepositoryArchives(idx int, bean interface{}) error {
2282+
func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error {
22802283
repo := bean.(*Repository)
22812284
basePath := filepath.Join(repo.RepoPath(), "archives")
2285+
select {
2286+
case <-ctx.Done():
2287+
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives in %v ", repo)
2288+
default:
2289+
}
22822290

22832291
for _, ty := range []string{"zip", "targz"} {
2292+
select {
2293+
case <-ctx.Done():
2294+
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty)
2295+
default:
2296+
}
2297+
22842298
path := filepath.Join(basePath, ty)
22852299
file, err := os.Open(path)
22862300
if err != nil {
@@ -2303,6 +2317,11 @@ func deleteOldRepositoryArchives(idx int, bean interface{}) error {
23032317
minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan)
23042318
for _, info := range files {
23052319
if info.ModTime().Before(minimumOldestTime) && !info.IsDir() {
2320+
select {
2321+
case <-ctx.Done():
2322+
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name())
2323+
default:
2324+
}
23062325
toDelete := filepath.Join(path, info.Name())
23072326
// This is a best-effort purge, so we do not check error codes to confirm removal.
23082327
if err = os.Remove(toDelete); err != nil {
@@ -2396,13 +2415,17 @@ func SyncRepositoryHooks() error {
23962415
}
23972416

23982417
// GitFsck calls 'git fsck' to check repository health.
2399-
func GitFsck() {
2418+
func GitFsck(ctx context.Context) {
24002419
log.Trace("Doing: GitFsck")
2401-
24022420
if err := x.
24032421
Where("id>0 AND is_fsck_enabled=?", true).BufferSize(setting.Database.IterateBufferSize).
24042422
Iterate(new(Repository),
24052423
func(idx int, bean interface{}) error {
2424+
select {
2425+
case <-ctx.Done():
2426+
return fmt.Errorf("Aborted due to shutdown")
2427+
default:
2428+
}
24062429
repo := bean.(*Repository)
24072430
repoPath := repo.RepoPath()
24082431
log.Trace("Running health check on repository %s", repoPath)
@@ -2448,13 +2471,19 @@ type repoChecker struct {
24482471
desc string
24492472
}
24502473

2451-
func repoStatsCheck(checker *repoChecker) {
2474+
func repoStatsCheck(ctx context.Context, checker *repoChecker) {
24522475
results, err := x.Query(checker.querySQL)
24532476
if err != nil {
24542477
log.Error("Select %s: %v", checker.desc, err)
24552478
return
24562479
}
24572480
for _, result := range results {
2481+
select {
2482+
case <-ctx.Done():
2483+
log.Warn("Aborting due to shutdown")
2484+
return
2485+
default:
2486+
}
24582487
id := com.StrTo(result["id"]).MustInt64()
24592488
log.Trace("Updating %s: %d", checker.desc, id)
24602489
_, err = x.Exec(checker.correctSQL, id, id)
@@ -2465,7 +2494,7 @@ func repoStatsCheck(checker *repoChecker) {
24652494
}
24662495

24672496
// CheckRepoStats checks the repository stats
2468-
func CheckRepoStats() {
2497+
func CheckRepoStats(ctx context.Context) {
24692498
log.Trace("Doing: CheckRepoStats")
24702499

24712500
checkers := []*repoChecker{
@@ -2501,7 +2530,13 @@ func CheckRepoStats() {
25012530
},
25022531
}
25032532
for i := range checkers {
2504-
repoStatsCheck(checkers[i])
2533+
select {
2534+
case <-ctx.Done():
2535+
log.Warn("Aborting due to shutdown")
2536+
return
2537+
default:
2538+
repoStatsCheck(ctx, checkers[i])
2539+
}
25052540
}
25062541

25072542
// ***** START: Repository.NumClosedIssues *****
@@ -2511,6 +2546,12 @@ func CheckRepoStats() {
25112546
log.Error("Select %s: %v", desc, err)
25122547
} else {
25132548
for _, result := range results {
2549+
select {
2550+
case <-ctx.Done():
2551+
log.Warn("Aborting due to shutdown")
2552+
return
2553+
default:
2554+
}
25142555
id := com.StrTo(result["id"]).MustInt64()
25152556
log.Trace("Updating %s: %d", desc, id)
25162557
_, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id)
@@ -2528,6 +2569,12 @@ func CheckRepoStats() {
25282569
log.Error("Select %s: %v", desc, err)
25292570
} else {
25302571
for _, result := range results {
2572+
select {
2573+
case <-ctx.Done():
2574+
log.Warn("Aborting due to shutdown")
2575+
return
2576+
default:
2577+
}
25312578
id := com.StrTo(result["id"]).MustInt64()
25322579
log.Trace("Updating %s: %d", desc, id)
25332580
_, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, true, id)
@@ -2545,6 +2592,12 @@ func CheckRepoStats() {
25452592
log.Error("Select repository count 'num_forks': %v", err)
25462593
} else {
25472594
for _, result := range results {
2595+
select {
2596+
case <-ctx.Done():
2597+
log.Warn("Aborting due to shutdown")
2598+
return
2599+
default:
2600+
}
25482601
id := com.StrTo(result["id"]).MustInt64()
25492602
log.Trace("Updating repository count 'num_forks': %d", id)
25502603

models/user.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package models
77

88
import (
99
"container/list"
10+
"context"
1011
"crypto/md5"
1112
"crypto/sha256"
1213
"crypto/subtle"
@@ -1695,7 +1696,7 @@ func synchronizeLdapSSHPublicKeys(usr *User, s *LoginSource, sshPublicKeys []str
16951696
}
16961697

16971698
// SyncExternalUsers is used to synchronize users with external authorization source
1698-
func SyncExternalUsers() {
1699+
func SyncExternalUsers(ctx context.Context) {
16991700
log.Trace("Doing: SyncExternalUsers")
17001701

17011702
ls, err := LoginSources()
@@ -1710,6 +1711,12 @@ func SyncExternalUsers() {
17101711
if !s.IsActived || !s.IsSyncEnabled {
17111712
continue
17121713
}
1714+
select {
1715+
case <-ctx.Done():
1716+
log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name)
1717+
return
1718+
default:
1719+
}
17131720

17141721
if s.IsLDAP() {
17151722
log.Trace("Doing: SyncExternalUsers[%s]", s.Name)
@@ -1727,6 +1734,12 @@ func SyncExternalUsers() {
17271734
log.Error("SyncExternalUsers: %v", err)
17281735
return
17291736
}
1737+
select {
1738+
case <-ctx.Done():
1739+
log.Warn("SyncExternalUsers: Aborted due to shutdown before update of %s", s.Name)
1740+
return
1741+
default:
1742+
}
17301743

17311744
sr, err := s.LDAP().SearchEntries()
17321745
if err != nil {
@@ -1735,6 +1748,19 @@ func SyncExternalUsers() {
17351748
}
17361749

17371750
for _, su := range sr {
1751+
select {
1752+
case <-ctx.Done():
1753+
log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before completed update of users", s.Name)
1754+
// Rewrite authorized_keys file if LDAP Public SSH Key attribute is set and any key was added or removed
1755+
if sshKeysNeedUpdate {
1756+
err = RewriteAllPublicKeys()
1757+
if err != nil {
1758+
log.Error("RewriteAllPublicKeys: %v", err)
1759+
}
1760+
}
1761+
return
1762+
default:
1763+
}
17381764
if len(su.Username) == 0 {
17391765
continue
17401766
}
@@ -1819,6 +1845,13 @@ func SyncExternalUsers() {
18191845
}
18201846
}
18211847

1848+
select {
1849+
case <-ctx.Done():
1850+
log.Warn("SyncExternalUsers: Aborted due to shutdown at update of %s before delete users", s.Name)
1851+
return
1852+
default:
1853+
}
1854+
18221855
// Deactivate users not present in LDAP
18231856
if updateExisting {
18241857
for _, usr := range users {

modules/cron/cron.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
package cron
77

88
import (
9+
"context"
910
"time"
1011

1112
"code.gitea.io/gitea/models"
13+
"code.gitea.io/gitea/modules/graceful"
1214
"code.gitea.io/gitea/modules/log"
1315
"code.gitea.io/gitea/modules/migrations"
1416
"code.gitea.io/gitea/modules/setting"
@@ -37,17 +39,19 @@ var taskStatusTable = sync.NewStatusTable()
3739
type Func func()
3840

3941
// WithUnique wrap a cron func with an unique running check
40-
func WithUnique(name string, body Func) Func {
42+
func WithUnique(name string, body func(context.Context)) Func {
4143
return func() {
4244
if !taskStatusTable.StartIfNotRunning(name) {
4345
return
4446
}
4547
defer taskStatusTable.Stop(name)
46-
body()
48+
graceful.GetManager().RunWithShutdownContext(body)
4749
}
4850
}
4951

5052
// NewContext begins cron tasks
53+
// Each cron task is run within the shutdown context as a running server
54+
// AtShutdown the cron server is stopped
5155
func NewContext() {
5256
var (
5357
entry *cron.Entry
@@ -129,6 +133,7 @@ func NewContext() {
129133
go WithUnique(updateMigrationPosterID, migrations.UpdateMigrationPosterID)()
130134

131135
c.Start()
136+
graceful.GetManager().RunAtShutdown(context.Background(), c.Stop)
132137
}
133138

134139
// ListTasks returns all running cron tasks.

modules/migrations/update.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
package migrations
66

77
import (
8+
"context"
9+
810
"code.gitea.io/gitea/models"
911
"code.gitea.io/gitea/modules/log"
1012
"code.gitea.io/gitea/modules/structs"
1113
)
1214

1315
// UpdateMigrationPosterID updates all migrated repositories' issues and comments posterID
14-
func UpdateMigrationPosterID() {
16+
func UpdateMigrationPosterID(ctx context.Context) {
1517
for _, gitService := range structs.SupportedFullGitService {
16-
if err := updateMigrationPosterIDByGitService(gitService); err != nil {
18+
select {
19+
case <-ctx.Done():
20+
log.Warn("UpdateMigrationPosterID aborted due to shutdown before %s", gitService.Name())
21+
default:
22+
}
23+
if err := updateMigrationPosterIDByGitService(ctx, gitService); err != nil {
1724
log.Error("updateMigrationPosterIDByGitService failed: %v", err)
1825
}
1926
}
2027
}
2128

22-
func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error {
29+
func updateMigrationPosterIDByGitService(ctx context.Context, tp structs.GitServiceType) error {
2330
provider := tp.Name()
2431
if len(provider) == 0 {
2532
return nil
@@ -28,6 +35,13 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error {
2835
const batchSize = 100
2936
var start int
3037
for {
38+
select {
39+
case <-ctx.Done():
40+
log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name())
41+
return nil
42+
default:
43+
}
44+
3145
users, err := models.FindExternalUsersByProvider(models.FindExternalUserOptions{
3246
Provider: provider,
3347
Start: start,
@@ -38,6 +52,12 @@ func updateMigrationPosterIDByGitService(tp structs.GitServiceType) error {
3852
}
3953

4054
for _, user := range users {
55+
select {
56+
case <-ctx.Done():
57+
log.Warn("UpdateMigrationPosterIDByGitService(%s) aborted due to shutdown", tp.Name())
58+
return nil
59+
default:
60+
}
4161
externalUserID := user.ExternalID
4262
if err := models.UpdateMigrationsByType(tp, externalUserID, user.UserID); err != nil {
4363
log.Error("UpdateMigrationsByType type %s external user id %v to local user id %v failed: %v", tp.Name(), user.ExternalID, user.UserID, err)

0 commit comments

Comments
 (0)