@@ -30,6 +30,7 @@ import {
30
30
type Result ,
31
31
} from "@internal/redis" ;
32
32
import { MessageNotFoundError } from "./errors.js" ;
33
+ import { tryCatch } from "@trigger.dev/core" ;
33
34
34
35
const SemanticAttributes = {
35
36
QUEUE : "runqueue.queue" ,
@@ -51,7 +52,6 @@ export type RunQueueOptions = {
51
52
verbose ?: boolean ;
52
53
logger ?: Logger ;
53
54
retryOptions ?: RetryOptions ;
54
- maxDequeueLoopAttempts ?: number ;
55
55
} ;
56
56
57
57
type DequeuedMessage = {
@@ -78,7 +78,6 @@ export class RunQueue {
78
78
private redis : Redis ;
79
79
public keys : RunQueueKeyProducer ;
80
80
private queueSelectionStrategy : RunQueueSelectionStrategy ;
81
- private maxDequeueLoopAttempts : number ;
82
81
83
82
constructor ( private readonly options : RunQueueOptions ) {
84
83
this . retryOptions = options . retryOptions ?? defaultRetrySettings ;
@@ -94,7 +93,6 @@ export class RunQueue {
94
93
95
94
this . keys = options . keys ;
96
95
this . queueSelectionStrategy = options . queueSelectionStrategy ;
97
- this . maxDequeueLoopAttempts = options . maxDequeueLoopAttempts ?? 10 ;
98
96
99
97
this . subscriber = createRedisClient ( options . redis , {
100
98
onError : ( error ) => {
@@ -396,55 +394,45 @@ export class RunQueue {
396
394
397
395
let attemptedEnvs = 0 ;
398
396
let attemptedQueues = 0 ;
399
- let dequeueLoopAttempts = 0 ;
400
397
401
398
const messages : DequeuedMessage [ ] = [ ] ;
402
399
403
- // Each env starts with its list of candidate queues
404
- const tenantQueues : Record < string , string [ ] > = { } ;
405
-
406
- // Initialize tenantQueues with the queues for each env
407
400
for ( const env of envQueues ) {
408
- tenantQueues [ env . envId ] = [ ...env . queues ] ; // Create a copy of the queues array
409
- }
410
-
411
- // Continue until we've hit max count or all tenants have empty queue lists
412
- while (
413
- messages . length < maxCount &&
414
- Object . values ( tenantQueues ) . some ( ( queues ) => queues . length > 0 ) &&
415
- dequeueLoopAttempts < this . maxDequeueLoopAttempts
416
- ) {
417
- dequeueLoopAttempts ++ ;
401
+ attemptedEnvs ++ ;
418
402
419
- for ( const env of envQueues ) {
420
- attemptedEnvs ++ ;
421
-
422
- // Skip if this tenant has no more queues
423
- if ( tenantQueues [ env . envId ] . length === 0 ) {
424
- continue ;
425
- }
426
-
427
- // Pop the next queue (using round-robin order)
428
- const queue = tenantQueues [ env . envId ] . shift ( ) ! ;
403
+ for ( const queue of env . queues ) {
429
404
attemptedQueues ++ ;
430
405
431
406
// Attempt to dequeue from this queue
432
- const message = await this . #callDequeueMessage( {
433
- messageQueue : queue ,
434
- } ) ;
407
+ const [ error , message ] = await tryCatch (
408
+ this . #callDequeueMessage( {
409
+ messageQueue : queue ,
410
+ } )
411
+ ) ;
412
+
413
+ if ( error ) {
414
+ this . logger . error (
415
+ `[dequeueMessageInSharedQueue][${ this . name } ] Failed to dequeue from queue ${ queue } ` ,
416
+ {
417
+ error,
418
+ }
419
+ ) ;
420
+ }
435
421
436
422
if ( message ) {
437
423
messages . push ( message ) ;
438
- // Re-add this queue at the end, since it might have more messages
439
- tenantQueues [ env . envId ] . push ( queue ) ;
440
424
}
441
- // If message is null, do not re-add the queue in this cycle
442
425
443
- // If we've reached maxCount, break out of the loop
426
+ // If we've reached maxCount, we don't want to look at this env anymore
444
427
if ( messages . length >= maxCount ) {
445
428
break ;
446
429
}
447
430
}
431
+
432
+ // If we've reached maxCount, we're completely done
433
+ if ( messages . length >= maxCount ) {
434
+ break ;
435
+ }
448
436
}
449
437
450
438
span . setAttributes ( {
0 commit comments