Skip to content

Commit 8f43aec

Browse files
committed
Handle redis errors more gracefully
1 parent 6fd7560 commit 8f43aec

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed

apps/webapp/app/v3/marqs/concurrencyMonitor.server.ts

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,24 @@ export class MarqsConcurrencyMonitor {
5151
processedKeys: 0,
5252
};
5353

54-
const { stream, redis } = this.marqs.queueConcurrencyScanStream(10, () => {
55-
this._logger.debug("[MarqsConcurrencyMonitor] stream closed", {
56-
stats,
57-
});
58-
});
54+
const { stream, redis } = this.marqs.queueConcurrencyScanStream(
55+
10,
56+
() => {
57+
this._logger.debug("[MarqsConcurrencyMonitor] stream closed", {
58+
stats,
59+
});
60+
},
61+
(error) => {
62+
this._logger.debug("[MarqsConcurrencyMonitor] stream error", {
63+
stats,
64+
error: {
65+
name: error.name,
66+
message: error.message,
67+
stack: error.stack,
68+
},
69+
});
70+
}
71+
);
5972

6073
stream.on("data", async (keys) => {
6174
stream.pause();
@@ -80,9 +93,11 @@ export class MarqsConcurrencyMonitor {
8093

8194
stats.processedKeys += uniqueKeys.length;
8295

83-
await Promise.all(uniqueKeys.map((key) => this.#processKey(key, redis))).finally(() => {
84-
stream.resume();
85-
});
96+
await Promise.allSettled(uniqueKeys.map((key) => this.#processKey(key, redis))).finally(
97+
() => {
98+
stream.resume();
99+
}
100+
);
86101
});
87102
}
88103

@@ -91,8 +106,20 @@ export class MarqsConcurrencyMonitor {
91106
const orgKey = this.keys.orgCurrentConcurrencyKeyFromQueue(key);
92107
const envKey = this.keys.envCurrentConcurrencyKeyFromQueue(key);
93108

94-
// Next, we need to get all the items from the key, and any parent keys (org, env, queue) using sunion.
95-
const runIds = await redis.sunion(orgKey, envKey, key);
109+
let runIds: string[] = [];
110+
111+
try {
112+
// Next, we need to get all the items from the key, and any parent keys (org, env, queue) using sunion.
113+
runIds = await redis.sunion(orgKey, envKey, key);
114+
} catch (e) {
115+
this._logger.error("[MarqsConcurrencyMonitor] error during sunion", {
116+
key,
117+
orgKey,
118+
envKey,
119+
runIds,
120+
error: e,
121+
});
122+
}
96123

97124
if (runIds.length === 0) {
98125
return;

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,11 @@ export class MarQS {
788788
}
789789
}
790790

791-
queueConcurrencyScanStream(count: number = 100, onEndCallback?: () => void) {
791+
queueConcurrencyScanStream(
792+
count: number = 100,
793+
onEndCallback?: () => void,
794+
onErrorCallback?: (error: Error) => void
795+
) {
792796
const pattern = this.keys.queueCurrentConcurrencyScanPattern();
793797

794798
logger.debug("Starting queue concurrency scan stream", {
@@ -812,6 +816,11 @@ export class MarQS {
812816
redis.quit();
813817
});
814818

819+
stream.on("error", (error) => {
820+
onErrorCallback?.(error);
821+
redis.quit();
822+
});
823+
815824
return { stream, redis };
816825
}
817826

0 commit comments

Comments
 (0)