@@ -1235,6 +1235,228 @@ describe("FairDequeuingStrategy", () => {
1235
1235
expect ( uniquePairings ) . toBeGreaterThan ( 5 ) ; // Should see at least half of possible combinations
1236
1236
}
1237
1237
) ;
1238
+
1239
+ redisTest (
1240
+ "should handle maximumQueuePerEnvCount larger than available queues" ,
1241
+ async ( { redisOptions } ) => {
1242
+ const redis = createRedisClient ( redisOptions ) ;
1243
+
1244
+ const keyProducer = createKeyProducer ( "test" ) ;
1245
+ const strategy = new FairDequeuingStrategy ( {
1246
+ tracer,
1247
+ redis,
1248
+ keys : keyProducer ,
1249
+ defaultEnvConcurrency : 5 ,
1250
+ parentQueueLimit : 100 ,
1251
+ seed : "test-seed-max-larger" ,
1252
+ maximumQueuePerEnvCount : 5 , // Larger than the number of queues we'll create
1253
+ } ) ;
1254
+
1255
+ const now = Date . now ( ) ;
1256
+
1257
+ // Setup two environments with different numbers of queues
1258
+ const envSetups = [
1259
+ {
1260
+ envId : "env-1" ,
1261
+ queues : [ { age : 5000 } , { age : 4000 } ] ,
1262
+ } ,
1263
+ {
1264
+ envId : "env-2" ,
1265
+ queues : [ { age : 3000 } ] ,
1266
+ } ,
1267
+ ] ;
1268
+
1269
+ // Setup queues and concurrency for each env
1270
+ for ( const setup of envSetups ) {
1271
+ await setupConcurrency ( {
1272
+ redis,
1273
+ keyProducer,
1274
+ env : { id : setup . envId , currentConcurrency : 0 , limit : 5 } ,
1275
+ } ) ;
1276
+
1277
+ for ( let i = 0 ; i < setup . queues . length ; i ++ ) {
1278
+ await setupQueue ( {
1279
+ redis,
1280
+ keyProducer,
1281
+ parentQueue : "parent-queue" ,
1282
+ score : now - setup . queues [ i ] . age ,
1283
+ queueId : `queue-${ setup . envId } -${ i } ` ,
1284
+ orgId : `org-${ setup . envId } ` ,
1285
+ envId : setup . envId ,
1286
+ } ) ;
1287
+ }
1288
+ }
1289
+
1290
+ const result = await strategy . distributeFairQueuesFromParentQueue (
1291
+ "parent-queue" ,
1292
+ "consumer-1"
1293
+ ) ;
1294
+
1295
+ // Should get all queues from both environments
1296
+ const env1Queues = result . find ( ( eq ) => eq . envId === "env-1" ) ?. queues ?? [ ] ;
1297
+ const env2Queues = result . find ( ( eq ) => eq . envId === "env-2" ) ?. queues ?? [ ] ;
1298
+
1299
+ // env-1 should have both its queues
1300
+ expect ( env1Queues . length ) . toBe ( 2 ) ;
1301
+ // env-2 should have its single queue
1302
+ expect ( env2Queues . length ) . toBe ( 1 ) ;
1303
+ }
1304
+ ) ;
1305
+
1306
+ redisTest (
1307
+ "should handle empty environments with maximumQueuePerEnvCount" ,
1308
+ async ( { redisOptions } ) => {
1309
+ const redis = createRedisClient ( redisOptions ) ;
1310
+
1311
+ const keyProducer = createKeyProducer ( "test" ) ;
1312
+ const strategy = new FairDequeuingStrategy ( {
1313
+ tracer,
1314
+ redis,
1315
+ keys : keyProducer ,
1316
+ defaultEnvConcurrency : 5 ,
1317
+ parentQueueLimit : 100 ,
1318
+ seed : "test-seed-empty-env" ,
1319
+ maximumQueuePerEnvCount : 2 ,
1320
+ } ) ;
1321
+
1322
+ const now = Date . now ( ) ;
1323
+
1324
+ // Setup two environments, one with queues, one without
1325
+ await setupConcurrency ( {
1326
+ redis,
1327
+ keyProducer,
1328
+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1329
+ } ) ;
1330
+
1331
+ await setupConcurrency ( {
1332
+ redis,
1333
+ keyProducer,
1334
+ env : { id : "env-2" , currentConcurrency : 0 , limit : 5 } ,
1335
+ } ) ;
1336
+
1337
+ // Only add queues to env-1
1338
+ await setupQueue ( {
1339
+ redis,
1340
+ keyProducer,
1341
+ parentQueue : "parent-queue" ,
1342
+ score : now - 5000 ,
1343
+ queueId : "queue-1" ,
1344
+ orgId : "org-1" ,
1345
+ envId : "env-1" ,
1346
+ } ) ;
1347
+
1348
+ await setupQueue ( {
1349
+ redis,
1350
+ keyProducer,
1351
+ parentQueue : "parent-queue" ,
1352
+ score : now - 4000 ,
1353
+ queueId : "queue-2" ,
1354
+ orgId : "org-1" ,
1355
+ envId : "env-1" ,
1356
+ } ) ;
1357
+
1358
+ const result = await strategy . distributeFairQueuesFromParentQueue (
1359
+ "parent-queue" ,
1360
+ "consumer-1"
1361
+ ) ;
1362
+
1363
+ // Should only get one environment in the result
1364
+ expect ( result . length ) . toBe ( 1 ) ;
1365
+ expect ( result [ 0 ] . envId ) . toBe ( "env-1" ) ;
1366
+ expect ( result [ 0 ] . queues . length ) . toBe ( 2 ) ;
1367
+ }
1368
+ ) ;
1369
+
1370
+ redisTest (
1371
+ "should respect maximumQueuePerEnvCount with priority offset queues" ,
1372
+ async ( { redisOptions } ) => {
1373
+ const redis = createRedisClient ( redisOptions ) ;
1374
+
1375
+ const keyProducer = createKeyProducer ( "test" ) ;
1376
+ const strategy = new FairDequeuingStrategy ( {
1377
+ tracer,
1378
+ redis,
1379
+ keys : keyProducer ,
1380
+ defaultEnvConcurrency : 5 ,
1381
+ parentQueueLimit : 100 ,
1382
+ seed : "test-seed-priority" ,
1383
+ maximumQueuePerEnvCount : 2 ,
1384
+ biases : {
1385
+ concurrencyLimitBias : 0 ,
1386
+ availableCapacityBias : 0 ,
1387
+ queueAgeRandomization : 0.3 ,
1388
+ } ,
1389
+ } ) ;
1390
+
1391
+ const now = Date . now ( ) ;
1392
+
1393
+ // Setup queues with a mix of normal and priority offset ages
1394
+ const queues = [
1395
+ { age : 5000 , id : "queue-0" } , // Normal age
1396
+ { age : 4000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET , id : "queue-1" } , // Priority
1397
+ { age : 3000 , id : "queue-2" } , // Normal age
1398
+ { age : 2000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET , id : "queue-3" } , // Priority
1399
+ { age : 1000 , id : "queue-4" } , // Normal age
1400
+ ] ;
1401
+
1402
+ await setupConcurrency ( {
1403
+ redis,
1404
+ keyProducer,
1405
+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1406
+ } ) ;
1407
+
1408
+ for ( const queue of queues ) {
1409
+ await setupQueue ( {
1410
+ redis,
1411
+ keyProducer,
1412
+ parentQueue : "parent-queue" ,
1413
+ score : now - queue . age ,
1414
+ queueId : queue . id ,
1415
+ orgId : "org-1" ,
1416
+ envId : "env-1" ,
1417
+ } ) ;
1418
+ }
1419
+
1420
+ // Run multiple iterations to check distribution
1421
+ const iterations = 1000 ;
1422
+ const queueSelectionCounts : Record < string , number > = { } ;
1423
+
1424
+ for ( let i = 0 ; i < iterations ; i ++ ) {
1425
+ const result = await strategy . distributeFairQueuesFromParentQueue (
1426
+ "parent-queue" ,
1427
+ `consumer-${ i } `
1428
+ ) ;
1429
+
1430
+ const selectedQueues = result [ 0 ] . queues ;
1431
+ for ( const queueId of selectedQueues ) {
1432
+ const baseQueueId = queueId . split ( ":" ) . pop ( ) ! ;
1433
+ queueSelectionCounts [ baseQueueId ] = ( queueSelectionCounts [ baseQueueId ] || 0 ) + 1 ;
1434
+ }
1435
+ }
1436
+
1437
+ console . log ( "\nPriority Queue Selection Statistics:" ) ;
1438
+ for ( const [ queueId , count ] of Object . entries ( queueSelectionCounts ) ) {
1439
+ const percentage = ( count / ( iterations * 2 ) ) * 100 ;
1440
+ const isPriority =
1441
+ queues . find ( ( q ) => q . id === queueId ) ?. age ! > MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET ;
1442
+ console . log (
1443
+ `${ queueId } ${ isPriority ? " (priority)" : "" } : ${ percentage . toFixed ( 2 ) } % (${ count } times)`
1444
+ ) ;
1445
+ }
1446
+
1447
+ // Verify all queues get selected
1448
+ for ( const queue of queues ) {
1449
+ expect ( queueSelectionCounts [ queue . id ] ) . toBeGreaterThan ( 0 ) ;
1450
+ }
1451
+
1452
+ // Even with priority queues, we should still see a reasonable distribution
1453
+ const selectionPercentages = Object . values ( queueSelectionCounts ) . map (
1454
+ ( count ) => ( count / ( iterations * 2 ) ) * 100
1455
+ ) ;
1456
+ const stdDev = calculateStandardDeviation ( selectionPercentages ) ;
1457
+ expect ( stdDev ) . toBeLessThan ( 20 ) ; // Allow for slightly more variance due to priority queues
1458
+ }
1459
+ ) ;
1238
1460
} ) ;
1239
1461
1240
1462
// Helper function to flatten results for counting
0 commit comments