Skip to content

Commit de135e4

Browse files
committed
Fix for TASK_RUN_HEARTBEAT errors in deployed and dev works
1 parent 00668ff commit de135e4

File tree

7 files changed

+40
-18
lines changed

7 files changed

+40
-18
lines changed

.changeset/poor-starfishes-act.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Configurable deployed heartbeat interval via HEARTBEAT_INTERVAL_MS env var

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ const EnvironmentSchema = z.object({
177177

178178
LOOPS_API_KEY: z.string().optional(),
179179
MARQS_DISABLE_REBALANCING: z.coerce.boolean().default(false),
180+
MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
181+
.number()
182+
.int()
183+
.default(60 * 1000 * 15),
184+
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
180185

181186
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
182187
V2_MARQS_ENABLED: z.string().default("0"),

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,15 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
798798
]);
799799
}
800800

801+
if (env.PROD_TASK_HEARTBEAT_INTERVAL_MS) {
802+
result = result.concat([
803+
{
804+
key: "HEARTBEAT_INTERVAL_MS",
805+
value: String(env.PROD_TASK_HEARTBEAT_INTERVAL_MS),
806+
},
807+
]);
808+
}
809+
801810
const commonVariables = await resolveCommonBuiltInVariables(runtimeEnvironment);
802811

803812
return [...result, ...commonVariables];

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ export class DevQueueConsumer {
162162
/**
163163
* @deprecated Use `taskRunHeartbeat` instead
164164
*/
165-
public async taskHeartbeat(workerId: string, id: string, seconds: number = 60) {
166-
logger.debug("[DevQueueConsumer] taskHeartbeat()", { id, seconds });
165+
public async taskHeartbeat(workerId: string, id: string) {
166+
logger.debug("[DevQueueConsumer] taskHeartbeat()", { id });
167167

168168
const taskRunAttempt = await prisma.taskRunAttempt.findUnique({
169169
where: { friendlyId: id },
@@ -173,13 +173,13 @@ export class DevQueueConsumer {
173173
return;
174174
}
175175

176-
await marqs?.heartbeatMessage(taskRunAttempt.taskRunId, seconds);
176+
await marqs?.heartbeatMessage(taskRunAttempt.taskRunId);
177177
}
178178

179-
public async taskRunHeartbeat(workerId: string, id: string, seconds: number = 60) {
180-
logger.debug("[DevQueueConsumer] taskRunHeartbeat()", { id, seconds });
179+
public async taskRunHeartbeat(workerId: string, id: string) {
180+
logger.debug("[DevQueueConsumer] taskRunHeartbeat()", { id });
181181

182-
await marqs?.heartbeatMessage(id, seconds);
182+
await marqs?.heartbeatMessage(id);
183183
}
184184

185185
public async stop(reason: string = "CLI disconnected") {

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -698,8 +698,8 @@ export class MarQS {
698698
}
699699

700700
// This should increment by the number of seconds, but with a max value of Date.now() + visibilityTimeoutInMs
701-
public async heartbeatMessage(messageId: string, seconds: number = 30) {
702-
await this.options.visibilityTimeoutStrategy.heartbeat(messageId, seconds * 1000);
701+
public async heartbeatMessage(messageId: string) {
702+
await this.options.visibilityTimeoutStrategy.heartbeat(messageId, this.visibilityTimeoutInMs);
703703
}
704704

705705
get visibilityTimeoutInMs() {
@@ -1871,7 +1871,7 @@ function getMarQSClient() {
18711871
redis: redisOptions,
18721872
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
18731873
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
1874-
visibilityTimeoutInMs: 120 * 1000, // 2 minutes,
1874+
visibilityTimeoutInMs: env.MARQS_VISIBILITY_TIMEOUT_MS,
18751875
enableRebalancing: !env.MARQS_DISABLE_REBALANCING,
18761876
subscriber: concurrencyTracker,
18771877
});

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,8 +1169,8 @@ class SharedQueueTasks {
11691169
} satisfies TaskRunExecutionLazyAttemptPayload;
11701170
}
11711171

1172-
async taskHeartbeat(attemptFriendlyId: string, seconds: number = 60) {
1173-
logger.debug("[SharedQueueConsumer] taskHeartbeat()", { id: attemptFriendlyId, seconds });
1172+
async taskHeartbeat(attemptFriendlyId: string) {
1173+
logger.debug("[SharedQueueConsumer] taskHeartbeat()", { id: attemptFriendlyId });
11741174

11751175
const taskRunAttempt = await prisma.taskRunAttempt.findUnique({
11761176
where: { friendlyId: attemptFriendlyId },
@@ -1180,13 +1180,13 @@ class SharedQueueTasks {
11801180
return;
11811181
}
11821182

1183-
await marqs?.heartbeatMessage(taskRunAttempt.taskRunId, seconds);
1183+
await marqs?.heartbeatMessage(taskRunAttempt.taskRunId);
11841184
}
11851185

1186-
async taskRunHeartbeat(runId: string, seconds: number = 60) {
1187-
logger.debug("[SharedQueueConsumer] taskRunHeartbeat()", { runId, seconds });
1186+
async taskRunHeartbeat(runId: string) {
1187+
logger.debug("[SharedQueueConsumer] taskRunHeartbeat()", { runId });
11881188

1189-
await marqs?.heartbeatMessage(runId, seconds);
1189+
await marqs?.heartbeatMessage(runId);
11901190
}
11911191

11921192
public async taskRunFailed(completion: TaskRunFailedExecutionResult) {

packages/cli-v3/src/entryPoints/deploy-run-worker.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ process.on("uncaughtException", function (error, origin) {
7777
}
7878
});
7979

80-
const heartbeatIntervalMs = getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS");
80+
const usageIntervalMs = getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS");
8181
const usageEventUrl = getEnvVar("USAGE_EVENT_URL");
8282
const triggerJWT = getEnvVar("TRIGGER_JWT");
83+
const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");
8384

8485
const prodUsageManager = new ProdUsageManager(new DevUsageManager(), {
85-
heartbeatIntervalMs: heartbeatIntervalMs ? parseInt(heartbeatIntervalMs, 10) : undefined,
86+
heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined,
8687
url: usageEventUrl,
8788
jwt: triggerJWT,
8889
});
@@ -383,7 +384,9 @@ runtime.setGlobalRuntimeManager(prodRuntimeManager);
383384

384385
process.title = "trigger-dev-worker";
385386

386-
for await (const _ of setInterval(15_000)) {
387+
const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
388+
389+
for await (const _ of setInterval(heartbeatInterval)) {
387390
if (_isRunning && _execution) {
388391
try {
389392
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });

0 commit comments

Comments
 (0)