Skip to content

Commit 85a543d

Browse files
v3: usage tracking (#1163)
* Starting to measure wall time and cpu time in the workers, and reporting that via otel and to completed task run attempts * Move usage tracking outside of the executor * WIP prod usage tracking * WIP * WIP custom fetch to openmeter * Create a usage client * WIP * WIP * Implement new machine preset stuff and send usage reports to OpenMeter from webapp * WIP * Expose usage info to the client * Add usage and cost to TaskEvent * Add ability to globally configure the task machine preset * Report start run usage * Change the machine docs to use presets * setExpirationTime to 24h * Removed logs * Update machines.mdx * Removed console.logs * Handle revalidating JWT tokens * Couple tweaks --------- Co-authored-by: Matt Aitken <[email protected]>
1 parent 10ceb85 commit 85a543d

File tree

70 files changed

+1598
-320
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1598
-320
lines changed

apps/kubernetes-provider/src/index.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
TaskOperationsIndexOptions,
88
TaskOperationsRestoreOptions,
99
} from "@trigger.dev/core-apps";
10-
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
10+
import {
11+
MachinePreset,
12+
PostStartCauses,
13+
PreStopCauses,
14+
EnvironmentType,
15+
} from "@trigger.dev/core/v3";
1116
import { randomUUID } from "crypto";
1217
import { TaskMonitor } from "./taskMonitor";
1318
import { PodCleaner } from "./podCleaner";
@@ -399,10 +404,10 @@ class KubernetesTaskOperations implements TaskOperations {
399404
};
400405
}
401406

402-
#getResourcesFromMachineConfig(config: Machine): ComputeResources {
407+
#getResourcesFromMachineConfig(preset: MachinePreset): ComputeResources {
403408
return {
404-
cpu: `${config.cpu}`,
405-
memory: `${config.memory}G`,
409+
cpu: `${preset.cpu}`,
410+
memory: `${preset.memory}G`,
406411
};
407412
}
408413

apps/webapp/app/env.server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { z } from "zod";
21
import { SecretStoreOptionsSchema } from "./services/secrets/secretStoreOptionsSchema.server";
2+
import { z } from "zod";
33
import { isValidRegex } from "./utils/regex";
44
import { isValidDatabaseUrl } from "./utils/db";
55

@@ -189,6 +189,21 @@ const EnvironmentSchema = z.object({
189189
V2_MARQS_VERBOSE: z.string().default("0"),
190190
V3_MARQS_CONCURRENCY_MONITOR_ENABLED: z.string().default("0"),
191191
V2_MARQS_CONCURRENCY_MONITOR_ENABLED: z.string().default("0"),
192+
/* Usage settings */
193+
USAGE_EVENT_URL: z.string().optional(),
194+
PROD_USAGE_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
195+
196+
CENTS_PER_HOUR_MICRO: z.coerce.number().default(0),
197+
CENTS_PER_HOUR_SMALL_1X: z.coerce.number().default(0),
198+
CENTS_PER_HOUR_SMALL_2X: z.coerce.number().default(0),
199+
CENTS_PER_HOUR_MEDIUM_1X: z.coerce.number().default(0),
200+
CENTS_PER_HOUR_MEDIUM_2X: z.coerce.number().default(0),
201+
CENTS_PER_HOUR_LARGE_1X: z.coerce.number().default(0),
202+
CENTS_PER_HOUR_LARGE_2X: z.coerce.number().default(0),
203+
BASE_RUN_COST_IN_CENTS: z.coerce.number().default(0),
204+
205+
USAGE_OPEN_METER_API_KEY: z.string().optional(),
206+
USAGE_OPEN_METER_BASE_URL: z.string().optional(),
192207
});
193208

194209
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
8080
version: taskRun.lockedToVersion ? taskRun.lockedToVersion.version : undefined,
8181
createdAt: taskRun.createdAt ?? undefined,
8282
updatedAt: taskRun.updatedAt ?? undefined,
83-
startedAt: taskRun.lockedAt ?? undefined,
83+
startedAt: taskRun.startedAt ?? undefined,
8484
finishedAt: ApiRetrieveRunPresenter.isStatusFinished(apiStatus)
8585
? taskRun.updatedAt
8686
: undefined,

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class RunListPresenter extends BasePresenter {
156156
runtimeEnvironmentId: string;
157157
status: TaskRunStatus;
158158
createdAt: Date;
159-
lockedAt: Date | null;
159+
startedAt: Date | null;
160160
updatedAt: Date;
161161
isTest: boolean;
162162
spanId: string;
@@ -172,7 +172,7 @@ export class RunListPresenter extends BasePresenter {
172172
tr."runtimeEnvironmentId" AS "runtimeEnvironmentId",
173173
tr.status AS status,
174174
tr."createdAt" AS "createdAt",
175-
tr."lockedAt" AS "lockedAt",
175+
tr."startedAt" AS "startedAt",
176176
tr."updatedAt" AS "updatedAt",
177177
tr."isTest" AS "isTest",
178178
tr."spanId" AS "spanId",
@@ -278,7 +278,7 @@ export class RunListPresenter extends BasePresenter {
278278
number: Number(run.number),
279279
createdAt: run.createdAt.toISOString(),
280280
updatedAt: run.updatedAt.toISOString(),
281-
startedAt: run.lockedAt ? run.lockedAt.toISOString() : undefined,
281+
startedAt: run.startedAt ? run.startedAt.toISOString() : undefined,
282282
hasFinished,
283283
finishedAt: hasFinished ? run.updatedAt.toISOString() : undefined,
284284
isTest: run.isTest,

apps/webapp/app/presenters/v3/TaskListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ export class TaskListPresenter extends BasePresenter {
311311
>`
312312
SELECT
313313
tr."taskIdentifier",
314-
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - tr."lockedAt"))) as duration
314+
AVG(EXTRACT(EPOCH FROM (tr."updatedAt" - tr."startedAt"))) as duration
315315
FROM
316316
${sqlDatabaseSchema}."TaskRun" as tr
317317
WHERE

apps/webapp/app/routes/api.v1.deployments.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2-
import { InitializeDeploymentRequestBody, InitializeDeploymentResponseBody } from "@trigger.dev/core/v3";
2+
import {
3+
InitializeDeploymentRequestBody,
4+
InitializeDeploymentResponseBody,
5+
} from "@trigger.dev/core/v3";
36
import { env } from "~/env.server";
47
import { authenticateApiRequest } from "~/services/apiAuth.server";
58
import { logger } from "~/services/logger.server";
@@ -37,13 +40,11 @@ export async function action({ request, params }: ActionFunctionArgs) {
3740
contentHash: deployment.contentHash,
3841
shortCode: deployment.shortCode,
3942
version: deployment.version,
40-
externalBuildData: deployment.externalBuildData as InitializeDeploymentResponseBody["externalBuildData"],
43+
externalBuildData:
44+
deployment.externalBuildData as InitializeDeploymentResponseBody["externalBuildData"],
4145
imageTag,
42-
registryHost: env.DEPLOY_REGISTRY_HOST
43-
}
46+
registryHost: env.DEPLOY_REGISTRY_HOST,
47+
};
4448

45-
return json(
46-
responseBody,
47-
{ status: 200 }
48-
);
49+
return json(responseBody, { status: 200 });
4950
}

apps/webapp/app/routes/api.v1.projects.$projectRef.envvars.$slug.$name.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
123123

124124
const repository = new EnvironmentVariablesRepository();
125125

126-
const variables = await repository.getEnvironment(environment.project.id, environment.id, true);
126+
const variables = await repository.getEnvironment(environment.project.id, environment.id);
127127

128128
const environmentVariable = variables.find((v) => v.key === parsedParams.data.name);
129129

apps/webapp/app/routes/api.v1.projects.$projectRef.envvars.$slug.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
8080

8181
const repository = new EnvironmentVariablesRepository();
8282

83-
const variables = await repository.getEnvironment(environment.project.id, environment.id, true);
83+
const variables = await repository.getEnvironment(environment.project.id, environment.id);
8484

8585
return json(variables.map((variable) => ({ name: variable.key, value: variable.value })));
8686
}

apps/webapp/app/routes/api.v1.projects.$projectRef.envvars.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { LoaderFunctionArgs, json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { prisma } from "~/db.server";
44
import { authenticateApiRequest } from "~/services/apiAuth.server";
5-
import { EnvironmentVariablesRepository } from "~/v3/environmentVariables/environmentVariablesRepository.server";
5+
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
66

77
const ParamsSchema = z.object({
88
projectRef: z.string(),
@@ -41,9 +41,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
4141
return json({ error: "Project not found" }, { status: 404 });
4242
}
4343

44-
const repository = new EnvironmentVariablesRepository();
45-
46-
const variables = await repository.getEnvironmentVariables(project.id, authenticatedEnv.id);
44+
const variables = await resolveVariablesForEnvironment(authenticatedEnv);
4745

4846
return json({
4947
variables: variables.reduce((acc: Record<string, string>, variable) => {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { ActionFunctionArgs } from "@remix-run/server-runtime";
2+
import { MachinePresetName } from "@trigger.dev/core/v3";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { validateJWTTokenAndRenew } from "~/services/apiAuth.server";
6+
import { logger } from "~/services/logger.server";
7+
import { workerQueue } from "~/services/worker.server";
8+
import { machinePresetFromName } from "~/v3/machinePresets.server";
9+
import { reportUsageEvent } from "~/v3/openMeter.server";
10+
11+
const JWTPayloadSchema = z.object({
12+
environment_id: z.string(),
13+
org_id: z.string(),
14+
project_id: z.string(),
15+
run_id: z.string(),
16+
machine_preset: z.string(),
17+
});
18+
19+
const BodySchema = z.object({
20+
durationMs: z.number(),
21+
});
22+
23+
export async function action({ request }: ActionFunctionArgs) {
24+
// Ensure this is a POST request
25+
if (request.method.toUpperCase() !== "POST") {
26+
return { status: 405, body: "Method Not Allowed" };
27+
}
28+
29+
const jwtResult = await validateJWTTokenAndRenew(request, JWTPayloadSchema);
30+
31+
if (!jwtResult) {
32+
return { status: 401, body: "Unauthorized" };
33+
}
34+
35+
const rawJson = await request.json();
36+
37+
const json = BodySchema.safeParse(rawJson);
38+
39+
if (!json.success) {
40+
logger.error("Failed to parse request body", { rawJson });
41+
42+
return { status: 400, body: "Bad Request" };
43+
}
44+
45+
const preset = machinePresetFromName(jwtResult.payload.machine_preset as MachinePresetName);
46+
47+
logger.debug("[/api/v1/usage/ingest] Reporting usage", { jwtResult, json: json.data, preset });
48+
49+
if (json.data.durationMs > 0) {
50+
const costInCents = json.data.durationMs * preset.centsPerMs;
51+
52+
await prisma.taskRun.update({
53+
where: {
54+
id: jwtResult.payload.run_id,
55+
},
56+
data: {
57+
usageDurationMs: {
58+
increment: json.data.durationMs,
59+
},
60+
costInCents: {
61+
increment: json.data.durationMs * preset.centsPerMs,
62+
},
63+
},
64+
});
65+
66+
try {
67+
await reportUsageEvent({
68+
source: "webapp",
69+
type: "usage",
70+
subject: jwtResult.payload.org_id,
71+
data: {
72+
durationMs: json.data.durationMs,
73+
costInCents: String(costInCents),
74+
},
75+
});
76+
} catch (e) {
77+
logger.error("Failed to report usage event, enqueing v3.reportUsage", { error: e });
78+
79+
await workerQueue.enqueue("v3.reportUsage", {
80+
orgId: jwtResult.payload.org_id,
81+
data: {
82+
costInCents: String(costInCents),
83+
},
84+
additionalData: {
85+
durationMs: json.data.durationMs,
86+
},
87+
});
88+
}
89+
}
90+
91+
return new Response(null, {
92+
status: 200,
93+
headers: {
94+
"x-trigger-jwt": jwtResult.jwt,
95+
},
96+
});
97+
}

0 commit comments

Comments
 (0)