Skip to content

Commit 0f3dab7

Browse files
committed
WIP
1 parent 99bdaaa commit 0f3dab7

File tree

24 files changed

+242
-86
lines changed

24 files changed

+242
-86
lines changed

apps/webapp/app/routes/api.v1.usage.ingest.ts

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,29 +49,33 @@ export async function action({ request }: ActionFunctionArgs) {
4949

5050
logger.debug("Validated JWT", { jwtPayload, json: json.data, preset });
5151

52-
await prisma.taskRun.update({
53-
where: {
54-
id: jwtPayload.run_id,
55-
},
56-
data: {
57-
usageDurationMs: {
58-
increment: json.data.durationMs,
52+
if (json.data.durationMs > 10) {
53+
const costInCents = json.data.durationMs * preset.centsPerMs;
54+
55+
await prisma.taskRun.update({
56+
where: {
57+
id: jwtPayload.run_id,
5958
},
60-
costInCents: {
61-
increment: json.data.durationMs * preset.centsPerMs,
59+
data: {
60+
usageDurationMs: {
61+
increment: json.data.durationMs,
62+
},
63+
costInCents: {
64+
increment: json.data.durationMs * preset.centsPerMs,
65+
},
6266
},
63-
},
64-
});
65-
66-
await reportUsageEvent({
67-
source: "webapp",
68-
type: "usage",
69-
subject: jwtPayload.org_id,
70-
data: {
71-
durationMs: json.data.durationMs,
72-
costInCents: json.data.durationMs * preset.centsPerMs,
73-
},
74-
});
67+
});
68+
69+
await reportUsageEvent({
70+
source: "webapp",
71+
type: "usage",
72+
subject: jwtPayload.org_id,
73+
data: {
74+
durationMs: json.data.durationMs,
75+
costInCents: String(costInCents),
76+
},
77+
});
78+
}
7579

7680
return new Response(null, {
7781
status: 200,

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,9 +1026,12 @@ class SharedQueueTasks {
10261026
payloadType: taskRun.payloadType,
10271027
context: taskRun.context,
10281028
createdAt: taskRun.createdAt,
1029+
startedAt: taskRun.startedAt ?? taskRun.createdAt,
10291030
tags: taskRun.tags.map((tag) => tag.name),
10301031
isTest: taskRun.isTest,
10311032
idempotencyKey: taskRun.idempotencyKey ?? undefined,
1033+
durationMs: taskRun.usageDurationMs,
1034+
costInCents: taskRun.costInCents,
10321035
},
10331036
queue: {
10341037
id: queue.friendlyId,

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,20 @@ function extractResourceProperties(attributes: KeyValue[]) {
364364
queueName: extractStringAttribute(attributes, SemanticInternalAttributes.QUEUE_NAME),
365365
batchId: extractStringAttribute(attributes, SemanticInternalAttributes.BATCH_ID),
366366
idempotencyKey: extractStringAttribute(attributes, SemanticInternalAttributes.IDEMPOTENCY_KEY),
367+
machinePreset: extractStringAttribute(
368+
attributes,
369+
SemanticInternalAttributes.MACHINE_PRESET_NAME
370+
),
371+
machinePresetCpu:
372+
extractDoubleAttribute(attributes, SemanticInternalAttributes.MACHINE_PRESET_CPU) ??
373+
extractNumberAttribute(attributes, SemanticInternalAttributes.MACHINE_PRESET_CPU),
374+
machinePresetMemory:
375+
extractDoubleAttribute(attributes, SemanticInternalAttributes.MACHINE_PRESET_MEMORY) ??
376+
extractNumberAttribute(attributes, SemanticInternalAttributes.MACHINE_PRESET_MEMORY),
377+
machinePresetCentsPerMs: extractDoubleAttribute(
378+
attributes,
379+
SemanticInternalAttributes.MACHINE_PRESET_CENTS_PER_MS
380+
),
367381
};
368382
}
369383

packages/cli-v3/src/workers/dev/worker-facade.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ const handler = new ZodMessageHandler({
183183

184184
const measurement = usage.start();
185185

186-
const { result } = await executor.execute(execution, metadata, traceContext);
186+
const { result } = await executor.execute(execution, metadata, traceContext, measurement);
187187

188188
const usageSample = usage.stop(measurement);
189189

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ const zodIpc = new ZodIpcConnection({
176176

177177
const measurement = usage.start();
178178

179-
const { result } = await executor.execute(execution, metadata, traceContext);
179+
const { result } = await executor.execute(execution, metadata, traceContext, measurement);
180180

181181
const usageSample = usage.stop(measurement);
182182

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export * from "./logger-api";
1010
export * from "./runtime-api";
1111
export * from "./task-context-api";
1212
export * from "./apiClientManager-api";
13+
export * from "./usage-api";
1314
export * from "./schemas";
1415
export { SemanticInternalAttributes } from "./semanticInternalAttributes";
1516
export * from "./task-catalog-api";

packages/core/src/v3/schemas/common.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,57 @@
11
import { z } from "zod";
22

3+
// Defaults to 0.5
4+
export const MachineCpu = z.union([
5+
z.literal(0.25),
6+
z.literal(0.5),
7+
z.literal(1),
8+
z.literal(2),
9+
z.literal(4),
10+
]);
11+
12+
export type MachineCpu = z.infer<typeof MachineCpu>;
13+
14+
// Defaults to 1
15+
export const MachineMemory = z.union([
16+
z.literal(0.25),
17+
z.literal(0.5),
18+
z.literal(1),
19+
z.literal(2),
20+
z.literal(4),
21+
z.literal(8),
22+
]);
23+
24+
export type MachineMemory = z.infer<typeof MachineMemory>;
25+
26+
// Default is small-1x
27+
export const MachinePresetName = z.enum([
28+
"micro",
29+
"small-1x",
30+
"small-2x",
31+
"medium-1x",
32+
"medium-2x",
33+
"large-1x",
34+
]);
35+
36+
export type MachinePresetName = z.infer<typeof MachinePresetName>;
37+
38+
export const MachineConfig = z.object({
39+
cpu: MachineCpu.optional(),
40+
memory: MachineMemory.optional(),
41+
preset: MachinePresetName.optional(),
42+
});
43+
44+
export type MachineConfig = z.infer<typeof MachineConfig>;
45+
46+
export const MachinePreset = z.object({
47+
name: MachinePresetName,
48+
cpu: z.number(),
49+
memory: z.number(),
50+
centsPerMs: z.number(),
51+
});
52+
53+
export type MachinePreset = z.infer<typeof MachinePreset>;
54+
355
export const TaskRunBuiltInError = z.object({
456
type: z.literal("BUILT_IN_ERROR"),
557
name: z.string(),
@@ -77,7 +129,11 @@ export const TaskRun = z.object({
77129
tags: z.array(z.string()),
78130
isTest: z.boolean().default(false),
79131
createdAt: z.coerce.date(),
132+
startedAt: z.coerce.date(),
80133
idempotencyKey: z.string().optional(),
134+
durationMs: z.number().default(0),
135+
costInCents: z.number().default(0),
136+
baseCostInCents: z.number().default(0),
81137
});
82138

83139
export type TaskRun = z.infer<typeof TaskRun>;
@@ -146,6 +202,7 @@ export const TaskRunExecution = z.object({
146202
organization: TaskRunExecutionOrganization,
147203
project: TaskRunExecutionProject,
148204
batch: TaskRunExecutionBatch.optional(),
205+
machine: MachinePreset.optional(),
149206
});
150207

151208
export type TaskRunExecution = z.infer<typeof TaskRunExecution>;
@@ -162,6 +219,7 @@ export const TaskRunContext = z.object({
162219
organization: TaskRunExecutionOrganization,
163220
project: TaskRunExecutionProject,
164221
batch: TaskRunExecutionBatch.optional(),
222+
machine: MachinePreset.optional(),
165223
});
166224

167225
export type TaskRunContext = z.infer<typeof TaskRunContext>;

packages/core/src/v3/schemas/messages.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import { z } from "zod";
22
import {
3+
MachinePreset,
34
TaskRunExecution,
45
TaskRunExecutionResult,
56
TaskRunFailedExecutionResult,
6-
TaskRunExecutionUsage,
77
} from "./common";
88
import {
99
EnvironmentType,
10-
MachinePreset,
1110
ProdTaskRunExecution,
1211
ProdTaskRunExecutionPayload,
1312
TaskMetadataWithFilePath,

packages/core/src/v3/schemas/resources.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { z } from "zod";
2-
import { QueueOptions, RetryOptions, MachineConfig } from "./schemas";
2+
import { QueueOptions, RetryOptions } from "./schemas";
3+
import { MachineConfig } from "./common";
34

45
export const TaskResource = z.object({
56
id: z.string(),

packages/core/src/v3/schemas/schemas.ts

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,13 @@
11
import { z } from "zod";
22
import { RequireKeys } from "../types";
3-
import { TaskRunExecution } from "./common";
3+
import { MachineConfig, MachinePreset, TaskRunExecution } from "./common";
44

55
/*
66
WARNING: Never import anything from ./messages here. If it's needed in both, put it here instead.
77
*/
8-
98
export const EnvironmentType = z.enum(["PRODUCTION", "STAGING", "DEVELOPMENT", "PREVIEW"]);
109
export type EnvironmentType = z.infer<typeof EnvironmentType>;
1110

12-
// Defaults to 0.5
13-
export const MachineCpu = z.union([
14-
z.literal(0.25),
15-
z.literal(0.5),
16-
z.literal(1),
17-
z.literal(2),
18-
z.literal(4),
19-
]);
20-
21-
export type MachineCpu = z.infer<typeof MachineCpu>;
22-
23-
// Defaults to 1
24-
export const MachineMemory = z.union([
25-
z.literal(0.25),
26-
z.literal(0.5),
27-
z.literal(1),
28-
z.literal(2),
29-
z.literal(4),
30-
z.literal(8),
31-
]);
32-
33-
export type MachineMemory = z.infer<typeof MachineMemory>;
34-
35-
// Default is small-1x
36-
export const MachinePresetName = z.enum([
37-
"micro",
38-
"small-1x",
39-
"small-2x",
40-
"medium-1x",
41-
"medium-2x",
42-
"large-1x",
43-
]);
44-
45-
export type MachinePresetName = z.infer<typeof MachinePresetName>;
46-
47-
export const MachineConfig = z.object({
48-
cpu: MachineCpu.optional(),
49-
memory: MachineMemory.optional(),
50-
preset: MachinePresetName.optional(),
51-
});
52-
53-
export type MachineConfig = z.infer<typeof MachineConfig>;
54-
55-
export const MachinePreset = z.object({
56-
name: MachinePresetName,
57-
cpu: z.number(),
58-
memory: z.number(),
59-
centsPerMs: z.number(),
60-
});
61-
62-
export type MachinePreset = z.infer<typeof MachinePreset>;
63-
6411
export const TaskRunExecutionPayload = z.object({
6512
execution: TaskRunExecution,
6613
traceContext: z.record(z.unknown()),

packages/core/src/v3/semanticInternalAttributes.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ export const SemanticInternalAttributes = {
1818
TASK_EXPORT_NAME: "ctx.task.exportName",
1919
QUEUE_NAME: "ctx.queue.name",
2020
QUEUE_ID: "ctx.queue.id",
21+
MACHINE_PRESET_NAME: "ctx.machine.name",
22+
MACHINE_PRESET_CPU: "ctx.machine.cpu",
23+
MACHINE_PRESET_MEMORY: "ctx.machine.memory",
24+
MACHINE_PRESET_CENTS_PER_MS: "ctx.machine.centsPerMs",
2125
SPAN_PARTIAL: "$span.partial",
2226
SPAN_ID: "$span.span_id",
2327
OUTPUT: "$output",

packages/core/src/v3/taskContext/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ export class TaskContextAPI {
7575
[SemanticInternalAttributes.ORGANIZATION_NAME]: this.ctx.organization.name,
7676
[SemanticInternalAttributes.BATCH_ID]: this.ctx.batch?.id,
7777
[SemanticInternalAttributes.IDEMPOTENCY_KEY]: this.ctx.run.idempotencyKey,
78+
[SemanticInternalAttributes.MACHINE_PRESET_NAME]: this.ctx.machine?.name,
79+
[SemanticInternalAttributes.MACHINE_PRESET_CPU]: this.ctx.machine?.cpu,
80+
[SemanticInternalAttributes.MACHINE_PRESET_MEMORY]: this.ctx.machine?.memory,
81+
[SemanticInternalAttributes.MACHINE_PRESET_CENTS_PER_MS]: this.ctx.machine?.centsPerMs,
7882
};
7983
}
8084

packages/core/src/v3/usage/api.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ export class UsageAPI implements UsageManager {
4040
return this.#getUsageManager().pauseAsync(cb);
4141
}
4242

43+
public sample(): UsageSample | undefined {
44+
return this.#getUsageManager().sample();
45+
}
46+
4347
#getUsageManager(): UsageManager {
4448
return getGlobal(API_NAME) ?? NOOP_USAGE_MANAGER;
4549
}

packages/core/src/v3/usage/devUsageManager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,26 @@ class DevUsageMeasurement implements UsageMeasurement {
4242
}
4343

4444
export class DevUsageManager implements UsageManager {
45+
private _firstMeasurement?: DevUsageMeasurement;
4546
private _currentMeasurements: Map<string, DevUsageMeasurement> = new Map();
4647
private _pauses: Map<string, { start: ClockTime; end?: ClockTime }> = new Map();
4748

4849
disable(): void {}
4950

51+
sample(): UsageSample | undefined {
52+
return this._firstMeasurement?.sample();
53+
}
54+
5055
start(): DevUsageMeasurement {
5156
// generate a random ID
5257
const id = generateRandomString();
5358

5459
const measurement = new DevUsageMeasurement(id);
5560

61+
if (!this._firstMeasurement) {
62+
this._firstMeasurement = measurement;
63+
}
64+
5665
this._currentMeasurements.set(id, measurement);
5766

5867
return measurement;

packages/core/src/v3/usage/noopUsageManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@ export class NoopUsageManager implements UsageManager {
1818
pauseAsync<T>(cb: () => Promise<T>): Promise<T> {
1919
return cb();
2020
}
21+
22+
sample(): UsageSample | undefined {
23+
return undefined;
24+
}
2125
}

packages/core/src/v3/usage/prodUsageManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ export class ProdUsageManager implements UsageManager {
3232
this._abortController?.abort();
3333
}
3434

35+
sample(): UsageSample | undefined {
36+
return this._measurement?.sample();
37+
}
38+
3539
start(): UsageMeasurement {
3640
if (!this.isReportingEnabled || !this.options.heartbeatIntervalMs) {
3741
return this.delegageUsageManager.start();

packages/core/src/v3/usage/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ export interface UsageManager {
1111
disable(): void;
1212
start(): UsageMeasurement;
1313
stop(measurement: UsageMeasurement): UsageSample;
14+
sample(): UsageSample | undefined;
1415
pauseAsync<T>(cb: () => Promise<T>): Promise<T>;
1516
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
} from "../utils/ioSerialization";
2525
import { calculateNextRetryDelay } from "../utils/retries";
2626
import { accessoryAttributes } from "../utils/styleAttributes";
27+
import { UsageMeasurement } from "../usage/types";
2728

2829
export type TaskExecutorOptions = {
2930
tracingSDK: TracingSDK;
@@ -57,7 +58,8 @@ export class TaskExecutor {
5758
async execute(
5859
execution: TaskRunExecution,
5960
worker: BackgroundWorkerProperties,
60-
traceContext: Record<string, unknown>
61+
traceContext: Record<string, unknown>,
62+
usage: UsageMeasurement
6163
): Promise<{ result: TaskRunExecutionResult }> {
6264
const ctx = TaskRunContext.parse(execution);
6365
const attemptMessage = `Attempt ${execution.attempt.number}`;

0 commit comments

Comments
 (0)