Skip to content

Commit b4c61af

Browse files
committed
add snapshots since methods and route
1 parent cc0feb0 commit b4c61af

File tree

9 files changed

+345
-57
lines changed

9 files changed

+345
-57
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { json, TypedResponse } from "@remix-run/server-runtime";
2+
import { WorkerApiRunSnapshotsSinceResponseBody } from "@trigger.dev/core/v3/workers";
3+
import { z } from "zod";
4+
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
6+
export const loader = createLoaderWorkerApiRoute(
7+
{
8+
params: z.object({
9+
runFriendlyId: z.string(),
10+
snapshotId: z.string(),
11+
}),
12+
},
13+
async ({
14+
authenticatedWorker,
15+
params,
16+
}): Promise<TypedResponse<WorkerApiRunSnapshotsSinceResponseBody>> => {
17+
const { runFriendlyId, snapshotId } = params;
18+
19+
const executions = await authenticatedWorker.getSnapshotsSince({
20+
runFriendlyId,
21+
snapshotId,
22+
});
23+
24+
if (!executions) {
25+
throw new Error("Failed to retrieve snapshots since given snapshot");
26+
}
27+
28+
return json({ executions });
29+
}
30+
);

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
759759
});
760760
}
761761

762+
async getSnapshotsSince({
763+
runFriendlyId,
764+
snapshotId,
765+
}: {
766+
runFriendlyId: string;
767+
snapshotId: string;
768+
}) {
769+
return await this._engine.getSnapshotsSince({
770+
runId: fromFriendlyId(runFriendlyId),
771+
snapshotId: fromFriendlyId(snapshotId),
772+
});
773+
}
774+
762775
toJSON(): WorkerGroupTokenAuthenticationResponse {
763776
if (this.type === WorkerInstanceGroupType.MANAGED) {
764777
return {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ import { EnqueueSystem } from "./systems/enqueueSystem.js";
4343
import {
4444
ExecutionSnapshotSystem,
4545
getLatestExecutionSnapshot,
46+
getExecutionSnapshotsSince,
47+
executionDataFromSnapshot,
4648
} from "./systems/executionSnapshotSystem.js";
4749
import { PendingVersionSystem } from "./systems/pendingVersionSystem.js";
4850
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";
@@ -1100,43 +1102,31 @@ export class RunEngine {
11001102
const prisma = tx ?? this.prisma;
11011103
try {
11021104
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
1105+
return executionDataFromSnapshot(snapshot);
1106+
} catch (e) {
1107+
this.logger.error("Failed to getRunExecutionData", {
1108+
message: e instanceof Error ? e.message : e,
1109+
});
1110+
return null;
1111+
}
1112+
}
11031113

1104-
const executionData: RunExecutionData = {
1105-
version: "1" as const,
1106-
snapshot: {
1107-
id: snapshot.id,
1108-
friendlyId: snapshot.friendlyId,
1109-
executionStatus: snapshot.executionStatus,
1110-
description: snapshot.description,
1111-
},
1112-
run: {
1113-
id: snapshot.runId,
1114-
friendlyId: snapshot.runFriendlyId,
1115-
status: snapshot.runStatus,
1116-
attemptNumber: snapshot.attemptNumber ?? undefined,
1117-
},
1118-
batch: snapshot.batchId
1119-
? {
1120-
id: snapshot.batchId,
1121-
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
1122-
}
1123-
: undefined,
1124-
checkpoint: snapshot.checkpoint
1125-
? {
1126-
id: snapshot.checkpoint.id,
1127-
friendlyId: snapshot.checkpoint.friendlyId,
1128-
type: snapshot.checkpoint.type,
1129-
location: snapshot.checkpoint.location,
1130-
imageRef: snapshot.checkpoint.imageRef,
1131-
reason: snapshot.checkpoint.reason ?? undefined,
1132-
}
1133-
: undefined,
1134-
completedWaitpoints: snapshot.completedWaitpoints,
1135-
};
1114+
async getSnapshotsSince({
1115+
runId,
1116+
snapshotId,
1117+
tx,
1118+
}: {
1119+
runId: string;
1120+
snapshotId: string;
1121+
tx?: PrismaClientOrTransaction;
1122+
}): Promise<RunExecutionData[] | null> {
1123+
const prisma = tx ?? this.prisma;
11361124

1137-
return executionData;
1125+
try {
1126+
const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
1127+
return snapshots.map(executionDataFromSnapshot);
11381128
} catch (e) {
1139-
this.logger.error("Failed to getRunExecutionData", {
1129+
this.logger.error("Failed to getSnapshotsSince", {
11401130
message: e instanceof Error ? e.message : e,
11411131
});
11421132
return null;
@@ -1158,9 +1148,6 @@ export class RunEngine {
11581148
}
11591149
}
11601150

1161-
//#endregion
1162-
1163-
//#region Heartbeat
11641151
async #handleStalledSnapshot({
11651152
runId,
11661153
snapshotId,

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 98 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { CompletedWaitpoint, ExecutionResult } from "@trigger.dev/core/v3";
1+
import { CompletedWaitpoint, ExecutionResult, RunExecutionData } from "@trigger.dev/core/v3";
22
import { BatchId, RunId, SnapshotId } from "@trigger.dev/core/v3/isomorphic";
33
import {
44
Prisma,
@@ -17,31 +17,23 @@ export type ExecutionSnapshotSystemOptions = {
1717
heartbeatTimeouts: HeartbeatTimeouts;
1818
};
1919

20-
export interface LatestExecutionSnapshot extends TaskRunExecutionSnapshot {
20+
export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
2121
friendlyId: string;
2222
runFriendlyId: string;
2323
checkpoint: TaskRunCheckpoint | null;
2424
completedWaitpoints: CompletedWaitpoint[];
2525
}
2626

27-
/* Gets the most recent valid snapshot for a run */
28-
export async function getLatestExecutionSnapshot(
29-
prisma: PrismaClientOrTransaction,
30-
runId: string
31-
): Promise<LatestExecutionSnapshot> {
32-
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
33-
where: { runId, isValid: true },
34-
include: {
35-
completedWaitpoints: true,
36-
checkpoint: true,
37-
},
38-
orderBy: { createdAt: "desc" },
39-
});
40-
41-
if (!snapshot) {
42-
throw new Error(`No execution snapshot found for TaskRun ${runId}`);
43-
}
27+
type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
28+
include: {
29+
checkpoint: true;
30+
completedWaitpoints: true;
31+
};
32+
}>;
4433

34+
function enhanceExecutionSnapshot(
35+
snapshot: ExecutionSnapshotWithCheckAndWaitpoints
36+
): EnhancedExecutionSnapshot {
4537
return {
4638
...snapshot,
4739
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
@@ -99,6 +91,27 @@ export async function getLatestExecutionSnapshot(
9991
};
10092
}
10193

94+
/* Gets the most recent valid snapshot for a run */
95+
export async function getLatestExecutionSnapshot(
96+
prisma: PrismaClientOrTransaction,
97+
runId: string
98+
): Promise<EnhancedExecutionSnapshot> {
99+
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
100+
where: { runId, isValid: true },
101+
include: {
102+
completedWaitpoints: true,
103+
checkpoint: true,
104+
},
105+
orderBy: { createdAt: "desc" },
106+
});
107+
108+
if (!snapshot) {
109+
throw new Error(`No execution snapshot found for TaskRun ${runId}`);
110+
}
111+
112+
return enhanceExecutionSnapshot(snapshot);
113+
}
114+
102115
export async function getExecutionSnapshotCompletedWaitpoints(
103116
prisma: PrismaClientOrTransaction,
104117
snapshotId: string
@@ -141,6 +154,72 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot):
141154
};
142155
}
143156

157+
export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): RunExecutionData {
158+
return {
159+
version: "1" as const,
160+
snapshot: {
161+
id: snapshot.id,
162+
friendlyId: snapshot.friendlyId,
163+
executionStatus: snapshot.executionStatus,
164+
description: snapshot.description,
165+
},
166+
run: {
167+
id: snapshot.runId,
168+
friendlyId: snapshot.runFriendlyId,
169+
status: snapshot.runStatus,
170+
attemptNumber: snapshot.attemptNumber ?? undefined,
171+
},
172+
batch: snapshot.batchId
173+
? {
174+
id: snapshot.batchId,
175+
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
176+
}
177+
: undefined,
178+
checkpoint: snapshot.checkpoint
179+
? {
180+
id: snapshot.checkpoint.id,
181+
friendlyId: snapshot.checkpoint.friendlyId,
182+
type: snapshot.checkpoint.type,
183+
location: snapshot.checkpoint.location,
184+
imageRef: snapshot.checkpoint.imageRef,
185+
reason: snapshot.checkpoint.reason ?? undefined,
186+
}
187+
: undefined,
188+
completedWaitpoints: snapshot.completedWaitpoints,
189+
};
190+
}
191+
192+
export async function getExecutionSnapshotsSince(
193+
prisma: PrismaClientOrTransaction,
194+
runId: string,
195+
sinceSnapshotId: string
196+
): Promise<EnhancedExecutionSnapshot[]> {
197+
// Find the createdAt of the sinceSnapshotId
198+
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findUnique({
199+
where: { id: sinceSnapshotId },
200+
select: { createdAt: true },
201+
});
202+
203+
if (!sinceSnapshot) {
204+
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
205+
}
206+
207+
const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
208+
where: {
209+
runId,
210+
isValid: true,
211+
createdAt: { gt: sinceSnapshot.createdAt },
212+
},
213+
include: {
214+
completedWaitpoints: true,
215+
checkpoint: true,
216+
},
217+
orderBy: { createdAt: "asc" },
218+
});
219+
220+
return snapshots.map(enhanceExecutionSnapshot);
221+
}
222+
144223
export class ExecutionSnapshotSystem {
145224
private readonly $: SystemResources;
146225
private readonly heartbeatTimeouts: HeartbeatTimeouts;

0 commit comments

Comments
 (0)