1
1
import type {
2
2
CronItem ,
3
3
CronItemOptions ,
4
- Job as GraphileJob ,
4
+ DbJob as GraphileJob ,
5
5
Runner as GraphileRunner ,
6
6
JobHelpers ,
7
7
RunnerOptions ,
8
8
Task ,
9
9
TaskList ,
10
10
TaskSpec ,
11
+ WorkerUtils ,
11
12
} from "graphile-worker" ;
12
- import { run as graphileRun , parseCronItems } from "graphile-worker" ;
13
+ import { run as graphileRun , makeWorkerUtils , parseCronItems } from "graphile-worker" ;
13
14
import { SpanKind , trace } from "@opentelemetry/api" ;
14
15
15
16
import omit from "lodash.omit" ;
16
17
import { z } from "zod" ;
17
- import { PrismaClient , PrismaClientOrTransaction } from "~/db.server" ;
18
+ import { $replica , PrismaClient , PrismaClientOrTransaction } from "~/db.server" ;
18
19
import { PgListenService } from "~/services/db/pgListen.server" ;
19
20
import { workerLogger as logger } from "~/services/logger.server" ;
20
21
import { flattenAttributes } from "@trigger.dev/core/v3" ;
@@ -34,8 +35,8 @@ const RawCronPayloadSchema = z.object({
34
35
35
36
const GraphileJobSchema = z . object ( {
36
37
id : z . coerce . string ( ) ,
37
- queue_name : z . string ( ) . nullable ( ) ,
38
- task_identifier : z . string ( ) ,
38
+ job_queue_id : z . number ( ) . nullable ( ) ,
39
+ task_id : z . number ( ) ,
39
40
payload : z . unknown ( ) ,
40
41
priority : z . number ( ) ,
41
42
run_at : z . coerce . date ( ) ,
@@ -72,7 +73,7 @@ type RecurringTaskPayload = {
72
73
73
74
export type ZodRecurringTasks = {
74
75
[ key : string ] : {
75
- pattern : string ;
76
+ match : string ;
76
77
options ?: CronItemOptions ;
77
78
handler : ( payload : RecurringTaskPayload , job : GraphileJob ) => Promise < void > ;
78
79
} ;
@@ -129,6 +130,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
129
130
#rateLimiter?: ZodWorkerRateLimiter ;
130
131
#shutdownTimeoutInMs?: number ;
131
132
#shuttingDown = false ;
133
+ #workerUtils?: WorkerUtils ;
132
134
133
135
constructor ( options : ZodWorkerOptions < TMessageCatalog > ) {
134
136
this . #name = options . name ;
@@ -158,6 +160,8 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
158
160
159
161
const parsedCronItems = parseCronItems ( this . #createCronItemsFromRecurringTasks( ) ) ;
160
162
163
+ this . #workerUtils = await makeWorkerUtils ( this . #runnerOptions) ;
164
+
161
165
this . #runner = await graphileRun ( {
162
166
...this . #runnerOptions,
163
167
noHandleSignals : true ,
@@ -188,7 +192,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
188
192
this . #logDebug( "Detected incoming migration" , { latestMigration } ) ;
189
193
190
194
if ( latestMigration > 10 ) {
191
- // already migrated past v0.14 - nothing to do
195
+ this . #logDebug ( "Already migrated past v0.14 - nothing to do" , { latestMigration } ) ;
192
196
return ;
193
197
}
194
198
@@ -263,6 +267,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
263
267
264
268
public async stop ( ) {
265
269
await this . #runner?. stop ( ) ;
270
+ await this . #workerUtils?. release ( ) ;
266
271
}
267
272
268
273
public async enqueue < K extends keyof TMessageCatalog > (
@@ -442,12 +447,29 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
442
447
return taskList ;
443
448
}
444
449
450
+ async #getQueueName( queueId : number | null ) {
451
+ if ( queueId === null ) {
452
+ return ;
453
+ }
454
+
455
+ const schema = z . array ( z . object ( { queue_name : z . string ( ) } ) ) ;
456
+
457
+ const rawQueueNameResults = await $replica . $queryRawUnsafe (
458
+ `SELECT queue_name FROM ${ this . graphileWorkerSchema } ._private_job_queues WHERE id = $1` ,
459
+ queueId
460
+ ) ;
461
+
462
+ const queueNameResults = schema . parse ( rawQueueNameResults ) ;
463
+
464
+ return queueNameResults [ 0 ] ?. queue_name ;
465
+ }
466
+
445
467
async #rescheduleTask( payload : unknown , helpers : JobHelpers ) {
446
468
this . #logDebug( "Rescheduling task" , { payload, job : helpers . job } ) ;
447
469
448
470
await this . enqueue ( helpers . job . task_identifier , payload , {
449
471
runAt : helpers . job . run_at ,
450
- queueName : helpers . job . queue_name ?? undefined ,
472
+ queueName : await this . #getQueueName ( helpers . job . job_queue_id ) ,
451
473
priority : helpers . job . priority ,
452
474
jobKey : helpers . job . key ?? undefined ,
453
475
flags : Object . keys ( helpers . job . flags ?? [ ] ) ,
@@ -460,7 +482,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
460
482
461
483
if ( this . #cleanup) {
462
484
cronItems . push ( {
463
- pattern : this . #cleanup. frequencyExpression ,
485
+ match : this . #cleanup. frequencyExpression ,
464
486
identifier : CLEANUP_TASK_NAME ,
465
487
task : CLEANUP_TASK_NAME ,
466
488
options : this . #cleanup. taskOptions ,
@@ -469,7 +491,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
469
491
470
492
if ( this . #reporter) {
471
493
cronItems . push ( {
472
- pattern : "50 * * * *" , // Every hour at 50 minutes past the hour
494
+ match : "50 * * * *" , // Every hour at 50 minutes past the hour
473
495
identifier : REPORTER_TASK_NAME ,
474
496
task : REPORTER_TASK_NAME ,
475
497
} ) ;
@@ -481,7 +503,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
481
503
482
504
for ( const [ key , task ] of Object . entries ( this . #recurringTasks) ) {
483
505
const cronItem : CronItem = {
484
- pattern : task . pattern ,
506
+ match : task . match ,
485
507
identifier : key ,
486
508
task : key ,
487
509
options : task . options ,
@@ -529,7 +551,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
529
551
attributes : {
530
552
"job.task_identifier" : job . task_identifier ,
531
553
"job.id" : job . id ,
532
- ...( job . queue_name ? { "job.queue_name " : job . queue_name } : { } ) ,
554
+ ...( job . job_queue_id ? { "job.queue_id " : job . job_queue_id } : { } ) ,
533
555
...flattenAttributes ( job . payload as Record < string , unknown > , "job.payload" ) ,
534
556
"job.priority" : job . priority ,
535
557
"job.run_at" : job . run_at . toISOString ( ) ,
@@ -599,7 +621,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
599
621
attributes : {
600
622
"job.task_identifier" : job . task_identifier ,
601
623
"job.id" : job . id ,
602
- ...( job . queue_name ? { "job.queue_name " : job . queue_name } : { } ) ,
624
+ ...( job . job_queue_id ? { "job.queue_id " : job . job_queue_id } : { } ) ,
603
625
...flattenAttributes ( job . payload as Record < string , unknown > , "job.payload" ) ,
604
626
"job.priority" : job . priority ,
605
627
"job.run_at" : job . run_at . toISOString ( ) ,
@@ -638,6 +660,10 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
638
660
return ;
639
661
}
640
662
663
+ if ( ! this . #workerUtils) {
664
+ throw new Error ( "WorkerUtils need to be initialized before running job cleanup." ) ;
665
+ }
666
+
641
667
const job = helpers . job ;
642
668
643
669
logger . debug ( "Received cleanup task" , {
@@ -663,23 +689,38 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
663
689
payload,
664
690
} ) ;
665
691
666
- const rawResults = await this . #prisma. $queryRawUnsafe (
667
- `WITH rows AS (SELECT id FROM ${ this . graphileWorkerSchema } .jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts LIMIT $2 FOR UPDATE) DELETE FROM ${ this . graphileWorkerSchema } .jobs WHERE id IN (SELECT id FROM rows) RETURNING id` ,
692
+ const rawResults = await $replica . $queryRawUnsafe (
693
+ `SELECT id
694
+ FROM ${ this . graphileWorkerSchema } .jobs
695
+ WHERE run_at < $1
696
+ AND locked_at IS NULL
697
+ AND max_attempts = attempts
698
+ LIMIT $2` ,
668
699
expirationDate ,
669
700
this . #cleanup. maxCount
670
701
) ;
671
702
672
- const results = Array . isArray ( rawResults ) ? rawResults : [ ] ;
703
+ const results = z
704
+ . array (
705
+ z . object ( {
706
+ id : z . coerce . string ( ) ,
707
+ } )
708
+ )
709
+ . parse ( rawResults ) ;
710
+
711
+ const completedJobs = await this . #workerUtils. completeJobs ( results . map ( ( job ) => job . id ) ) ;
673
712
674
713
logger . debug ( "Cleaned up old jobs" , {
675
- count : results . length ,
714
+ found : results . length ,
715
+ deleted : completedJobs . length ,
676
716
expirationDate,
677
717
payload,
678
718
} ) ;
679
719
680
720
if ( this . #reporter) {
681
721
await this . #reporter( "cleanup_stats" , {
682
- count : results . length ,
722
+ found : results . length ,
723
+ deleted : completedJobs . length ,
683
724
expirationDate,
684
725
ts : payload . _cron . ts ,
685
726
} ) ;
@@ -711,7 +752,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
711
752
const schema = z . array ( z . object ( { count : z . coerce . number ( ) } ) ) ;
712
753
713
754
// Count the number of jobs that have been added since the startAt date and before the payload._cron.ts date
714
- const rawAddedResults = await this . #prisma . $queryRawUnsafe (
755
+ const rawAddedResults = await $replica . $queryRawUnsafe (
715
756
`SELECT COUNT(*) FROM ${ this . graphileWorkerSchema } .jobs WHERE created_at > $1 AND created_at < $2` ,
716
757
startAt ,
717
758
payload . _cron . ts
@@ -720,7 +761,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
720
761
const addedCountResults = schema . parse ( rawAddedResults ) [ 0 ] ;
721
762
722
763
// Count the total number of jobs in the jobs table
723
- const rawTotalResults = await this . #prisma . $queryRawUnsafe (
764
+ const rawTotalResults = await $replica . $queryRawUnsafe (
724
765
`SELECT COUNT(*) FROM ${ this . graphileWorkerSchema } .jobs`
725
766
) ;
726
767
0 commit comments