Skip to content

Commit c8b835a

Browse files
authored
Run Engine 2.0 trigger idempotency (#1613)
* Return isCached from the trigger API endpoint * Fix for the wrong type when blocking a run * Render the idempotent run in the inspector * Event repository for idempotency * Debug events off by default, added an admin toggle to show them * triggerAndWait idempotency span * Some improvements to the reference idempotency task * Removed the cached tracing from the SDK * Server-side creating cached span * Improved idempotency test task * Create cached task spans in a better way * Idempotency span support inc batch trigger * Simplified how the spans are done, using more of the existing code * Improved the idempotency test task * Added Waitpoint Batch type, add to TaskRunWaitpoint with order * Pass batch ids through to the run engine when triggering * Added batchIndex * Better batch support in the run engine * Added settings to batch trigger service, before major overhaul * Allow the longer run/batch ids in the filters * Changed how batching works, includes breaking changes in CLI * Removed batch idempotency because it gets put on the runs instead * Added `runs` to the batch.retrieve call/API * Set firstAttemptStartedAt when creating the first attempt * Do nothing when receiving a BATCH waitpoint * Some fixes in the new batch trigger service… mostly just passing missing optional params through * Tweaked the idempotency test task for more situations * Only block with a batch if it’s a batchTriggerAndWait… 🤦‍♂️ * Added another case to the idempotency test task: multiple of the same idempotencyKey in a single batch * Support for the same run multiple times in the same batch * Small tweaks * Make sure to complete batches, even if they’re not andWait ones * Export RunDuplicateIdempotencyKeyError from the run engine
1 parent b12b617 commit c8b835a

File tree

46 files changed

+1871
-791
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1871
-791
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
export function TaskCachedIcon({ className }: { className?: string }) {
2+
return (
3+
<svg
4+
className={className}
5+
width="16"
6+
height="16"
7+
viewBox="0 0 16 16"
8+
fill="none"
9+
xmlns="http://www.w3.org/2000/svg"
10+
>
11+
<g clipPath="url(#clip0_15584_76102)">
12+
<path
13+
d="M0.5 3.5L0.5 2.5C0.5 1.39543 1.39543 0.5 2.5 0.5H3.5"
14+
stroke="#3B82F6"
15+
strokeLinecap="square"
16+
strokeLinejoin="round"
17+
/>
18+
<path
19+
d="M15.5 12.5L15.5 13.5C15.5 14.6046 14.6046 15.5 13.5 15.5L12.5 15.5"
20+
stroke="#3B82F6"
21+
strokeLinecap="square"
22+
strokeLinejoin="round"
23+
/>
24+
<path
25+
d="M12.5 0.5L13.5 0.5C14.6046 0.5 15.5 1.39543 15.5 2.5L15.5 3.5"
26+
stroke="#3B82F6"
27+
strokeLinecap="square"
28+
strokeLinejoin="round"
29+
/>
30+
<path
31+
d="M3.5 15.5L2.5 15.5C1.39543 15.5 0.5 14.6046 0.5 13.5L0.5 12.5"
32+
stroke="#3B82F6"
33+
strokeLinecap="square"
34+
strokeLinejoin="round"
35+
/>
36+
<path d="M11.1799 4.19V5.598H8.8479V12H7.1649V5.598H4.8219V4.19H11.1799Z" fill="#3B82F6" />
37+
<line x1="6" y1="15.5" x2="10" y2="15.5" stroke="#3B82F6" />
38+
<line x1="6" y1="0.5" x2="10" y2="0.5" stroke="#3B82F6" />
39+
<line x1="15.5" y1="6" x2="15.5" y2="10" stroke="#3B82F6" />
40+
<line x1="0.5" y1="6" x2="0.5" y2="10" stroke="#3B82F6" />
41+
</g>
42+
<defs>
43+
<clipPath id="clip0_15584_76102">
44+
<rect width="16" height="16" fill="white" />
45+
</clipPath>
46+
</defs>
47+
</svg>
48+
);
49+
}

apps/webapp/app/components/runs/v3/BatchFilters.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,8 @@ function BatchIdDropdown({
359359
if (batchId) {
360360
if (!batchId.startsWith("batch_")) {
361361
error = "Batch IDs start with 'batch_'";
362-
} else if (batchId.length !== 27) {
363-
error = "Batch IDs are 27 characters long";
362+
} else if (batchId.length !== 27 && batchId.length !== 31) {
363+
error = "Batch IDs are 27/32 characters long";
364364
}
365365
}
366366

apps/webapp/app/components/runs/v3/RunFilters.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,8 +763,8 @@ function RunIdDropdown({
763763
if (runId) {
764764
if (!runId.startsWith("run_")) {
765765
error = "Run IDs start with 'run_'";
766-
} else if (runId.length !== 25) {
767-
error = "Run IDs are 25 characters long";
766+
} else if (runId.length !== 25 && runId.length !== 29) {
767+
error = "Run IDs are 25/30 characters long";
768768
}
769769
}
770770

apps/webapp/app/components/runs/v3/RunIcon.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
} from "@heroicons/react/20/solid";
88
import { AttemptIcon } from "~/assets/icons/AttemptIcon";
99
import { TaskIcon } from "~/assets/icons/TaskIcon";
10+
import { TaskCachedIcon } from "~/assets/icons/TaskCachedIcon";
1011
import { NamedIcon } from "~/components/primitives/NamedIcon";
1112
import { cn } from "~/utils/cn";
1213

@@ -41,6 +42,8 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
4142
switch (name) {
4243
case "task":
4344
return <TaskIcon className={cn(className, "text-blue-500")} />;
45+
case "task-cached":
46+
return <TaskCachedIcon className={cn(className, "text-blue-500")} />;
4447
case "scheduled":
4548
return <ClockIcon className={cn(className, "text-sun-500")} />;
4649
case "attempt":

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
MachinePresetName,
33
parsePacket,
44
prettyPrintPacket,
5+
SemanticInternalAttributes,
56
TaskRunError,
67
} from "@trigger.dev/core/v3";
78
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
@@ -39,7 +40,22 @@ export class SpanPresenter extends BasePresenter {
3940
throw new Error("Project not found");
4041
}
4142

42-
const run = await this.getRun(spanId);
43+
const parentRun = await this._prisma.taskRun.findFirst({
44+
select: {
45+
traceId: true,
46+
},
47+
where: {
48+
friendlyId: runFriendlyId,
49+
},
50+
});
51+
52+
if (!parentRun) {
53+
return;
54+
}
55+
56+
const { traceId } = parentRun;
57+
58+
const run = await this.getRun(traceId, spanId);
4359
if (run) {
4460
return {
4561
type: "run" as const,
@@ -48,7 +64,7 @@ export class SpanPresenter extends BasePresenter {
4864
}
4965

5066
//get the run
51-
const span = await this.getSpan(runFriendlyId, spanId);
67+
const span = await this.getSpan(traceId, spanId);
5268

5369
if (!span) {
5470
throw new Error("Span not found");
@@ -60,10 +76,17 @@ export class SpanPresenter extends BasePresenter {
6076
};
6177
}
6278

63-
async getRun(spanId: string) {
79+
async getRun(traceId: string, spanId: string) {
80+
const span = await eventRepository.getSpan(spanId, traceId);
81+
82+
if (!span) {
83+
return;
84+
}
85+
6486
const run = await this._replica.taskRun.findFirst({
6587
select: {
6688
id: true,
89+
spanId: true,
6790
traceId: true,
6891
//metadata
6992
number: true,
@@ -92,13 +115,15 @@ export class SpanPresenter extends BasePresenter {
92115
//status + duration
93116
status: true,
94117
startedAt: true,
118+
firstAttemptStartedAt: true,
95119
createdAt: true,
96120
updatedAt: true,
97121
queuedAt: true,
98122
completedAt: true,
99123
logsDeletedAt: true,
100124
//idempotency
101125
idempotencyKey: true,
126+
idempotencyKeyExpiresAt: true,
102127
//delayed
103128
delayUntil: true,
104129
//ttl
@@ -161,9 +186,13 @@ export class SpanPresenter extends BasePresenter {
161186
},
162187
},
163188
},
164-
where: {
165-
spanId,
166-
},
189+
where: span.originalRun
190+
? {
191+
friendlyId: span.originalRun,
192+
}
193+
: {
194+
spanId,
195+
},
167196
});
168197

169198
if (!run) {
@@ -238,8 +267,6 @@ export class SpanPresenter extends BasePresenter {
238267
}
239268
}
240269

241-
const span = await eventRepository.getSpan(spanId, run.traceId);
242-
243270
const metadata = run.metadata
244271
? await prettyPrintPacket(run.metadata, run.metadataType, {
245272
filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
@@ -296,6 +323,7 @@ export class SpanPresenter extends BasePresenter {
296323
status: run.status,
297324
createdAt: run.createdAt,
298325
startedAt: run.startedAt,
326+
firstAttemptStartedAt: run.firstAttemptStartedAt,
299327
updatedAt: run.updatedAt,
300328
delayUntil: run.delayUntil,
301329
expiredAt: run.expiredAt,
@@ -307,6 +335,8 @@ export class SpanPresenter extends BasePresenter {
307335
sdkVersion: run.lockedToVersion?.sdkVersion,
308336
isTest: run.isTest,
309337
environmentId: run.runtimeEnvironment.id,
338+
idempotencyKey: run.idempotencyKey,
339+
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
310340
schedule: run.schedule
311341
? {
312342
friendlyId: run.schedule.friendlyId,
@@ -349,24 +379,13 @@ export class SpanPresenter extends BasePresenter {
349379
engine: run.engine,
350380
masterQueue: run.masterQueue,
351381
secondaryMasterQueue: run.secondaryMasterQueue,
382+
spanId: run.spanId,
383+
isCached: !!span.originalRun,
352384
};
353385
}
354386

355-
async getSpan(runFriendlyId: string, spanId: string) {
356-
const run = await this._prisma.taskRun.findFirst({
357-
select: {
358-
traceId: true,
359-
},
360-
where: {
361-
friendlyId: runFriendlyId,
362-
},
363-
});
364-
365-
if (!run) {
366-
return;
367-
}
368-
369-
const span = await eventRepository.getSpan(spanId, run.traceId);
387+
async getSpan(traceId: string, spanId: string) {
388+
const span = await eventRepository.getSpan(spanId, traceId);
370389

371390
if (!span) {
372391
return;

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ const { action, loader } = createActionApiRoute(
108108
return json(
109109
{
110110
id: run.friendlyId,
111+
isCached: run.isCached,
111112
},
112113
{
113114
headers: $responseHeaders,

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth
99
import { logger } from "~/services/logger.server";
1010
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1111
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
12-
import { determineEngineVersion } from "~/v3/engineVersion.server";
1312
import { ServiceValidationError } from "~/v3/services/baseService.server";
1413
import {
1514
BatchProcessingStrategy,
1615
BatchTriggerV2Service,
1716
} from "~/v3/services/batchTriggerV2.server";
18-
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
1917
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
2018
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
2119

@@ -88,15 +86,7 @@ const { action, loader } = createActionApiRoute(
8886
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
8987
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
9088

91-
const version = await determineEngineVersion({
92-
environment: authentication.environment,
93-
version: engineVersion ?? undefined,
94-
});
95-
96-
const service =
97-
version === "V1"
98-
? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
99-
: new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
89+
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
10090

10191
try {
10292
const batch = await service.call(authentication.environment, body, {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { $replica } from "~/db.server";
4+
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
6+
const ParamsSchema = z.object({
7+
batchId: z.string(),
8+
});
9+
10+
export const loader = createLoaderApiRoute(
11+
{
12+
params: ParamsSchema,
13+
allowJWT: true,
14+
corsStrategy: "all",
15+
findResource: (params, auth) => {
16+
return $replica.batchTaskRun.findFirst({
17+
where: {
18+
friendlyId: params.batchId,
19+
runtimeEnvironmentId: auth.environment.id,
20+
},
21+
});
22+
},
23+
authorization: {
24+
action: "read",
25+
resource: (batch) => ({ batch: batch.friendlyId }),
26+
superScopes: ["read:runs", "read:all", "admin"],
27+
},
28+
},
29+
async ({ resource: batch }) => {
30+
return json({
31+
id: batch.friendlyId,
32+
status: batch.status,
33+
idempotencyKey: batch.idempotencyKey ?? undefined,
34+
createdAt: batch.createdAt,
35+
updatedAt: batch.updatedAt,
36+
runCount: batch.runCount,
37+
runs: batch.runIds,
38+
});
39+
}
40+
);

0 commit comments

Comments
 (0)