1
1
import { Logger } from "@trigger.dev/core/logger" ;
2
+ import { type RetryOptions } from "@trigger.dev/core/v3/schemas" ;
3
+ import { calculateNextRetryDelay } from "@trigger.dev/core/v3" ;
2
4
import { type RedisOptions } from "ioredis" ;
3
5
import os from "os" ;
4
6
import { Worker as NodeWorker } from "worker_threads" ;
@@ -9,11 +11,7 @@ type WorkerCatalog = {
9
11
[ key : string ] : {
10
12
schema : z . ZodFirstPartySchemaTypes | z . ZodDiscriminatedUnion < any , any > ;
11
13
visibilityTimeoutMs : number ;
12
- retry : {
13
- maxAttempts : number ;
14
- minDelayMs ?: number ;
15
- scaleFactor ?: number ;
16
- } ;
14
+ retry : RetryOptions ;
17
15
} ;
18
16
} ;
19
17
@@ -25,6 +23,7 @@ type JobHandler<Catalog extends WorkerCatalog, K extends keyof Catalog> = (param
25
23
id : string ;
26
24
payload : z . infer < Catalog [ K ] [ "schema" ] > ;
27
25
visibilityTimeoutMs : number ;
26
+ attempt : number ;
28
27
} ) => Promise < void > ;
29
28
30
29
type WorkerOptions < TCatalog extends WorkerCatalog > = {
@@ -148,7 +147,7 @@ class Worker<TCatalog extends WorkerCatalog> {
148
147
}
149
148
150
149
await Promise . all (
151
- items . map ( async ( { id, job, item, visibilityTimeoutMs } ) => {
150
+ items . map ( async ( { id, job, item, visibilityTimeoutMs, attempt } ) => {
152
151
const catalogItem = this . options . catalog [ job as any ] ;
153
152
const handler = this . jobs [ job as any ] ;
154
153
if ( ! handler ) {
@@ -157,7 +156,7 @@ class Worker<TCatalog extends WorkerCatalog> {
157
156
}
158
157
159
158
try {
160
- await handler ( { id, payload : item , visibilityTimeoutMs } ) ;
159
+ await handler ( { id, payload : item , visibilityTimeoutMs, attempt } ) ;
161
160
await this . queue . ack ( id ) ;
162
161
} catch ( error ) {
163
162
this . logger . error ( `Error processing item, it threw an error:` , {
@@ -170,7 +169,23 @@ class Worker<TCatalog extends WorkerCatalog> {
170
169
} ) ;
171
170
// Requeue the failed item with a delay
172
171
try {
173
- const retryDelay = catalogItem . retry . minDelayMs ?? 1_000 ;
172
+ attempt = attempt + 1 ;
173
+
174
+ const retryDelay = calculateNextRetryDelay ( catalogItem . retry , attempt ) ;
175
+
176
+ if ( ! retryDelay ) {
177
+ this . logger . error ( `Failed item ${ id } has reached max attempts, acking.` , {
178
+ name : this . options . name ,
179
+ id,
180
+ job,
181
+ item,
182
+ visibilityTimeoutMs,
183
+ attempt,
184
+ } ) ;
185
+ await this . queue . ack ( id ) ;
186
+ return ;
187
+ }
188
+
174
189
const retryDate = new Date ( Date . now ( ) + retryDelay ) ;
175
190
this . logger . info ( `Requeued failed item ${ id } with delay` , {
176
191
name : this . options . name ,
@@ -180,23 +195,28 @@ class Worker<TCatalog extends WorkerCatalog> {
180
195
retryDate,
181
196
retryDelay,
182
197
visibilityTimeoutMs,
198
+ attempt,
183
199
} ) ;
184
200
await this . queue . enqueue ( {
185
201
id,
186
202
job,
187
203
item,
188
204
availableAt : retryDate ,
205
+ attempt,
189
206
visibilityTimeoutMs,
190
207
} ) ;
191
208
} catch ( requeueError ) {
192
- this . logger . error ( `Failed to requeue item, threw error:` , {
193
- name : this . options . name ,
194
- id,
195
- job,
196
- item,
197
- visibilityTimeoutMs,
198
- error : requeueError ,
199
- } ) ;
209
+ this . logger . error (
210
+ `Failed to requeue item, threw error. Will automatically get rescheduled after the visilibity timeout.` ,
211
+ {
212
+ name : this . options . name ,
213
+ id,
214
+ job,
215
+ item,
216
+ visibilityTimeoutMs,
217
+ error : requeueError ,
218
+ }
219
+ ) ;
200
220
}
201
221
}
202
222
} )
0 commit comments