Skip to content

Commit 0e990fd

Browse files
committed
Added more logging to TaskRunConcurrencyTracker and some more try/catches
1 parent 4148921 commit 0e990fd

File tree

1 file changed

+79
-41
lines changed

1 file changed

+79
-41
lines changed

apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
2626
async messageEnqueued(message: MessagePayload): Promise<void> {}
2727

2828
async messageDequeued(message: MessagePayload): Promise<void> {
29+
logger.debug("TaskRunConcurrencyTracker.messageDequeued()", {
30+
data: message.data,
31+
messageId: message.messageId,
32+
});
33+
2934
const data = this.getMessageData(message);
3035
if (!data) {
3136
logger.info(
@@ -45,6 +50,11 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
4550
}
4651

4752
async messageAcked(message: MessagePayload): Promise<void> {
53+
logger.debug("TaskRunConcurrencyTracker.messageAcked()", {
54+
data: message.data,
55+
messageId: message.messageId,
56+
});
57+
4858
const data = this.getMessageData(message);
4959
if (!data) {
5060
logger.info(
@@ -64,6 +74,11 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
6474
}
6575

6676
async messageNacked(message: MessagePayload): Promise<void> {
77+
logger.debug("TaskRunConcurrencyTracker.messageNacked()", {
78+
data: message.data,
79+
messageId: message.messageId,
80+
});
81+
6782
const data = this.getMessageData(message);
6883
if (!data) {
6984
logger.info(
@@ -103,14 +118,18 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
103118
environmentId: string;
104119
deployed: boolean;
105120
}): Promise<void> {
106-
const pipeline = this.redis.pipeline();
121+
try {
122+
const pipeline = this.redis.pipeline();
107123

108-
pipeline.sadd(this.getTaskKey(projectId, taskId), runId);
109-
pipeline.sadd(this.getTaskEnvironmentKey(projectId, taskId, environmentId), runId);
110-
pipeline.sadd(this.getEnvironmentKey(projectId, environmentId), runId);
111-
pipeline.sadd(this.getGlobalKey(deployed), runId);
124+
pipeline.sadd(this.getTaskKey(projectId, taskId), runId);
125+
pipeline.sadd(this.getTaskEnvironmentKey(projectId, taskId, environmentId), runId);
126+
pipeline.sadd(this.getEnvironmentKey(projectId, environmentId), runId);
127+
pipeline.sadd(this.getGlobalKey(deployed), runId);
112128

113-
await pipeline.exec();
129+
await pipeline.exec();
130+
} catch (error) {
131+
logger.error("TaskRunConcurrencyTracker.executionStarted() error", { error });
132+
}
114133
}
115134

116135
private async executionFinished({
@@ -126,14 +145,18 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
126145
environmentId: string;
127146
deployed: boolean;
128147
}): Promise<void> {
129-
const pipeline = this.redis.pipeline();
148+
try {
149+
const pipeline = this.redis.pipeline();
130150

131-
pipeline.srem(this.getTaskKey(projectId, taskId), runId);
132-
pipeline.srem(this.getTaskEnvironmentKey(projectId, taskId, environmentId), runId);
133-
pipeline.srem(this.getEnvironmentKey(projectId, environmentId), runId);
134-
pipeline.srem(this.getGlobalKey(deployed), runId);
151+
pipeline.srem(this.getTaskKey(projectId, taskId), runId);
152+
pipeline.srem(this.getTaskEnvironmentKey(projectId, taskId, environmentId), runId);
153+
pipeline.srem(this.getEnvironmentKey(projectId, environmentId), runId);
154+
pipeline.srem(this.getGlobalKey(deployed), runId);
135155

136-
await pipeline.exec();
156+
await pipeline.exec();
157+
} catch (error) {
158+
logger.error("TaskRunConcurrencyTracker.executionFinished() error", { error });
159+
}
137160
}
138161

139162
async taskConcurrentRunCount(projectId: string, taskId: string): Promise<number> {
@@ -149,21 +172,26 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
149172
}
150173

151174
private async getTaskCounts(projectId: string, taskIds: string[]): Promise<number[]> {
152-
const pipeline = this.redis.pipeline();
153-
taskIds.forEach((taskId) => {
154-
pipeline.scard(this.getTaskKey(projectId, taskId));
155-
});
156-
const results = await pipeline.exec();
157-
if (!results) {
175+
try {
176+
const pipeline = this.redis.pipeline();
177+
taskIds.forEach((taskId) => {
178+
pipeline.scard(this.getTaskKey(projectId, taskId));
179+
});
180+
const results = await pipeline.exec();
181+
if (!results) {
182+
return [];
183+
}
184+
return results.map(([err, count]) => {
185+
if (err) {
186+
console.error("Error in getTaskCounts:", err);
187+
return 0;
188+
}
189+
return count as number;
190+
});
191+
} catch (error) {
192+
logger.error("TaskRunConcurrencyTracker.getTaskCounts() error", { error });
158193
return [];
159194
}
160-
return results.map(([err, count]) => {
161-
if (err) {
162-
console.error("Error in getTaskCounts:", err);
163-
return 0;
164-
}
165-
return count as number;
166-
});
167195
}
168196

169197
async projectTotalConcurrentRunCount(projectId: string, taskIds: string[]): Promise<number> {
@@ -177,7 +205,7 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
177205
): Promise<Record<string, number>> {
178206
const counts = await this.getTaskCounts(projectId, taskIds);
179207
return taskIds.reduce((acc, taskId, index) => {
180-
acc[taskId] = counts[index];
208+
acc[taskId] = counts[index] ?? 0;
181209
return acc;
182210
}, {} as Record<string, number>);
183211
}
@@ -186,23 +214,28 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
186214
projectId: string,
187215
environmentIds: string[]
188216
): Promise<Record<string, number>> {
189-
const pipeline = this.redis.pipeline();
190-
environmentIds.forEach((environmentId) => {
191-
pipeline.scard(this.getEnvironmentKey(projectId, environmentId));
192-
});
193-
const results = await pipeline.exec();
194-
if (!results) {
195-
return Object.fromEntries(environmentIds.map((id) => [id, 0]));
196-
}
217+
try {
218+
const pipeline = this.redis.pipeline();
219+
environmentIds.forEach((environmentId) => {
220+
pipeline.scard(this.getEnvironmentKey(projectId, environmentId));
221+
});
222+
const results = await pipeline.exec();
223+
if (!results) {
224+
return Object.fromEntries(environmentIds.map((id) => [id, 0]));
225+
}
197226

198-
return results.reduce((acc, [err, count], index) => {
199-
if (err) {
200-
console.error("Error in environmentConcurrentRunCounts:", err);
227+
return results.reduce((acc, [err, count], index) => {
228+
if (err) {
229+
console.error("Error in environmentConcurrentRunCounts:", err);
230+
return acc;
231+
}
232+
acc[environmentIds[index]] = count as number;
201233
return acc;
202-
}
203-
acc[environmentIds[index]] = count as number;
204-
return acc;
205-
}, {} as Record<string, number>);
234+
}, {} as Record<string, number>);
235+
} catch (error) {
236+
logger.error("TaskRunConcurrencyTracker.environmentConcurrentRunCounts() error", { error });
237+
return Object.fromEntries(environmentIds.map((id) => [id, 0]));
238+
}
206239
}
207240

208241
private getTaskKey(projectId: string, taskId: string): string {
@@ -231,6 +264,11 @@ function getTracker() {
231264
);
232265
}
233266

267+
logger.debug("Initializing TaskRunConcurrencyTracker", {
268+
redisHost: env.REDIS_HOST,
269+
redisPort: env.REDIS_PORT,
270+
});
271+
234272
return new TaskRunConcurrencyTracker({
235273
redis: {
236274
keyPrefix: "concurrencytracker:",

0 commit comments

Comments
 (0)