@@ -1119,6 +1119,122 @@ describe("FairDequeuingStrategy", () => {
1119
1119
expect ( env2Queues . length ) . toBe ( 2 ) ;
1120
1120
}
1121
1121
) ;
1122
+
1123
+ redisTest (
1124
+ "should fairly distribute queues when using maximumQueuePerEnvCount over time" ,
1125
+ async ( { redisOptions } ) => {
1126
+ const redis = createRedisClient ( redisOptions ) ;
1127
+
1128
+ const keyProducer = createKeyProducer ( "test" ) ;
1129
+ const strategy = new FairDequeuingStrategy ( {
1130
+ tracer,
1131
+ redis,
1132
+ keys : keyProducer ,
1133
+ defaultEnvConcurrency : 5 ,
1134
+ parentQueueLimit : 100 ,
1135
+ seed : "test-seed-fair-distribution" ,
1136
+ maximumQueuePerEnvCount : 2 , // Only take 2 queues at a time
1137
+ biases : {
1138
+ concurrencyLimitBias : 0 ,
1139
+ availableCapacityBias : 0 ,
1140
+ queueAgeRandomization : 0.3 , // Add some randomization to allow newer queues a chance
1141
+ } ,
1142
+ } ) ;
1143
+
1144
+ const now = Date . now ( ) ;
1145
+
1146
+ // Setup one environment with 5 queues of different ages
1147
+ const queues = [
1148
+ { age : 5000 , id : "queue-0" } , // Oldest
1149
+ { age : 4000 , id : "queue-1" } ,
1150
+ { age : 3000 , id : "queue-2" } ,
1151
+ { age : 2000 , id : "queue-3" } ,
1152
+ { age : 1000 , id : "queue-4" } , // Newest
1153
+ ] ;
1154
+
1155
+ // Setup the environment and its queues
1156
+ await setupConcurrency ( {
1157
+ redis,
1158
+ keyProducer,
1159
+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1160
+ } ) ;
1161
+
1162
+ for ( const queue of queues ) {
1163
+ await setupQueue ( {
1164
+ redis,
1165
+ keyProducer,
1166
+ parentQueue : "parent-queue" ,
1167
+ score : now - queue . age ,
1168
+ queueId : queue . id ,
1169
+ orgId : "org-1" ,
1170
+ envId : "env-1" ,
1171
+ } ) ;
1172
+ }
1173
+
1174
+ // Run multiple iterations and track which queues are selected
1175
+ const iterations = 1000 ;
1176
+ const queueSelectionCounts : Record < string , number > = { } ;
1177
+ const queuePairings : Record < string , number > = { } ;
1178
+
1179
+ for ( let i = 0 ; i < iterations ; i ++ ) {
1180
+ const result = await strategy . distributeFairQueuesFromParentQueue (
1181
+ "parent-queue" ,
1182
+ `consumer-${ i } `
1183
+ ) ;
1184
+
1185
+ // There should be exactly one environment
1186
+ expect ( result . length ) . toBe ( 1 ) ;
1187
+ const selectedQueues = result [ 0 ] . queues ;
1188
+
1189
+ // Should always get exactly 2 queues due to maximumQueuePerEnvCount
1190
+ expect ( selectedQueues . length ) . toBe ( 2 ) ;
1191
+
1192
+ // Track individual queue selections
1193
+ for ( const queueId of selectedQueues ) {
1194
+ const baseQueueId = queueId . split ( ":" ) . pop ( ) ! ;
1195
+ queueSelectionCounts [ baseQueueId ] = ( queueSelectionCounts [ baseQueueId ] || 0 ) + 1 ;
1196
+ }
1197
+
1198
+ // Track queue pairings to ensure variety
1199
+ const [ first , second ] = selectedQueues . map ( ( qId ) => qId . split ( ":" ) . pop ( ) ! ) . sort ( ) ;
1200
+ const pairingKey = `${ first } -${ second } ` ;
1201
+ queuePairings [ pairingKey ] = ( queuePairings [ pairingKey ] || 0 ) + 1 ;
1202
+ }
1203
+
1204
+ console . log ( "\nQueue Selection Statistics:" ) ;
1205
+ for ( const [ queueId , count ] of Object . entries ( queueSelectionCounts ) ) {
1206
+ const percentage = ( count / ( iterations * 2 ) ) * 100 ; // Times 2 because we select 2 queues each time
1207
+ console . log ( `${ queueId } : ${ percentage . toFixed ( 2 ) } % (${ count } times)` ) ;
1208
+ }
1209
+
1210
+ console . log ( "\nQueue Pairing Statistics:" ) ;
1211
+ for ( const [ pair , count ] of Object . entries ( queuePairings ) ) {
1212
+ const percentage = ( count / iterations ) * 100 ;
1213
+ console . log ( `${ pair } : ${ percentage . toFixed ( 2 ) } % (${ count } times)` ) ;
1214
+ }
1215
+
1216
+ // Verify that all queues were selected at least once
1217
+ for ( const queue of queues ) {
1218
+ expect ( queueSelectionCounts [ queue . id ] ) . toBeGreaterThan ( 0 ) ;
1219
+ }
1220
+
1221
+ // Calculate standard deviation of selection percentages
1222
+ const selectionPercentages = Object . values ( queueSelectionCounts ) . map (
1223
+ ( count ) => ( count / ( iterations * 2 ) ) * 100
1224
+ ) ;
1225
+ const stdDev = calculateStandardDeviation ( selectionPercentages ) ;
1226
+
1227
+ // The standard deviation should be reasonable given our age bias
1228
+ // Higher stdDev means more bias towards older queues
1229
+ // We expect some bias due to queueAgeRandomization being 0.3
1230
+ expect ( stdDev ) . toBeLessThan ( 15 ) ; // Allow for age-based bias but not extreme
1231
+
1232
+ // Verify we get different pairings of queues
1233
+ const uniquePairings = Object . keys ( queuePairings ) . length ;
1234
+ // With 5 queues, we can have 10 possible unique pairs
1235
+ expect ( uniquePairings ) . toBeGreaterThan ( 5 ) ; // Should see at least half of possible combinations
1236
+ }
1237
+ ) ;
1122
1238
} ) ;
1123
1239
1124
1240
// Helper function to flatten results for counting
0 commit comments