Skip to content

Commit 1b35d6b

Browse files
committed
Merge remote-tracking branch 'origin/main' into fix/v4-checkpoints
2 parents b8f9dc1 + 9daa397 commit 1b35d6b

File tree

12 files changed

+583
-87
lines changed

12 files changed

+583
-87
lines changed

apps/webapp/app/components/navigation/SideMenu.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export function SideMenu({
139139
>
140140
<div
141141
className={cn(
142-
"flex items-center justify-between border-b px-1 py-1 transition duration-300",
142+
"flex items-center justify-between overflow-hidden border-b px-1 py-1 transition duration-300",
143143
showHeaderDivider ? "border-grid-bright" : "border-transparent"
144144
)}
145145
>
@@ -304,7 +304,7 @@ function ProjectSelector({
304304
isOpen={isOrgMenuOpen}
305305
overflowHidden
306306
className={cn(
307-
"h-8 w-full justify-between overflow-hidden py-1 pl-1.5",
307+
"h-8 w-full justify-between py-1 pl-1.5",
308308
user.isImpersonating && "border border-dashed border-amber-400"
309309
)}
310310
>

apps/webapp/app/components/primitives/Popover.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ function PopoverArrowTrigger({
175175
>
176176
{children}
177177
</Paragraph>
178-
<DropdownIcon className="size-4 min-w-[0.75rem] text-text-dimmed transition group-hover:text-text-bright" />
178+
<DropdownIcon className="size-4 min-w-4 text-text-dimmed transition group-hover:text-text-bright" />
179179
</PopoverTrigger>
180180
);
181181
}

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const commonRunSelect = {
5252
friendlyId: true,
5353
},
5454
},
55+
runTags: true,
5556
} satisfies Prisma.TaskRunSelect;
5657

5758
type CommonRelatedRun = Prisma.Result<
@@ -69,26 +70,29 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
6970
friendlyId,
7071
runtimeEnvironmentId: env.id,
7172
},
72-
include: {
73-
attempts: true,
74-
lockedToVersion: true,
75-
tags: true,
76-
batch: {
73+
select: {
74+
...commonRunSelect,
75+
payload: true,
76+
payloadType: true,
77+
output: true,
78+
outputType: true,
79+
error: true,
80+
attempts: {
7781
select: {
7882
id: true,
79-
friendlyId: true,
8083
},
8184
},
85+
attemptNumber: true,
86+
engine: true,
87+
taskEventStore: true,
8288
parentTaskRun: {
8389
select: commonRunSelect,
8490
},
8591
rootTaskRun: {
8692
select: commonRunSelect,
8793
},
8894
childRuns: {
89-
select: {
90-
...commonRunSelect,
91-
},
95+
select: commonRunSelect,
9296
},
9397
},
9498
});
@@ -124,29 +128,23 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
124128
}
125129

126130
if (taskRun.status === "COMPLETED_SUCCESSFULLY") {
127-
const completedAttempt = taskRun.attempts.find(
128-
(a) => a.status === "COMPLETED" && typeof a.output !== null
129-
);
130-
131-
if (completedAttempt && completedAttempt.output) {
132-
const outputPacket = await conditionallyImportPacket({
133-
data: completedAttempt.output,
134-
dataType: completedAttempt.outputType,
135-
});
131+
const outputPacket = await conditionallyImportPacket({
132+
data: taskRun.output ?? undefined,
133+
dataType: taskRun.outputType,
134+
});
136135

137-
if (
138-
outputPacket.dataType === "application/store" &&
139-
typeof outputPacket.data === "string"
140-
) {
141-
$outputPresignedUrl = await generatePresignedUrl(
142-
env.project.externalRef,
143-
env.slug,
144-
outputPacket.data,
145-
"GET"
146-
);
147-
} else {
148-
$output = await parsePacket(outputPacket);
149-
}
136+
if (
137+
outputPacket.dataType === "application/store" &&
138+
typeof outputPacket.data === "string"
139+
) {
140+
$outputPresignedUrl = await generatePresignedUrl(
141+
env.project.externalRef,
142+
env.slug,
143+
outputPacket.data,
144+
"GET"
145+
);
146+
} else {
147+
$output = await parsePacket(outputPacket);
150148
}
151149
}
152150

@@ -159,7 +157,8 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
159157
error: ApiRetrieveRunPresenter.apiErrorFromError(taskRun.error),
160158
schedule: await resolveSchedule(taskRun),
161159
// We're removing attempts from the API
162-
attemptCount: taskRun.attempts.length,
160+
attemptCount:
161+
taskRun.engine === "V1" ? taskRun.attempts.length : taskRun.attemptNumber ?? 0,
163162
attempts: [],
164163
relatedRuns: {
165164
root: taskRun.rootTaskRun

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
282282
runtimeEnvironmentId: environment.id,
283283
version: body.options?.lockToVersion,
284284
},
285+
select: {
286+
id: true,
287+
version: true,
288+
sdkVersion: true,
289+
cliVersion: true,
290+
},
285291
})
286292
: undefined;
287293

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -363,43 +363,14 @@ async function createWorkerQueue(
363363
)
364364
: queue.concurrencyLimit;
365365

366-
let taskQueue = await prisma.taskQueue.findFirst({
367-
where: {
368-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
369-
name: queueName,
370-
},
371-
});
372-
373-
if (!taskQueue) {
374-
taskQueue = await prisma.taskQueue.create({
375-
data: {
376-
friendlyId: generateFriendlyId("queue"),
377-
version: "V2",
378-
name: queueName,
379-
orderableName,
380-
concurrencyLimit,
381-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
382-
projectId: worker.projectId,
383-
type: queueType,
384-
workers: {
385-
connect: {
386-
id: worker.id,
387-
},
388-
},
389-
},
390-
});
391-
} else {
392-
await prisma.taskQueue.update({
393-
where: {
394-
id: taskQueue.id,
395-
},
396-
data: {
397-
workers: { connect: { id: worker.id } },
398-
version: "V2",
399-
orderableName,
400-
},
401-
});
402-
}
366+
const taskQueue = await upsertWorkerQueueRecord(
367+
queueName,
368+
concurrencyLimit ?? undefined,
369+
orderableName,
370+
queueType,
371+
worker,
372+
prisma
373+
);
403374

404375
if (typeof concurrencyLimit === "number") {
405376
logger.debug("createWorkerQueue: updating concurrency limit", {
@@ -426,6 +397,75 @@ async function createWorkerQueue(
426397
return taskQueue;
427398
}
428399

400+
async function upsertWorkerQueueRecord(
401+
queueName: string,
402+
concurrencyLimit: number | undefined,
403+
orderableName: string,
404+
queueType: TaskQueueType,
405+
worker: BackgroundWorker,
406+
prisma: PrismaClientOrTransaction,
407+
attempt: number = 0
408+
): Promise<TaskQueue> {
409+
if (attempt > 3) {
410+
throw new Error("Failed to insert queue record");
411+
}
412+
413+
try {
414+
let taskQueue = await prisma.taskQueue.findFirst({
415+
where: {
416+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
417+
name: queueName,
418+
},
419+
});
420+
421+
if (!taskQueue) {
422+
taskQueue = await prisma.taskQueue.create({
423+
data: {
424+
friendlyId: generateFriendlyId("queue"),
425+
version: "V2",
426+
name: queueName,
427+
orderableName,
428+
concurrencyLimit,
429+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
430+
projectId: worker.projectId,
431+
type: queueType,
432+
workers: {
433+
connect: {
434+
id: worker.id,
435+
},
436+
},
437+
},
438+
});
439+
} else {
440+
await prisma.taskQueue.update({
441+
where: {
442+
id: taskQueue.id,
443+
},
444+
data: {
445+
workers: { connect: { id: worker.id } },
446+
version: "V2",
447+
orderableName,
448+
},
449+
});
450+
}
451+
452+
return taskQueue;
453+
} catch (error) {
454+
// If the queue already exists, let's try again
455+
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
456+
return await upsertWorkerQueueRecord(
457+
queueName,
458+
concurrencyLimit,
459+
orderableName,
460+
queueType,
461+
worker,
462+
prisma,
463+
attempt + 1
464+
);
465+
}
466+
throw error;
467+
}
468+
}
429469
//CreateDeclarativeScheduleError with a message
430470
export class CreateDeclarativeScheduleError extends Error {
431471
constructor(message: string) {

packages/cli-v3/src/commands/dev.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { runtimeChecks } from "../utilities/runtimeCheck.js";
1111
import { getProjectClient, LoginResultOk } from "../utilities/session.js";
1212
import { login } from "./login.js";
1313
import { updateTriggerPackages } from "./update.js";
14+
import { createLockFile } from "../dev/lock.js";
1415

1516
const DevCommandOptions = CommonCommandOptions.extend({
1617
debugOtel: z.boolean().default(false),
@@ -120,6 +121,8 @@ async function startDev(options: StartDevOptions) {
120121
displayedUpdateMessage = await updateTriggerPackages(options.cwd, { ...options }, true, true);
121122
}
122123

124+
const removeLockFile = await createLockFile(options.cwd);
125+
123126
let devInstance: DevSessionInstance | undefined;
124127

125128
printDevBanner(displayedUpdateMessage);
@@ -178,6 +181,7 @@ async function startDev(options: StartDevOptions) {
178181
stop: async () => {
179182
devInstance?.stop();
180183
await watcher?.stop();
184+
removeLockFile();
181185
},
182186
waitUntilExit,
183187
};

packages/cli-v3/src/dev/lock.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import path from "node:path";
2+
import { readFile } from "../utilities/fileSystem.js";
3+
import { tryCatch } from "@trigger.dev/core/utils";
4+
import { logger } from "../utilities/logger.js";
5+
import { mkdir, writeFile } from "node:fs/promises";
6+
import { existsSync, unlinkSync } from "node:fs";
7+
import { onExit } from "signal-exit";
8+
9+
const LOCK_FILE_NAME = "dev.lock";
10+
11+
export async function createLockFile(cwd: string) {
12+
const currentPid = process.pid;
13+
const lockFilePath = path.join(cwd, ".trigger", LOCK_FILE_NAME);
14+
15+
logger.debug("Checking for lockfile", { lockFilePath, currentPid });
16+
17+
const removeLockFile = () => {
18+
try {
19+
logger.debug("Removing lockfile", { lockFilePath });
20+
return unlinkSync(lockFilePath);
21+
} catch (e) {
22+
// This sometimes fails on Windows with EBUSY
23+
}
24+
};
25+
const removeExitListener = onExit(removeLockFile);
26+
27+
const [, existingLockfileContents] = await tryCatch(readFile(lockFilePath));
28+
29+
if (existingLockfileContents) {
30+
// Read the pid number from the lockfile
31+
const existingPid = Number(existingLockfileContents);
32+
33+
logger.debug("Lockfile exists", { lockFilePath, existingPid, currentPid });
34+
35+
if (existingPid === currentPid) {
36+
logger.debug("Lockfile exists and is owned by current process", {
37+
lockFilePath,
38+
existingPid,
39+
currentPid,
40+
});
41+
42+
return () => {
43+
removeExitListener();
44+
removeLockFile();
45+
};
46+
}
47+
48+
// If the pid is different, try and kill the existing pid
49+
logger.debug("Lockfile exists and is owned by another process, killing it", {
50+
lockFilePath,
51+
existingPid,
52+
currentPid,
53+
});
54+
55+
try {
56+
process.kill(existingPid);
57+
// If it did kill the process, it will have exited, deleting the lockfile, so let's wait for that to happen
58+
// But let's not wait forever
59+
await new Promise((resolve, reject) => {
60+
const timeout = setTimeout(() => {
61+
clearInterval(interval);
62+
reject(new Error("Timed out waiting for lockfile to be deleted"));
63+
}, 5000);
64+
65+
const interval = setInterval(() => {
66+
if (!existsSync(lockFilePath)) {
67+
clearInterval(interval);
68+
clearTimeout(timeout);
69+
resolve(true);
70+
}
71+
}, 100);
72+
});
73+
} catch (error) {
74+
logger.debug("Failed to kill existing process, lets assume it's not running", { error });
75+
}
76+
}
77+
78+
// Now write the current pid to the lockfile
79+
await writeFileAndEnsureDirExists(lockFilePath, currentPid.toString());
80+
81+
logger.debug("Lockfile created", { lockFilePath, currentPid });
82+
83+
return () => {
84+
removeExitListener();
85+
removeLockFile();
86+
};
87+
}
88+
89+
async function writeFileAndEnsureDirExists(filePath: string, data: string) {
90+
const dir = path.dirname(filePath);
91+
await mkdir(dir, { recursive: true });
92+
await writeFile(filePath, data);
93+
}

0 commit comments

Comments
 (0)