Skip to content

Commit 1144d7e

Browse files
committed
Deadlock detection WIP
1 parent 0dd60ea commit 1144d7e

File tree

10 files changed

+498
-7
lines changed

10 files changed

+498
-7
lines changed
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
2+
import { RunChainStateManager, TriggerTaskRequest } from "../types";
3+
import { RunChainState } from "@trigger.dev/core/v3/schemas";
4+
import { logger } from "~/services/logger.server";
5+
import { EngineServiceValidationError } from "./errors";
6+
7+
export class DefaultRunChainStateManager implements RunChainStateManager {
8+
private readonly prisma: PrismaClientOrTransaction;
9+
private readonly isReleaseConcurrencyEnabled: boolean;
10+
11+
constructor(prisma: PrismaClientOrTransaction, isReleaseConcurrencyEnabled: boolean) {
12+
this.prisma = prisma;
13+
this.isReleaseConcurrencyEnabled = isReleaseConcurrencyEnabled;
14+
}
15+
16+
async validateRunChain(
17+
request: TriggerTaskRequest,
18+
{
19+
parentRun,
20+
queueName,
21+
lockedQueueId,
22+
}: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string }
23+
): Promise<RunChainState> {
24+
// if there is no parent run, the chain resets
25+
if (!parentRun) {
26+
return {};
27+
}
28+
29+
const parsedParentRunChainState = RunChainState.safeParse(parentRun.runChainState ?? {});
30+
31+
if (!parsedParentRunChainState.success) {
32+
logger.error("Invalid run chain state for parent run", {
33+
runId: parentRun.id,
34+
runState: parentRun.runChainState,
35+
error: parsedParentRunChainState.error,
36+
});
37+
38+
return {};
39+
}
40+
41+
const parentRunChainState = parsedParentRunChainState.data;
42+
43+
// Now we need to check if the parent run will hold concurrency, or if it will release it
44+
// if it will hold concurrency, we need to account for the parent run's concurrency
45+
// Then, along with the new run's concurrency,
46+
// we need to determine if the new run will ever be able to run, or are we in a deadlock situation
47+
// We need to check the concurrency limit against the concurrency limit of the environment, and the queue of the new run
48+
// We'll also need the queue of the parent run, to determine if the parent run will release and which queue to add to
49+
// Since the parent run is already running, it will definitely have a locked queue associated with it
50+
const { concurrency } = parentRunChainState;
51+
52+
const parentLockedQueueId = parentRun.lockedQueueId;
53+
54+
if (!parentLockedQueueId) {
55+
logger.error("Parent run has no locked queue, cannot determine run chain state", {
56+
runId: parentRun.id,
57+
runState: parentRun.runChainState,
58+
});
59+
60+
return {};
61+
}
62+
63+
const parentQueueState = await this.#getParentQueueState(
64+
parentRunChainState,
65+
parentLockedQueueId
66+
);
67+
68+
// We first need to check if the release concurrency system is enabled,
69+
// If it is not enabled, then we can assume the parent run will hold the concurrency,
70+
// for the env and the queue
71+
// If it is enabled, we never hold the concurrency for the env, just for the queue
72+
if (!this.isReleaseConcurrencyEnabled) {
73+
parentQueueState.holding += 1;
74+
75+
const newRunChainState = {
76+
...parentRunChainState,
77+
concurrency: {
78+
queues: [
79+
...(concurrency?.queues ?? []).filter((queue) => queue.id !== parentLockedQueueId),
80+
parentQueueState,
81+
],
82+
environment: (concurrency?.environment ?? 0) + 1,
83+
},
84+
};
85+
86+
return await this.#validateNewRunChainState(request, newRunChainState, {
87+
parentRun,
88+
queueName,
89+
lockedQueueId,
90+
});
91+
}
92+
93+
// Now we need to determine if the parent run will release the concurrency
94+
// if it does, we will add to the holding count for the queue
95+
const willReleaseConcurrency = await this.#determineIfParentRunWillReleaseConcurrency(
96+
request,
97+
parentLockedQueueId
98+
);
99+
100+
if (!willReleaseConcurrency) {
101+
parentQueueState.holding += 1;
102+
}
103+
104+
const newRunChainState = {
105+
...parentRunChainState,
106+
concurrency: {
107+
queues: [
108+
...(concurrency?.queues ?? []).filter((queue) => queue.id !== parentLockedQueueId),
109+
parentQueueState,
110+
],
111+
environment: concurrency?.environment ?? 0,
112+
},
113+
};
114+
115+
return await this.#validateNewRunChainState(request, newRunChainState, {
116+
parentRun,
117+
queueName,
118+
lockedQueueId,
119+
});
120+
}
121+
122+
// Performs the deadlock detection logic once the new run chain state is determined
123+
// Needs to account for everything held, plus the new run's concurrency
124+
async #validateNewRunChainState(
125+
request: TriggerTaskRequest,
126+
runChainState: RunChainState,
127+
{
128+
parentRun,
129+
queueName,
130+
lockedQueueId,
131+
}: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string }
132+
) {
133+
logger.debug("Validating new run chain state", {
134+
runChainState,
135+
});
136+
137+
const environmentConcurrency = (runChainState.concurrency?.environment ?? 0) + 1;
138+
139+
if (environmentConcurrency > request.environment.maximumConcurrencyLimit) {
140+
const environmentDetails = `The environment has a concurrency limit of ${request.environment.maximumConcurrencyLimit}, and the chain would require ${environmentConcurrency}`;
141+
throw new EngineServiceValidationError(this.createDeadlockErrorMessage(environmentDetails));
142+
}
143+
144+
if (!lockedQueueId) {
145+
return runChainState;
146+
}
147+
148+
const queueConcurrencyState = runChainState.concurrency?.queues.find(
149+
(queue) => queue.id === lockedQueueId
150+
);
151+
152+
if (!queueConcurrencyState) {
153+
return runChainState;
154+
}
155+
156+
const queueConcurrency = queueConcurrencyState.holding + 1;
157+
158+
const queue = await this.prisma.taskQueue.findFirst({
159+
where: {
160+
id: lockedQueueId,
161+
},
162+
select: {
163+
concurrencyLimit: true,
164+
},
165+
});
166+
167+
if (!queue) {
168+
return runChainState;
169+
}
170+
171+
const queueConcurrencyLimit = queue.concurrencyLimit;
172+
173+
if (
174+
typeof queueConcurrencyLimit === "number" &&
175+
queueConcurrencyLimit !== 0 &&
176+
queueConcurrency > queueConcurrencyLimit
177+
) {
178+
const queueDetails = `The queue '${queueName}' has a concurrency limit of ${queueConcurrencyLimit}, and the chain would require ${queueConcurrency}`;
179+
throw new EngineServiceValidationError(this.createDeadlockErrorMessage(queueDetails));
180+
}
181+
182+
return runChainState;
183+
}
184+
185+
async #determineIfParentRunWillReleaseConcurrency(
186+
request: TriggerTaskRequest,
187+
parentLockedQueueId: string
188+
) {
189+
if (typeof request.body.options?.releaseConcurrency === "boolean") {
190+
return request.body.options.releaseConcurrency;
191+
}
192+
193+
const parentQueue = await this.prisma.taskQueue.findFirst({
194+
where: {
195+
id: parentLockedQueueId,
196+
},
197+
select: {
198+
releaseConcurrencyOnWaitpoint: true,
199+
concurrencyLimit: true,
200+
},
201+
});
202+
203+
logger.debug("Determining if parent run will release concurrency", {
204+
parentQueue,
205+
});
206+
207+
if (typeof parentQueue?.concurrencyLimit === "undefined") {
208+
return true;
209+
}
210+
211+
if (typeof parentQueue?.releaseConcurrencyOnWaitpoint === "boolean") {
212+
return parentQueue.releaseConcurrencyOnWaitpoint;
213+
}
214+
215+
return false;
216+
}
217+
218+
async #getParentQueueState(runChainState: RunChainState, parentLockedQueueId: string) {
219+
const newQueueState = runChainState.concurrency?.queues.find(
220+
(queue) => queue.id === parentLockedQueueId
221+
);
222+
223+
if (newQueueState) {
224+
return newQueueState;
225+
}
226+
227+
const parentQueue = await this.prisma.taskQueue.findFirst({
228+
where: {
229+
id: parentLockedQueueId,
230+
},
231+
});
232+
233+
if (!parentQueue) {
234+
throw new Error("Deadlock detection failed, parent queue not found");
235+
}
236+
237+
return {
238+
id: parentQueue.id,
239+
name: parentQueue.name,
240+
holding: 0,
241+
};
242+
}
243+
244+
private createDeadlockErrorMessage(details: string) {
245+
return `Deadlock detected: This task run cannot be triggered because it would create a concurrency deadlock.
246+
247+
A deadlock occurs when a chain of task runs (parent -> child) would collectively hold more concurrency than is available, making it impossible for the child run to ever execute.
248+
249+
Current situation:
250+
${details}
251+
252+
This usually happens when:
253+
1. A parent task triggers a child task using triggerAndWait()
254+
2. Both tasks use the same queue
255+
3. The parent task doesn't release its concurrency while waiting (releaseConcurrency: false)
256+
257+
To fix this, you can:
258+
1. Enable releaseConcurrencyOnWaitpoint on the queue
259+
2. Use a different queue for the child task
260+
3. Increase the concurrency limits
261+
4. Use trigger() instead of triggerAndWait() if you don't need to wait
262+
263+
Learn more about concurrency and deadlocks at https://trigger.dev/docs/queue-concurrency`;
264+
}
265+
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { IdempotencyKeyConcern } from "../concerns/idempotencyKeys.server";
2525
import type {
2626
PayloadProcessor,
2727
QueueManager,
28+
RunChainStateManager,
2829
RunNumberIncrementer,
2930
TraceEventConcern,
3031
TriggerTaskRequest,
@@ -41,6 +42,7 @@ export class RunEngineTriggerTaskService {
4142
private readonly engine: RunEngine;
4243
private readonly tracer: Tracer;
4344
private readonly traceEventConcern: TraceEventConcern;
45+
private readonly runChainStateManager: RunChainStateManager;
4446

4547
constructor(opts: {
4648
prisma: PrismaClientOrTransaction;
@@ -51,6 +53,7 @@ export class RunEngineTriggerTaskService {
5153
idempotencyKeyConcern: IdempotencyKeyConcern;
5254
runNumberIncrementer: RunNumberIncrementer;
5355
traceEventConcern: TraceEventConcern;
56+
runChainStateManager: RunChainStateManager;
5457
tracer: Tracer;
5558
}) {
5659
this.prisma = opts.prisma;
@@ -62,6 +65,7 @@ export class RunEngineTriggerTaskService {
6265
this.runNumberIncrementer = opts.runNumberIncrementer;
6366
this.tracer = opts.tracer;
6467
this.traceEventConcern = opts.traceEventConcern;
68+
this.runChainStateManager = opts.runChainStateManager;
6569
}
6670

6771
public async call({
@@ -176,8 +180,6 @@ export class RunEngineTriggerTaskService {
176180
}
177181
}
178182

179-
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
180-
181183
const metadataPacket = body.options?.metadata
182184
? handleMetadataPacket(
183185
body.options?.metadata,
@@ -217,6 +219,12 @@ export class RunEngineTriggerTaskService {
217219

218220
const depth = parentRun ? parentRun.depth + 1 : 0;
219221

222+
const runChainState = await this.runChainStateManager.validateRunChain(triggerRequest, {
223+
parentRun: parentRun ?? undefined,
224+
queueName,
225+
lockedQueueId,
226+
});
227+
220228
const masterQueue = await this.queueConcern.getMasterQueue(environment);
221229

222230
try {
@@ -229,6 +237,8 @@ export class RunEngineTriggerTaskService {
229237
event.setAttribute("runId", runFriendlyId);
230238
span.setAttribute("runId", runFriendlyId);
231239

240+
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
241+
232242
const taskRun = await this.engine.trigger(
233243
{
234244
number: num,
@@ -285,6 +295,7 @@ export class RunEngineTriggerTaskService {
285295
parentRun && body.options?.resumeParentOnCompletion
286296
? parentRun.queueTimestamp ?? undefined
287297
: undefined,
298+
runChainState,
288299
},
289300
this.prisma
290301
);

apps/webapp/app/runEngine/types.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import { BackgroundWorker, TaskRun } from "@trigger.dev/database";
22

3-
import { IOPacket, TaskRunError, TriggerTaskRequestBody } from "@trigger.dev/core/v3";
3+
import {
4+
IOPacket,
5+
RunChainState,
6+
TaskRunError,
7+
TriggerTaskRequestBody,
8+
} from "@trigger.dev/core/v3";
49
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
5-
import { EventBuilder } from "~/v3/eventRepository.server";
10+
import { z } from "zod";
611

712
export type TriggerTaskServiceOptions = {
813
idempotencyKey?: string;
@@ -139,3 +144,10 @@ export interface TraceEventConcern {
139144
callback: (span: TracedEventSpan) => Promise<T>
140145
): Promise<T>;
141146
}
147+
148+
export interface RunChainStateManager {
149+
validateRunChain(
150+
request: TriggerTaskRequest,
151+
options: { parentRun?: TaskRun; queueName: string; lockedQueueId?: string }
152+
): Promise<RunChainState>;
153+
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { tracer } from "../tracer.server";
1313
import { WithRunEngine } from "./baseService.server";
1414
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
1515
import { DefaultTraceEventsConcern } from "~/runEngine/concerns/traceEvents.server";
16+
import { DefaultRunChainStateManager } from "~/runEngine/concerns/runChainStates.server";
17+
import { env } from "~/env.server";
1618

1719
export type TriggerTaskServiceOptions = {
1820
idempotencyKey?: string;
@@ -92,10 +94,17 @@ export class TriggerTaskService extends WithRunEngine {
9294
queueConcern: new DefaultQueueManager(this._prisma, this._engine),
9395
validator: new DefaultTriggerTaskValidator(),
9496
payloadProcessor: new DefaultPayloadProcessor(),
95-
eventRepo: eventRepository,
96-
idempotencyKeyConcern: new IdempotencyKeyConcern(this._prisma, this._engine, eventRepository),
97+
idempotencyKeyConcern: new IdempotencyKeyConcern(
98+
this._prisma,
99+
this._engine,
100+
new DefaultTraceEventsConcern(eventRepository)
101+
),
97102
runNumberIncrementer: new DefaultRunNumberIncrementer(),
98103
traceEventConcern: new DefaultTraceEventsConcern(eventRepository),
104+
runChainStateManager: new DefaultRunChainStateManager(
105+
this._prisma,
106+
env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "1"
107+
),
99108
tracer: tracer,
100109
});
101110
return await service.call({

0 commit comments

Comments
 (0)