Skip to content

Commit 6944cda

Browse files
committed
report failedBatchesCount
1 parent 09145b3 commit 6944cda

File tree

1 file changed

+30
-14
lines changed

1 file changed

+30
-14
lines changed

apps/webapp/app/v3/dynamicFlushScheduler.server.ts

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,42 @@ export class DynamicFlushScheduler<T> {
1919
private readonly concurrencyLimiter: ReturnType<typeof pLimit>;
2020
private flushTimer: NodeJS.Timeout | null;
2121
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
22-
private isShuttingDown = false;
22+
private isShuttingDown;
23+
private failedBatchCount;
2324

2425
constructor(config: DynamicFlushSchedulerConfig<T>) {
2526
this.currentBatch = [];
2627
this.BATCH_SIZE = config.batchSize;
2728
this.FLUSH_INTERVAL = config.flushInterval;
2829
this.MAX_CONCURRENCY = config.maxConcurrency || 1;
2930
this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY);
30-
this.callback = config.callback;
3131
this.flushTimer = null;
32+
this.callback = config.callback;
33+
this.isShuttingDown = false;
34+
this.failedBatchCount = 0;
3235
this.startFlushTimer();
3336
this.setupShutdownHandlers();
3437

35-
// if (process.env.NODE_ENV !== "test") {
36-
// const scheduler = this;
37-
// new Gauge({
38-
// name: "dynamic_flush_scheduler_batch_size",
39-
// help: "Number of items in the current dynamic flush scheduler batch",
40-
// collect() {
41-
// this.set(scheduler.currentBatch.length);
42-
// },
43-
// registers: [metricsRegister],
44-
// });
45-
// }
38+
if (process.env.NODE_ENV !== "test") {
39+
const scheduler = this;
40+
new Gauge({
41+
name: "dynamic_flush_scheduler_batch_size",
42+
help: "Number of items in the current dynamic flush scheduler batch",
43+
collect() {
44+
this.set(scheduler.currentBatch.length);
45+
},
46+
registers: [metricsRegister],
47+
});
48+
49+
new Gauge({
50+
name: "dynamic_flush_scheduler_failed_batches",
51+
help: "Number of failed batches",
52+
collect() {
53+
this.set(scheduler.failedBatchCount);
54+
},
55+
registers: [metricsRegister],
56+
});
57+
}
4658
}
4759

4860
async addToBatch(items: T[]): Promise<void> {
@@ -113,10 +125,14 @@ export class DynamicFlushScheduler<T> {
113125
batchId,
114126
error,
115127
});
128+
throw error;
116129
}
117130
})
118131
);
119132

120-
await Promise.all(promises);
133+
const results = await Promise.allSettled(promises);
134+
135+
const failedBatches = results.filter((result) => result.status === "rejected").length;
136+
this.failedBatchCount += failedBatches;
121137
}
122138
}

0 commit comments

Comments
 (0)