Skip to content

Commit c50f8aa

Browse files
authored
Delayed queue deletion (#1952)
1 parent 78c253e commit c50f8aa

File tree

3 files changed

+72
-11
lines changed

3 files changed

+72
-11
lines changed

pkg/lib/cron/cron.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func Run(f func() error, errHandler func(error), delay time.Duration) Cron {
3232
cronCancel := make(chan struct{}, 1)
3333

3434
runCron := func() {
35-
defer recoverer(errHandler)
35+
defer Recoverer(errHandler)
3636
err := f()
3737
if err != nil && errHandler != nil {
3838
errHandler(err)
@@ -69,7 +69,7 @@ func (c *Cron) Cancel() {
6969
c.cronCancel <- struct{}{}
7070
}
7171

72-
func recoverer(errHandler func(error)) {
72+
func Recoverer(errHandler func(error)) {
7373
if errInterface := recover(); errInterface != nil {
7474
err := errors.CastRecoverError(errInterface)
7575
errors.PrintStacktrace(err)

pkg/operator/resources/job/batchapi/job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func handleJobSubmissionError(jobKey spec.JobKey, jobErr error) {
224224
func deleteJobRuntimeResources(jobKey spec.JobKey) error {
225225
err := errors.FirstError(
226226
deleteK8sJob(jobKey),
227-
deleteQueueByJobKeyIfExists(jobKey),
227+
deleteQueueWithDelay(jobKey),
228228
saveMetricsToCloud(jobKey),
229229
)
230230

pkg/operator/resources/job/batchapi/queue.go

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ package batchapi
1919
import (
2020
"fmt"
2121
"strings"
22+
"time"
2223

2324
"github.com/aws/aws-sdk-go/aws"
2425
"github.com/aws/aws-sdk-go/service/sqs"
2526
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
27+
"github.com/cortexlabs/cortex/pkg/lib/cron"
2628
"github.com/cortexlabs/cortex/pkg/lib/errors"
2729
libjson "github.com/cortexlabs/cortex/pkg/lib/json"
2830
s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -31,6 +33,11 @@ import (
3133
"github.com/cortexlabs/cortex/pkg/types/spec"
3234
)
3335

36+
const (
37+
_markForDeletion = "cortex.dev/to-be-deleted"
38+
_queueGraceKillTimePeriod = 5 * time.Minute
39+
)
40+
3441
func apiQueueNamePrefix(apiName string) string {
3542
return config.CoreConfig.SQSNamePrefix() + apiName + "-"
3643
}
@@ -121,21 +128,75 @@ func listQueueURLsForAllAPIs() ([]string, error) {
121128
return queueURLs, nil
122129
}
123130

124-
func deleteQueueByJobKey(jobKey spec.JobKey) error {
131+
func markForDeletion(queueURL string) error {
132+
_, err := config.AWS.SQS().TagQueue(&sqs.TagQueueInput{
133+
QueueUrl: aws.String(queueURL),
134+
Tags: aws.StringMap(map[string]string{
135+
_markForDeletion: time.Now().Format(time.RFC3339Nano),
136+
}),
137+
})
138+
if err != nil {
139+
return errors.WithStack(err)
140+
}
141+
return nil
142+
}
143+
144+
func deleteQueueWithDelay(jobKey spec.JobKey) error {
125145
queueURL, err := getJobQueueURL(jobKey)
126146
if err != nil {
127147
return err
128148
}
129149

130-
return deleteQueueByURL(queueURL)
131-
}
132-
133-
func deleteQueueByJobKeyIfExists(jobKey spec.JobKey) error {
134-
err := deleteQueueByJobKey(jobKey)
135-
if err != nil && awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
150+
output, err := config.AWS.SQS().ListQueueTags(&sqs.ListQueueTagsInput{
151+
QueueUrl: aws.String(queueURL),
152+
})
153+
if err != nil {
154+
if !awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
155+
operatorLogger.Error(err)
156+
}
136157
return nil
137158
}
138-
return err
159+
160+
if value, exists := output.Tags[_markForDeletion]; exists {
161+
markedTime, err := time.Parse(time.RFC3339Nano, *value)
162+
if err != nil {
163+
err = deleteQueueByURL(queueURL)
164+
if err != nil {
165+
if !awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
166+
operatorLogger.Error(err)
167+
}
168+
return nil
169+
}
170+
}
171+
172+
if time.Since(markedTime) > _queueGraceKillTimePeriod {
173+
err := deleteQueueByURL(queueURL)
174+
if err != nil {
175+
if !awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
176+
operatorLogger.Error(err)
177+
}
178+
return nil
179+
}
180+
}
181+
} else {
182+
operatorLogger.Info("scheduling deleting queue " + jobKey.UserString())
183+
err = markForDeletion(queueURL)
184+
if err != nil && awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
185+
return nil
186+
}
187+
188+
time.AfterFunc(_queueGraceKillTimePeriod, func() {
189+
defer cron.Recoverer(nil)
190+
operatorLogger.Info("deleting queue " + jobKey.UserString())
191+
err := deleteQueueByURL(queueURL)
192+
// ignore non existent queue errors
193+
if err != nil && !awslib.IsNonExistentQueueErr(errors.CauseOrSelf(err)) {
194+
operatorLogger.Error(err)
195+
}
196+
})
197+
}
198+
199+
return nil
139200
}
140201

141202
func deleteQueueByURL(queueURL string) error {

0 commit comments

Comments
 (0)