1
1
import { nanoid } from "nanoid" ;
2
2
import pLimit from "p-limit" ;
3
+ import { Gauge } from "prom-client" ;
4
+ import { metricsRegister } from "~/metrics.server" ;
3
5
import { logger } from "~/services/logger.server" ;
4
6
5
7
export type DynamicFlushSchedulerConfig < T > = {
@@ -10,7 +12,6 @@ export type DynamicFlushSchedulerConfig<T> = {
10
12
} ;
11
13
12
14
export class DynamicFlushScheduler < T > {
13
- // private batchQueue: T[][]; // Adjust the type according to your data structure
14
15
private currentBatch : T [ ] ; // Adjust the type according to your data structure
15
16
private readonly BATCH_SIZE : number ;
16
17
private readonly FLUSH_INTERVAL : number ;
@@ -30,10 +31,24 @@ export class DynamicFlushScheduler<T> {
30
31
this . flushTimer = null ;
31
32
this . startFlushTimer ( ) ;
32
33
this . setupShutdownHandlers ( ) ;
34
+
35
+ const scheduler = this ;
36
+ new Gauge ( {
37
+ name : "dynamic_flush_scheduler_batch_size" ,
38
+ help : "Number of items in the current dynamic flush scheduler batch" ,
39
+ collect ( ) {
40
+ this . set ( scheduler . currentBatch . length ) ;
41
+ } ,
42
+ registers : [ metricsRegister ] ,
43
+ } ) ;
33
44
}
34
45
35
46
async addToBatch ( items : T [ ] ) : Promise < void > {
36
47
this . currentBatch . push ( ...items ) ;
48
+ logger . debug ( "Adding items to batch" , {
49
+ batchSize : this . BATCH_SIZE ,
50
+ newSize : this . currentBatch . length ,
51
+ } ) ;
37
52
38
53
if ( this . currentBatch . length >= this . BATCH_SIZE ) {
39
54
await this . flushNextBatch ( ) ;
@@ -52,8 +67,12 @@ export class DynamicFlushScheduler<T> {
52
67
private async shutdown ( ) : Promise < void > {
53
68
if ( this . isShuttingDown ) return ;
54
69
this . isShuttingDown = true ;
70
+ logger . log ( "Shutting down dynamic flush scheduler..." ) ;
71
+
55
72
await this . checkAndFlush ( ) ;
56
73
this . clearTimer ( ) ;
74
+
75
+ logger . log ( "All items have been flushed." ) ;
57
76
}
58
77
59
78
private clearTimer ( ) : void {
0 commit comments