Skip to content

Commit b6e105a

Browse files
committed
Merge branch 'main' into v3/worker-attempt-creation
2 parents ee660a3 + a5a5d3a commit b6e105a

File tree

9 files changed

+169
-13
lines changed

9 files changed

+169
-13
lines changed

.changeset/new-pants-beg.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Fix issue when using SDK in non-node environments by scoping the stream import with node:

apps/webapp/app/consts.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ export const VERCEL_RESPONSE_TIMEOUT_STATUS_CODES = [408, 504];
1313
export const MAX_BATCH_TRIGGER_ITEMS = 100;
1414
export const MAX_TASK_RUN_ATTEMPTS = 250;
1515
export const BULK_ACTION_RUN_LIMIT = 250;
16+
export const MAX_JOB_RUN_EXECUTION_COUNT = 250;

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ const EnvironmentSchema = z.object({
164164
ALERT_RESEND_API_KEY: z.string().optional(),
165165

166166
MAX_SEQUENTIAL_INDEX_FAILURE_COUNT: z.coerce.number().default(96),
167+
168+
LOOPS_API_KEY: z.string().optional(),
167169
});
168170

169171
export type Environment = z.infer<typeof EnvironmentSchema>;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { env } from "~/env.server";
2+
import { logger } from "./logger.server";
3+
4+
class LoopsClient {
5+
constructor(private readonly apiKey: string) {}
6+
7+
async userCreated({
8+
userId,
9+
email,
10+
name,
11+
}: {
12+
userId: string;
13+
email: string;
14+
name: string | null;
15+
}) {
16+
logger.info(`Loops send "sign-up" event`, { userId, email, name });
17+
return this.#sendEvent({
18+
email,
19+
userId,
20+
firstName: name?.split(" ").at(0),
21+
eventName: "sign-up",
22+
});
23+
}
24+
25+
async #sendEvent({
26+
email,
27+
userId,
28+
firstName,
29+
eventName,
30+
eventProperties,
31+
}: {
32+
email: string;
33+
userId: string;
34+
firstName?: string;
35+
eventName: string;
36+
eventProperties?: Record<string, string | number | boolean>;
37+
}) {
38+
const options = {
39+
method: "POST",
40+
headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" },
41+
body: JSON.stringify({
42+
email,
43+
userId,
44+
firstName,
45+
eventName,
46+
eventProperties,
47+
}),
48+
};
49+
50+
try {
51+
const response = await fetch("https://app.loops.so/api/v1/events/send", options);
52+
53+
if (!response.ok) {
54+
logger.error(`Loops sendEvent ${eventName} bad status`, { status: response.status });
55+
return false;
56+
}
57+
58+
const responseBody = (await response.json()) as any;
59+
60+
if (!responseBody.success) {
61+
logger.error(`Loops sendEvent ${eventName} failed response`, {
62+
message: responseBody.message,
63+
});
64+
return false;
65+
}
66+
67+
return true;
68+
} catch (error) {
69+
logger.error(`Loops sendEvent ${eventName} failed`, { error });
70+
return false;
71+
}
72+
}
73+
}
74+
75+
export const loopsClient = env.LOOPS_API_KEY ? new LoopsClient(env.LOOPS_API_KEY) : null;

apps/webapp/app/services/runs/performRunExecutionV3.server.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
import { generateErrorMessage } from "zod-error";
2626
import { eventRecordToApiJson } from "~/api.server";
2727
import {
28+
MAX_JOB_RUN_EXECUTION_COUNT,
2829
MAX_RUN_CHUNK_EXECUTION_LIMIT,
2930
MAX_RUN_YIELDED_EXECUTIONS,
3031
RUN_CHUNK_EXECUTION_BUFFER,
@@ -141,6 +142,46 @@ export class PerformRunExecutionV3Service {
141142
});
142143
}
143144

145+
if (run.version.status === "DISABLED") {
146+
return await this.#failRunExecution(
147+
this.#prismaClient,
148+
run,
149+
{
150+
message: `Job version ${run.version.version} is disabled, aborting run.`,
151+
},
152+
"ABORTED"
153+
);
154+
}
155+
156+
// If the execution duration is greater than the maximum execution time, we need to fail the run
157+
if (run.executionDuration >= run.organization.maximumExecutionTimePerRunInMs) {
158+
await this.#failRunExecution(
159+
this.#prismaClient,
160+
run,
161+
{
162+
message: `Execution timed out after ${
163+
run.organization.maximumExecutionTimePerRunInMs / 1000
164+
} seconds`,
165+
},
166+
"TIMED_OUT",
167+
0
168+
);
169+
return;
170+
}
171+
172+
if (run.executionCount >= MAX_JOB_RUN_EXECUTION_COUNT) {
173+
await this.#failRunExecution(
174+
this.#prismaClient,
175+
run,
176+
{
177+
message: `Execution timed out after ${run.executionCount} executions`,
178+
},
179+
"TIMED_OUT",
180+
0
181+
);
182+
return;
183+
}
184+
144185
const client = new EndpointApi(run.environment.apiKey, run.endpoint.url);
145186
const event = eventRecordToApiJson(run.event);
146187

apps/webapp/app/services/telemetry.server.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { Organization } from "~/models/organization.server";
77
import type { Project } from "~/models/project.server";
88
import type { User } from "~/models/user.server";
99
import { singleton } from "~/utils/singleton";
10+
import { loopsClient } from "./loops.server";
1011

1112
type Options = {
1213
postHogApiKey?: string;
@@ -39,18 +40,19 @@ class Telemetry {
3940

4041
user = {
4142
identify: ({ user, isNewUser }: { user: User; isNewUser: boolean }) => {
42-
if (this.#posthogClient === undefined) return;
43-
this.#posthogClient.identify({
44-
distinctId: user.id,
45-
properties: {
46-
email: user.email,
47-
name: user.name,
48-
authenticationMethod: user.authenticationMethod,
49-
admin: user.admin,
50-
createdAt: user.createdAt,
51-
isNewUser,
52-
},
53-
});
43+
if (this.#posthogClient) {
44+
this.#posthogClient.identify({
45+
distinctId: user.id,
46+
properties: {
47+
email: user.email,
48+
name: user.name,
49+
authenticationMethod: user.authenticationMethod,
50+
admin: user.admin,
51+
createdAt: user.createdAt,
52+
isNewUser,
53+
},
54+
});
55+
}
5456
if (isNewUser) {
5557
this.#capture({
5658
userId: user.id,
@@ -64,6 +66,12 @@ class Telemetry {
6466
},
6567
});
6668

69+
loopsClient?.userCreated({
70+
userId: user.id,
71+
email: user.email,
72+
name: user.name,
73+
});
74+
6775
this.#triggerClient?.sendEvent({
6876
name: "user.created",
6977
payload: {

packages/core/src/v3/zodfetch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { APIConnectionError, APIError } from "./apiErrors";
44
import { RetryOptions } from "./schemas";
55
import { calculateNextRetryDelay } from "./utils/retries";
66
import { FormDataEncoder } from "form-data-encoder";
7-
import { Readable } from "stream";
7+
import { Readable } from "node:stream";
88

99
export const defaultRetryOptions = {
1010
maxAttempts: 3,

packages/core/tsup.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ export default defineConfig({
1717
"./src/v3/workers/index.ts",
1818
"./src/v3/zodfetch.ts",
1919
],
20+
external: ["node:stream"],
2021
});

references/job-catalog/src/stressTest.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,29 @@ client.defineJob({
3737
},
3838
});
3939

40+
client.defineJob({
41+
id: "stress-test-disabled",
42+
name: "Stress Test Disabled",
43+
version: "1.0.0",
44+
trigger: eventTrigger({
45+
name: "stress.test.disabled",
46+
}),
47+
enabled: false,
48+
run: async (payload, io, ctx) => {
49+
await io.wait("wait-1", 20);
50+
51+
await io.runTask(
52+
`task-1`,
53+
async (task) => {
54+
await new Promise((resolve) => setTimeout(resolve, 10000));
55+
},
56+
{ name: `Task 1` }
57+
);
58+
59+
await io.wait("wait-2", 5);
60+
},
61+
});
62+
4063
client.defineJob({
4164
id: "stress-test-2",
4265
name: "Stress Test 2",

0 commit comments

Comments
 (0)