Skip to content

Commit c05d958

Browse files
committed
Report start run usage
1 parent c0b442a commit c05d958

File tree

4 files changed

+75
-15
lines changed

4 files changed

+75
-15
lines changed

apps/webapp/app/routes/api.v1.usage.ingest.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { validateJWTToken } from "~/services/apiAuth.server";
66
import { logger } from "~/services/logger.server";
7+
import { workerQueue } from "~/services/worker.server";
78
import { machinePresetFromName } from "~/v3/machinePresets.server";
89
import { reportUsageEvent } from "~/v3/openMeter.server";
910

@@ -66,15 +67,29 @@ export async function action({ request }: ActionFunctionArgs) {
6667
},
6768
});
6869

69-
await reportUsageEvent({
70-
source: "webapp",
71-
type: "usage",
72-
subject: jwtPayload.org_id,
73-
data: {
74-
durationMs: json.data.durationMs,
75-
costInCents: String(costInCents),
76-
},
77-
});
70+
try {
71+
await reportUsageEvent({
72+
source: "webapp",
73+
type: "usage",
74+
subject: jwtPayload.org_id,
75+
data: {
76+
durationMs: json.data.durationMs,
77+
costInCents: String(costInCents),
78+
},
79+
});
80+
} catch (e) {
81+
logger.error("Failed to report usage event, enqueing v3.reportUsage", { error: e });
82+
83+
await workerQueue.enqueue("v3.reportUsage", {
84+
orgId: jwtPayload.org_id,
85+
data: {
86+
costInCents: String(costInCents),
87+
},
88+
additionalData: {
89+
durationMs: json.data.durationMs,
90+
},
91+
});
92+
}
7893
}
7994

8095
return new Response(null, {

apps/webapp/app/services/worker.server.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.se
4646
import { ResumeTaskService } from "./tasks/resumeTask.server";
4747
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
4848
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
49+
import { reportUsageEvent } from "~/v3/openMeter.server";
4950

5051
const workerCatalog = {
5152
indexEndpoint: z.object({
@@ -169,6 +170,13 @@ const workerCatalog = {
169170
"v2.requeueMessage": z.object({
170171
runId: z.string(),
171172
}),
173+
"v3.reportUsage": z.object({
174+
orgId: z.string(),
175+
data: z.object({
176+
costInCents: z.string(),
177+
}),
178+
additionalData: z.record(z.any()).optional(),
179+
}),
172180
};
173181

174182
const executionWorkerCatalog = {
@@ -649,6 +657,21 @@ function getWorkerQueue() {
649657
await service.call(payload.runId);
650658
},
651659
},
660+
"v3.reportUsage": {
661+
priority: 0,
662+
maxAttempts: 8,
663+
handler: async (payload, job) => {
664+
await reportUsageEvent({
665+
source: "webapp",
666+
type: "usage",
667+
subject: payload.orgId,
668+
data: {
669+
costInCents: payload.data.costInCents,
670+
...payload.additionalData,
671+
},
672+
});
673+
},
674+
},
652675
},
653676
});
654677
}

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@ import { logger } from "~/services/logger.server";
55
import { generateFriendlyId } from "../friendlyIdentifiers";
66
import { BaseService, ServiceValidationError } from "./baseService.server";
77
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
8+
import { machinePresetFromConfig } from "../machinePresets.server";
9+
import { workerQueue } from "~/services/worker.server";
810

911
export class CreateTaskRunAttemptService extends BaseService {
1012
public async call(
1113
runId: string,
12-
env?: AuthenticatedEnvironment,
14+
authenticatedEnv?: AuthenticatedEnvironment,
1315
setToExecuting = true
1416
): Promise<{
1517
execution: TaskRunExecution;
1618
run: TaskRun;
1719
attempt: TaskRunAttempt;
1820
}> {
19-
const environment = env ?? (await getAuthenticatedEnvironmentFromRun(runId, this._prisma));
21+
const environment =
22+
authenticatedEnv ?? (await getAuthenticatedEnvironmentFromRun(runId, this._prisma));
2023

2124
if (!environment) {
2225
throw new ServiceValidationError("Environment not found", 404);
@@ -128,6 +131,20 @@ export class CreateTaskRunAttemptService extends BaseService {
128131
throw new ServiceValidationError("Failed to create task run attempt", 500);
129132
}
130133

134+
if (taskRunAttempt.number === 1 && taskRun.baseCostInCents > 0) {
135+
await workerQueue.enqueue("v3.reportUsage", {
136+
orgId: environment.organizationId,
137+
data: {
138+
costInCents: String(taskRun.baseCostInCents),
139+
},
140+
additionalData: {
141+
runId: taskRun.id,
142+
},
143+
});
144+
}
145+
146+
const machinePreset = machinePresetFromConfig(taskRun.lockedBy.machineConfig ?? {});
147+
131148
const execution: TaskRunExecution = {
132149
task: {
133150
id: taskRun.lockedBy.slug,
@@ -151,6 +168,10 @@ export class CreateTaskRunAttemptService extends BaseService {
151168
tags: taskRun.tags.map((tag) => tag.name),
152169
isTest: taskRun.isTest,
153170
idempotencyKey: taskRun.idempotencyKey ?? undefined,
171+
startedAt: taskRun.startedAt ?? taskRun.createdAt,
172+
durationMs: taskRun.usageDurationMs,
173+
costInCents: taskRun.costInCents,
174+
baseCostInCents: taskRun.baseCostInCents,
154175
},
155176
queue: {
156177
id: queue.friendlyId,
@@ -176,6 +197,7 @@ export class CreateTaskRunAttemptService extends BaseService {
176197
taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun
177198
? { id: taskRun.batchItems[0].batchTaskRun.friendlyId }
178199
: undefined,
200+
machine: machinePreset,
179201
};
180202

181203
return {

packages/trigger-sdk/src/v3/usage.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,18 @@ export const usage = {
102102
* export const myTask = task({
103103
* id: "my-task",
104104
* run: async (payload, { ctx }) => {
105-
* const { result, usage } = await usage.measure(async () => {
105+
* const { result, compute } = await usage.measure(async () => {
106106
* // Do some work
107107
* return "result";
108108
* });
109109
*
110110
* console.log("Result", result);
111-
* console.log("Cost and duration", { cost: usage.costInCents, duration: usage.durationMs });
111+
* console.log("Cost and duration", { cost: compute.costInCents, duration: compute.durationMs });
112112
* },
113113
* });
114114
* ```
115115
*/
116-
measure: async <T>(cb: () => Promise<T>): Promise<{ result: T; usage: ComputeUsage }> => {
116+
measure: async <T>(cb: () => Promise<T>): Promise<{ result: T; compute: ComputeUsage }> => {
117117
const measurement = usageApi.start();
118118

119119
const result = await cb();
@@ -125,7 +125,7 @@ export const usage = {
125125

126126
return {
127127
result,
128-
usage: {
128+
compute: {
129129
costInCents,
130130
durationMs: sample.cpuTime,
131131
},

0 commit comments

Comments
 (0)