Skip to content

[v4] Don’t allowed canceled delayed runs to be put into the queue #1981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ export class RunEngine {
executionSnapshotSystem: this.executionSnapshotSystem,
batchSystem: this.batchSystem,
waitpointSystem: this.waitpointSystem,
delayedRunSystem: this.delayedRunSystem,
machines: this.options.machines,
});

Expand Down
5 changes: 5 additions & 0 deletions internal-packages/run-engine/src/engine/statuses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boo
return finishedStatuses.includes(status);
}

export function isInitialState(status: TaskRunExecutionStatus): boolean {
const startedStatuses: TaskRunExecutionStatus[] = ["RUN_CREATED"];
return startedStatuses.includes(status);
}

export function isFinalRunStatus(status: TaskRunStatus): boolean {
const finalStatuses: TaskRunStatus[] = [
"CANCELED",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,8 @@ export class DelayedRunSystem {
availableAt: delayUntil,
});
}

async preventDelayedRunFromBeingEnqueued({ runId }: { runId: string }) {
await this.$.worker.ack(`enqueueDelayedRun:${runId}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js";
import { sendNotificationToWorker } from "../eventBus.js";
import { getMachinePreset } from "../machinePresets.js";
import { retryOutcomeFromCompletion } from "../retrying.js";
import { isExecuting } from "../statuses.js";
import { isExecuting, isInitialState } from "../statuses.js";
import { RunEngineOptions } from "../types.js";
import { BatchSystem } from "./batchSystem.js";
import {
Expand All @@ -32,12 +32,14 @@ import {
} from "./executionSnapshotSystem.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
import { DelayedRunSystem } from "./delayedRunSystem.js";

export type RunAttemptSystemOptions = {
resources: SystemResources;
executionSnapshotSystem: ExecutionSnapshotSystem;
batchSystem: BatchSystem;
waitpointSystem: WaitpointSystem;
delayedRunSystem: DelayedRunSystem;
retryWarmStartThresholdMs?: number;
machines: RunEngineOptions["machines"];
};
Expand All @@ -47,12 +49,14 @@ export class RunAttemptSystem {
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
private readonly batchSystem: BatchSystem;
private readonly waitpointSystem: WaitpointSystem;
private readonly delayedRunSystem: DelayedRunSystem;

constructor(private readonly options: RunAttemptSystemOptions) {
this.$ = options.resources;
this.executionSnapshotSystem = options.executionSnapshotSystem;
this.batchSystem = options.batchSystem;
this.waitpointSystem = options.waitpointSystem;
this.delayedRunSystem = options.delayedRunSystem;
}

public async startRunAttempt({
Expand Down Expand Up @@ -968,6 +972,7 @@ export class RunAttemptSystem {
completedAt: true,
taskEventStore: true,
parentTaskRunId: true,
delayUntil: true,
runtimeEnvironment: {
select: {
organizationId: true,
Expand All @@ -986,6 +991,11 @@ export class RunAttemptSystem {
},
});

//if the run is delayed and hasn't started yet, we need to prevent it being added to the queue in future
if (isInitialState(latestSnapshot.executionStatus) && run.delayUntil) {
await this.delayedRunSystem.preventDelayedRunFromBeingEnqueued({ runId });
}

//remove it from the queue and release concurrency
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);

Expand Down
111 changes: 111 additions & 0 deletions internal-packages/run-engine/src/engine/tests/delays.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,115 @@ describe("RunEngine delays", () => {
engine.quit();
}
});

containerTest("Cancelling a delayed run", async ({ prisma, redisOptions }) => {
//create environment
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

//create background worker
const backgroundWorker = await setupBackgroundWorker(
engine,
authenticatedEnvironment,
taskIdentifier
);

//trigger the run with a 1 second delay
const run = await engine.trigger(
{
number: 1,
friendlyId: "run_1234",
environment: authenticatedEnvironment,
taskIdentifier,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
masterQueue: "main",
queue: "task/test-task",
isTest: false,
tags: [],
delayUntil: new Date(Date.now() + 1000),
},
prisma
);

//verify it's created but not queued
const executionData = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData);
expect(executionData.snapshot.executionStatus).toBe("RUN_CREATED");
expect(run.status).toBe("DELAYED");

//cancel the run
await engine.cancelRun({
runId: run.id,
reason: "Cancelled by test",
});

//verify it's cancelled
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData2);
expect(executionData2.snapshot.executionStatus).toBe("FINISHED");
expect(executionData2.run.status).toBe("CANCELED");

//wait past the original delay time
await setTimeout(1500);

//verify the run is still cancelled
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData3);
expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
expect(executionData3.run.status).toBe("CANCELED");

//attempt to dequeue - should get nothing
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: run.masterQueue,
maxRunCount: 10,
});

expect(dequeued.length).toBe(0);

//verify final state is still cancelled
const executionData4 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData4);
expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
expect(executionData4.run.status).toBe("CANCELED");
} finally {
engine.quit();
}
});
});