Fix long retry delays #2020

Merged · 5 commits · May 2, 2025
4 changes: 2 additions & 2 deletions .github/workflows/publish.yml
@@ -56,14 +56,14 @@ jobs:
secrets: inherit

publish-webapp:
- needs: [typecheck]
+ needs: [typecheck, units]
uses: ./.github/workflows/publish-webapp.yml
secrets: inherit
with:
image_tag: ${{ inputs.image_tag }}

publish-worker:
- needs: [typecheck]
+ needs: [typecheck, units]
uses: ./.github/workflows/publish-worker.yml
secrets: inherit
with:
3 changes: 2 additions & 1 deletion apps/webapp/app/env.server.ts
@@ -460,6 +460,7 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
+ RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
@@ -717,7 +718,7 @@

SLACK_BOT_TOKEN: z.string().optional(),
SLACK_SIGNUP_REASON_CHANNEL_ID: z.string().optional(),

// kapa.ai
KAPA_AI_WEBSITE_ID: z.string().optional(),
});
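A note on the new variable: it is parsed with Zod's coercion, so a string value from the environment becomes an integer and an unset value falls back to 30 000 ms. A minimal standalone sketch — the schema line mirrors the diff, but the `parse` calls are purely illustrative, not the webapp's actual bootstrap code:

```ts
import { z } from "zod";

// Mirrors the schema entry added in this diff.
const Env = z.object({
  RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
});

Env.parse({}); // => { RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: 30000 } (default applies)
Env.parse({ RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: "45000" }); // string coerced to 45000
```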
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
@@ -95,6 +95,7 @@ function createRunEngine() {
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
},
+ retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
});

return engine;
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
@@ -304,6 +304,7 @@ export class RunEngine {
waitpointSystem: this.waitpointSystem,
delayedRunSystem: this.delayedRunSystem,
machines: this.options.machines,
+ retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs,
});

this.dequeueSystem = new DequeueSystem({
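The two wiring changes above thread `retryWarmStartThresholdMs` from the environment into the engine and on into the system that schedules retries. The diff does not show the consuming code, but given the PR title, a plausible reading is: retries shorter than the threshold can warm-start on the worker that is already running, while longer retries are nacked back to the queue with an explicit `retryAt`. A hedged sketch of that decision — the names and shape here are assumptions for illustration only:

```ts
// Assumed decision shape; the real logic lives in the run-engine systems not shown in this diff.
type RetryDecision =
  | { kind: "warm-start"; delayMs: number } // short delay: keep the current worker hot and retry in-process
  | { kind: "requeue"; retryAt: number }; // long delay: nack with an explicit retry time

function decideRetry(
  retryDelayMs: number,
  warmStartThresholdMs: number,
  now = Date.now()
): RetryDecision {
  return retryDelayMs <= warmStartThresholdMs
    ? { kind: "warm-start", delayMs: retryDelayMs }
    : { kind: "requeue", retryAt: now + retryDelayMs };
}
```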
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/run-queue/index.ts
@@ -541,7 +541,7 @@
}
}

- await this.#callNackMessage({ message });
+ await this.#callNackMessage({ message, retryAt });

return true;
},
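Passing `retryAt` through to `#callNackMessage` is what lets the queue requeue the run at the requested time instead of applying a generic backoff; the test below asserts that the message's sorted-set score ends up within 100 ms of `retryAt`. For orientation, a hedged sketch of that requeue step with ioredis — key names and bookkeeping are assumptions, and the real implementation goes through RunQueue's internal nack path, which is not shown in this diff:

```ts
import Redis from "ioredis";

// Sketch: store the message in the queue's sorted set with retryAt as the score,
// so it only becomes eligible for dequeue once that time has passed.
async function requeueWithRetryAt(
  redis: Redis,
  queueKey: string,
  messageId: string,
  retryAt: number = Date.now()
): Promise<void> {
  await redis.zadd(queueKey, retryAt, messageId);
}

// A dequeuer would then only consider members whose score is already in the past.
async function dueMessageIds(redis: Redis, queueKey: string, now = Date.now()): Promise<string[]> {
  return redis.zrangebyscore(queueKey, "-inf", now);
}
```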
61 changes: 61 additions & 0 deletions internal-packages/run-engine/src/run-queue/tests/nack.test.ts
@@ -214,4 +214,65 @@ describe("RunQueue.nackMessage", () => {
}
}
);

redisTest(
"nacking a message with retryAt sets the correct requeue time",
async ({ redisContainer }) => {
const queue = new RunQueue({
...testOptions,
queueSelectionStrategy: new FairQueueSelectionStrategy({
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
keys: testOptions.keys,
}),
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
});

try {
const envMasterQueue = `env:${authenticatedEnvDev.id}`;

// Enqueue message
await queue.enqueueMessage({
env: authenticatedEnvDev,
message: messageDev,
masterQueues: ["main", envMasterQueue],
});

// Dequeue message
const dequeued = await queue.dequeueMessageFromMasterQueue(
"test_12345",
envMasterQueue,
10
);
expect(dequeued.length).toBe(1);

// Set retryAt to 5 seconds in the future
const retryAt = Date.now() + 5000;
await queue.nackMessage({
orgId: messageDev.orgId,
messageId: messageDev.runId,
retryAt,
});

// Check the score of the message in the queue
const queueKey = queue.keys.queueKey(authenticatedEnvDev, messageDev.queue);
const score = await queue.oldestMessageInQueue(authenticatedEnvDev, messageDev.queue);
expect(typeof score).toBe("number");
if (typeof score !== "number") {
throw new Error("Expected score to be a number, but got undefined");
}
// Should be within 100ms of retryAt
expect(Math.abs(score - retryAt)).toBeLessThanOrEqual(100);
} finally {
await queue.quit();
}
}
);
});
12 changes: 9 additions & 3 deletions internal-packages/testcontainers/src/index.ts
@@ -53,7 +53,9 @@ const postgresContainer = async (
try {
await use(container);
} finally {
- await container.stop();
+ // WARNING: By default, Testcontainers does not wait until the container has stopped; it simply issues the stop command and returns immediately.
+ // If you need to wait for the container to be fully stopped, provide a timeout. The timeout here is in seconds.
+ await container.stop({ timeout: 10 });
}
};

@@ -92,7 +94,9 @@ const redisContainer = async (
try {
await use(container);
} finally {
- await container.stop();
+ // WARNING: By default, Testcontainers does not wait until the container has stopped; it simply issues the stop command and returns immediately.
+ // If you need to wait for the container to be fully stopped, provide a timeout. The timeout here is in seconds.
+ await container.stop({ timeout: 10 });
}
};

@@ -142,7 +146,9 @@ const electricOrigin = async (
try {
await use(origin);
} finally {
- await container.stop();
+ // WARNING: By default, Testcontainers does not wait until the container has stopped; it simply issues the stop command and returns immediately.
+ // If you need to wait for the container to be fully stopped, provide a timeout. The timeout here is in seconds.
+ await container.stop({ timeout: 10 });
}
};

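The stop-timeout comments above describe the pattern each fixture now follows. A self-contained sketch of such a fixture, assuming vitest's `test.extend` and testcontainers' `GenericContainer` — the image and fixture names are illustrative, not the repo's actual code:

```ts
import { GenericContainer, type StartedTestContainer } from "testcontainers";
import { test as baseTest } from "vitest";

export const test = baseTest.extend<{ redisContainer: StartedTestContainer }>({
  redisContainer: async ({}, use) => {
    const container = await new GenericContainer("redis:7").withExposedPorts(6379).start();
    try {
      await use(container);
    } finally {
      // Wait for the container to actually stop (per the comments in this PR, the
      // timeout is in seconds) instead of returning as soon as stop is issued.
      await container.stop({ timeout: 10 });
    }
  },
});
```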