Skip to content

Commit 99bdaaa

Browse files
committed
Implement new machine preset stuff and send usage reports to OpenMeter from webapp
1 parent 04881e4 commit 99bdaaa

File tree

31 files changed

+333
-98
lines changed

31 files changed

+333
-98
lines changed

apps/kubernetes-provider/src/index.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
TaskOperationsIndexOptions,
88
TaskOperationsRestoreOptions,
99
} from "@trigger.dev/core-apps";
10-
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
10+
import {
11+
MachinePreset,
12+
PostStartCauses,
13+
PreStopCauses,
14+
EnvironmentType,
15+
} from "@trigger.dev/core/v3";
1116
import { randomUUID } from "crypto";
1217
import { TaskMonitor } from "./taskMonitor";
1318
import { PodCleaner } from "./podCleaner";
@@ -398,10 +403,10 @@ class KubernetesTaskOperations implements TaskOperations {
398403
};
399404
}
400405

401-
#getResourcesFromMachineConfig(config: Machine): ComputeResources {
406+
#getResourcesFromMachineConfig(preset: MachinePreset): ComputeResources {
402407
return {
403-
cpu: `${config.cpu}`,
404-
memory: `${config.memory}G`,
408+
cpu: `${preset.cpu}`,
409+
memory: `${preset.memory}G`,
405410
};
406411
}
407412

apps/webapp/app/env.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ const EnvironmentSchema = z.object({
192192
/* Usage settings */
193193
USAGE_EVENT_URL: z.string().optional(),
194194
PROD_USAGE_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
195+
196+
CENTS_PER_HOUR_MICRO: z.coerce.number().positive().default(0),
197+
CENTS_PER_HOUR_SMALL_1X: z.coerce.number().positive().default(0),
198+
CENTS_PER_HOUR_SMALL_2X: z.coerce.number().positive().default(0),
199+
CENTS_PER_HOUR_MEDIUM_1X: z.coerce.number().positive().default(0),
200+
CENTS_PER_HOUR_MEDIUM_2X: z.coerce.number().positive().default(0),
201+
CENTS_PER_HOUR_LARGE_1X: z.coerce.number().positive().default(0),
202+
203+
USAGE_OPEN_METER_API_KEY: z.string().optional(),
204+
USAGE_OPEN_METER_BASE_URL: z.string().optional(),
195205
});
196206

197207
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
8080
version: taskRun.lockedToVersion ? taskRun.lockedToVersion.version : undefined,
8181
createdAt: taskRun.createdAt ?? undefined,
8282
updatedAt: taskRun.updatedAt ?? undefined,
83-
startedAt: taskRun.lockedAt ?? undefined,
83+
startedAt: taskRun.startedAt ?? undefined,
8484
finishedAt: ApiRetrieveRunPresenter.isStatusFinished(apiStatus)
8585
? taskRun.updatedAt
8686
: undefined,

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class RunListPresenter extends BasePresenter {
156156
runtimeEnvironmentId: string;
157157
status: TaskRunStatus;
158158
createdAt: Date;
159-
lockedAt: Date | null;
159+
startedAt: Date | null;
160160
updatedAt: Date;
161161
isTest: boolean;
162162
spanId: string;
@@ -172,7 +172,7 @@ export class RunListPresenter extends BasePresenter {
172172
tr."runtimeEnvironmentId" AS "runtimeEnvironmentId",
173173
tr.status AS status,
174174
tr."createdAt" AS "createdAt",
175-
tr."lockedAt" AS "lockedAt",
175+
tr."startedAt" AS "startedAt",
176176
tr."updatedAt" AS "updatedAt",
177177
tr."isTest" AS "isTest",
178178
tr."spanId" AS "spanId",
@@ -278,7 +278,7 @@ export class RunListPresenter extends BasePresenter {
278278
number: Number(run.number),
279279
createdAt: run.createdAt.toISOString(),
280280
updatedAt: run.updatedAt.toISOString(),
281-
startedAt: run.lockedAt ? run.lockedAt.toISOString() : undefined,
281+
startedAt: run.startedAt ? run.startedAt.toISOString() : undefined,
282282
hasFinished,
283283
finishedAt: hasFinished ? run.updatedAt.toISOString() : undefined,
284284
isTest: run.isTest,

apps/webapp/app/presenters/v3/TaskListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ export class TaskListPresenter extends BasePresenter {
311311
>`
312312
SELECT
313313
tr."taskIdentifier",
314-
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - tr."lockedAt"))) as duration
314+
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - tr."startedAt"))) as duration
315315
FROM
316316
${sqlDatabaseSchema}."TaskRun" as tr
317317
WHERE

apps/webapp/app/routes/api.v1.usage.ingest.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { ActionFunctionArgs } from "@remix-run/server-runtime";
2+
import { MachinePresetName } from "@trigger.dev/core/v3";
23
import { z } from "zod";
4+
import { prisma } from "~/db.server";
35
import { validateJWTToken } from "~/services/apiAuth.server";
46
import { logger } from "~/services/logger.server";
7+
import { machinePresetFromName } from "~/v3/machinePresets.server";
8+
import { reportUsageEvent } from "~/v3/openMeter.server";
59

610
const JWTPayloadSchema = z.object({
711
environment_id: z.string(),
@@ -11,6 +15,10 @@ const JWTPayloadSchema = z.object({
1115
machine_preset: z.string(),
1216
});
1317

18+
const BodySchema = z.object({
19+
durationMs: z.number(),
20+
});
21+
1422
export async function action({ request }: ActionFunctionArgs) {
1523
// Ensure this is a POST request
1624
if (request.method.toUpperCase() !== "POST") {
@@ -27,7 +35,43 @@ export async function action({ request }: ActionFunctionArgs) {
2735

2836
const jwtPayload = await validateJWTToken(jwt, JWTPayloadSchema);
2937

30-
logger.debug("Validated JWT", { jwtPayload });
38+
const rawJson = await request.json();
39+
40+
const json = BodySchema.safeParse(rawJson);
41+
42+
if (!json.success) {
43+
logger.error("Failed to parse request body", { rawJson });
44+
45+
return { status: 400, body: "Bad Request" };
46+
}
47+
48+
const preset = machinePresetFromName(jwtPayload.machine_preset as MachinePresetName);
49+
50+
logger.debug("Validated JWT", { jwtPayload, json: json.data, preset });
51+
52+
await prisma.taskRun.update({
53+
where: {
54+
id: jwtPayload.run_id,
55+
},
56+
data: {
57+
usageDurationMs: {
58+
increment: json.data.durationMs,
59+
},
60+
costInCents: {
61+
increment: json.data.durationMs * preset.centsPerMs,
62+
},
63+
},
64+
});
65+
66+
await reportUsageEvent({
67+
source: "webapp",
68+
type: "usage",
69+
subject: jwtPayload.org_id,
70+
data: {
71+
durationMs: json.data.durationMs,
72+
costInCents: json.data.durationMs * preset.centsPerMs,
73+
},
74+
});
3175

3276
return new Response(null, {
3377
status: 200,

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
145145
"/api/internal/stripe_webhooks",
146146
"/api/v1/authorization-code",
147147
"/api/v1/token",
148+
"/api/v1/usage/ingest",
148149
/^\/api\/v1\/tasks\/[^\/]+\/callback\/[^\/]+$/, // /api/v1/tasks/$id/callback/$secret
149150
/^\/api\/v1\/runs\/[^\/]+\/tasks\/[^\/]+\/callback\/[^\/]+$/, // /api/v1/runs/$runId/tasks/$id/callback/$secret
150151
/^\/api\/v1\/http-endpoints\/[^\/]+\/env\/[^\/]+\/[^\/]+$/, // /api/v1/http-endpoints/$httpEndpointId/env/$envType/$shortcode
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { MachineConfig, MachinePreset, MachinePresetName } from "@trigger.dev/core/v3";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
5+
export const presets = {
6+
micro: {
7+
cpu: 0.25,
8+
memory: 0.25,
9+
centsPerMs: env.CENTS_PER_HOUR_MICRO / 3_600_000,
10+
},
11+
"small-1x": {
12+
cpu: 0.5,
13+
memory: 0.5,
14+
centsPerMs: env.CENTS_PER_HOUR_SMALL_1X / 3_600_000,
15+
},
16+
"small-2x": {
17+
cpu: 1,
18+
memory: 1,
19+
centsPerMs: env.CENTS_PER_HOUR_SMALL_2X / 3_600_000,
20+
},
21+
"medium-1x": {
22+
cpu: 1,
23+
memory: 2,
24+
centsPerMs: env.CENTS_PER_HOUR_MEDIUM_1X / 3_600_000,
25+
},
26+
"medium-2x": {
27+
cpu: 2,
28+
memory: 4,
29+
centsPerMs: env.CENTS_PER_HOUR_MEDIUM_2X / 3_600_000,
30+
},
31+
"large-1x": {
32+
cpu: 4,
33+
memory: 8,
34+
centsPerMs: env.CENTS_PER_HOUR_LARGE_1X / 3_600_000,
35+
},
36+
};
37+
38+
export function machinePresetFromConfig(config: unknown): MachinePreset {
39+
const parsedConfig = MachineConfig.safeParse(config);
40+
41+
if (!parsedConfig.success) {
42+
logger.error("Failed to parse machine config", { config });
43+
44+
return machinePresetFromName("small-1x");
45+
}
46+
47+
if (parsedConfig.data.preset) {
48+
return machinePresetFromName(parsedConfig.data.preset);
49+
}
50+
51+
if (parsedConfig.data.cpu && parsedConfig.data.memory) {
52+
const name = derivePresetNameFromValues(parsedConfig.data.cpu, parsedConfig.data.memory);
53+
54+
return machinePresetFromName(name);
55+
}
56+
57+
return machinePresetFromName("small-1x");
58+
}
59+
60+
export function machinePresetFromName(name: MachinePresetName): MachinePreset {
61+
return {
62+
name,
63+
...presets[name],
64+
};
65+
}
66+
67+
// Finds the smallest machine preset name that satisfies the given CPU and memory requirements
68+
function derivePresetNameFromValues(cpu: number, memory: number): MachinePresetName {
69+
for (const [name, preset] of Object.entries(presets)) {
70+
if (preset.cpu >= cpu && preset.memory >= memory) {
71+
return name as MachinePresetName;
72+
}
73+
}
74+
75+
return "small-1x";
76+
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ export class DevQueueConsumer {
415415
lockedById: backgroundTask.id,
416416
status: "EXECUTING",
417417
lockedToVersionId: backgroundWorker.id,
418+
startedAt: existingTaskRun.startedAt ?? new Date(),
418419
},
419420
include: {
420421
attempts: {
@@ -520,6 +521,7 @@ export class DevQueueConsumer {
520521
lockedAt: null,
521522
lockedById: null,
522523
status: "PENDING",
524+
startedAt: existingTaskRun.startedAt,
523525
},
524526
}),
525527
]);
@@ -577,6 +579,7 @@ export class DevQueueConsumer {
577579
lockedAt: null,
578580
lockedById: null,
579581
status: "PENDING",
582+
startedAt: existingTaskRun.startedAt,
580583
},
581584
}),
582585
]);

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api";
22
import {
3-
Machine,
3+
MachinePreset,
44
ProdTaskRunExecution,
55
ProdTaskRunExecutionPayload,
66
TaskRunError,
@@ -41,6 +41,7 @@ import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
4141
import { tracer } from "../tracer.server";
4242
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
4343
import { EnvironmentVariable } from "../environmentVariables/repository";
44+
import { machinePresetFromConfig } from "../machinePresets.server";
4445

4546
const WithTraceContext = z.object({
4647
traceparent: z.string().optional(),
@@ -409,6 +410,7 @@ export class SharedQueueConsumer {
409410
lockedAt: new Date(),
410411
lockedById: backgroundTask.id,
411412
lockedToVersionId: deployment.worker.id,
413+
startedAt: existingTaskRun.startedAt ?? new Date(),
412414
},
413415
include: {
414416
runtimeEnvironment: true,
@@ -509,26 +511,15 @@ export class SharedQueueConsumer {
509511
});
510512
} else {
511513
const machineConfig = lockedTaskRun.lockedBy?.machineConfig;
512-
const machine = Machine.safeParse(machineConfig ?? {});
513-
514-
if (!machine.success) {
515-
logger.error("Failed to parse machine config", {
516-
queueMessage: message.data,
517-
messageId: message.messageId,
518-
machineConfig,
519-
});
520-
521-
await this.#ackAndDoMoreWork(message.messageId);
522-
return;
523-
}
514+
const machine = machinePresetFromConfig(machineConfig ?? {});
524515

525516
await this._sender.send("BACKGROUND_WORKER_MESSAGE", {
526517
backgroundWorkerId: deployment.worker.friendlyId,
527518
data: {
528519
type: "SCHEDULE_ATTEMPT",
529520
image: deployment.imageReference,
530521
version: deployment.version,
531-
machine: machine.data,
522+
machine,
532523
// identifiers
533524
id: "placeholder", // TODO: Remove this completely in a future release
534525
envId: lockedTaskRun.runtimeEnvironment.id,
@@ -558,6 +549,7 @@ export class SharedQueueConsumer {
558549
lockedAt: null,
559550
lockedById: null,
560551
status: lockedTaskRun.status,
552+
startedAt: existingTaskRun.startedAt,
561553
},
562554
}),
563555
]);
@@ -1012,6 +1004,8 @@ class SharedQueueTasks {
10121004

10131005
const { backgroundWorkerTask, taskRun, queue } = attempt;
10141006

1007+
const machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {});
1008+
10151009
const execution: ProdTaskRunExecution = {
10161010
task: {
10171011
id: backgroundWorkerTask.slug,
@@ -1065,9 +1059,14 @@ class SharedQueueTasks {
10651059
contentHash: attempt.backgroundWorker.contentHash,
10661060
version: attempt.backgroundWorker.version,
10671061
},
1062+
machine: machinePreset,
10681063
};
10691064

1070-
const variables = await this.#buildEnvironmentVariables(attempt.runtimeEnvironment, taskRun);
1065+
const variables = await this.#buildEnvironmentVariables(
1066+
attempt.runtimeEnvironment,
1067+
taskRun,
1068+
machinePreset
1069+
);
10711070

10721071
const payload: ProdTaskRunExecutionPayload = {
10731072
execution,
@@ -1126,14 +1125,19 @@ class SharedQueueTasks {
11261125
id: runId,
11271126
runtimeEnvironmentId: environment.id,
11281127
},
1128+
include: {
1129+
lockedBy: true,
1130+
},
11291131
});
11301132

11311133
if (!run) {
11321134
logger.error("Run not found", { id: runId, envId });
11331135
return;
11341136
}
11351137

1136-
const variables = await this.#buildEnvironmentVariables(environment, run);
1138+
const machinePreset = machinePresetFromConfig(run.lockedBy?.machineConfig ?? {});
1139+
1140+
const variables = await this.#buildEnvironmentVariables(environment, run, machinePreset);
11371141

11381142
return {
11391143
traceContext: run.traceContext as Record<string, unknown>,
@@ -1177,13 +1181,14 @@ class SharedQueueTasks {
11771181

11781182
async #buildEnvironmentVariables(
11791183
environment: RuntimeEnvironment,
1180-
run: TaskRun
1184+
run: TaskRun,
1185+
machinePreset: MachinePreset
11811186
): Promise<Array<EnvironmentVariable>> {
11821187
const variables = await resolveVariablesForEnvironment(environment);
11831188

11841189
const jwt = await generateJWTTokenForEnvironment(environment, {
11851190
run_id: run.id,
1186-
machine_present: "tiny-1x",
1191+
machine_preset: machinePreset.name,
11871192
});
11881193

11891194
return [
@@ -1193,7 +1198,7 @@ class SharedQueueTasks {
11931198
{ key: "TRIGGER_RUN_ID", value: run.id },
11941199
{
11951200
key: "TRIGGER_MACHINE_PRESET",
1196-
value: "tiny-1x",
1201+
value: machinePreset.name,
11971202
},
11981203
],
11991204
];

0 commit comments

Comments
 (0)