Skip to content

Commit 259c905

Browse files
committed
WIL realtime concurrency tracking
1 parent 686271c commit 259c905

File tree

7 files changed

+361
-76
lines changed

7 files changed

+361
-76
lines changed

apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { useTypedMatchesData } from "~/hooks/useTypedMatchData";
1111
import { useUser } from "~/hooks/useUser";
1212
import { OrganizationsPresenter } from "~/presenters/OrganizationsPresenter.server";
1313
import { getImpersonationId } from "~/services/impersonation.server";
14-
import { getCurrentPlan, getUsage } from "~/services/platform.v3.server";
14+
import { getCachedUsage, getCurrentPlan, getUsage } from "~/services/platform.v3.server";
1515
import { requireUserId } from "~/services/session.server";
1616
import { telemetry } from "~/services/telemetry.server";
1717
import { organizationPath } from "~/utils/pathBuilder";
@@ -30,6 +30,24 @@ export function useCurrentPlan(matches?: UIMatch[]) {
3030
return data?.currentPlan;
3131
}
3232

33+
export const shouldRevalidate: ShouldRevalidateFunction = (params) => {
34+
const { currentParams, nextParams } = params;
35+
36+
const current = ParamsSchema.safeParse(currentParams);
37+
const next = ParamsSchema.safeParse(nextParams);
38+
39+
if (current.success && next.success) {
40+
if (current.data.organizationSlug !== next.data.organizationSlug) {
41+
return true;
42+
}
43+
if (current.data.projectParam !== next.data.projectParam) {
44+
return true;
45+
}
46+
}
47+
48+
return params.currentUrl.pathname !== params.nextUrl.pathname;
49+
};
50+
3351
export const loader = async ({ request, params }: LoaderFunctionArgs) => {
3452
const userId = await requireUserId(request);
3553
const impersonationId = await getImpersonationId(request);
@@ -51,11 +69,14 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
5169
const firstDayOfMonth = new Date();
5270
firstDayOfMonth.setUTCDate(1);
5371
firstDayOfMonth.setUTCHours(0, 0, 0, 0);
54-
const tomorrow = new Date();
55-
tomorrow.setUTCDate(tomorrow.getDate() + 1);
72+
const firstDayOfNextMonth = new Date();
73+
firstDayOfNextMonth.setUTCMonth(firstDayOfNextMonth.getUTCMonth() + 1);
74+
firstDayOfNextMonth.setUTCDate(1);
75+
firstDayOfNextMonth.setUTCHours(0, 0, 0, 0);
76+
5677
const [plan, usage] = await Promise.all([
5778
getCurrentPlan(organization.id),
58-
getUsage(organization.id, { from: firstDayOfMonth, to: tomorrow }),
79+
getCachedUsage(organization.id, { from: firstDayOfMonth, to: firstDayOfNextMonth }),
5980
]);
6081

6182
let hasExceededFreeTier = false;
@@ -104,23 +125,3 @@ export function ErrorBoundary() {
104125
<RouteErrorDisplay button={{ title: "Home", to: "/" }} />
105126
);
106127
}
107-
108-
export const shouldRevalidate: ShouldRevalidateFunction = ({
109-
defaultShouldRevalidate,
110-
currentParams,
111-
nextParams,
112-
}) => {
113-
const current = ParamsSchema.safeParse(currentParams);
114-
const next = ParamsSchema.safeParse(nextParams);
115-
116-
if (current.success && next.success) {
117-
if (current.data.organizationSlug !== next.data.organizationSlug) {
118-
return true;
119-
}
120-
if (current.data.projectParam !== next.data.projectParam) {
121-
return true;
122-
}
123-
}
124-
125-
return defaultShouldRevalidate;
126-
};

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4-
import { env } from "~/env.server";
54
import { permittedToReadBatch } from "~/services/accessControl.server";
65
import { authenticateApiRequest } from "~/services/apiAuth.server";
76
import { logger } from "~/services/logger.server";
7+
import { realtimeClient } from "~/services/realtimeClient.server";
88
import { makeApiCors } from "~/utils/apiCors";
9-
import { longPollingFetch } from "~/utils/longPollingFetch";
109

1110
const ParamsSchema = z.object({
1211
batchId: z.string(),
@@ -53,17 +52,11 @@ export async function loader({ request, params }: ActionFunctionArgs) {
5352
return apiCors(json({ error: "Batch Run not found" }, { status: 404 }));
5453
}
5554

56-
const url = new URL(request.url);
57-
const originUrl = new URL(`${env.ELECTRIC_ORIGIN}/v1/shape/public."TaskRun"`);
58-
url.searchParams.forEach((value, key) => {
59-
originUrl.searchParams.set(key, value);
60-
});
61-
62-
originUrl.searchParams.set("where", `"batchId"='${batchRun.id}'`);
63-
64-
const finalUrl = originUrl.toString();
65-
66-
return longPollingFetch(finalUrl);
55+
return realtimeClient.streamRunsWhere(
56+
request.url,
57+
authenticationResult.environment,
58+
`"batchId"='${batchRun.id}'`
59+
);
6760
} catch (error) {
6861
if (error instanceof Response) {
6962
// Error responses from longPollingFetch

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4-
import { env } from "~/env.server";
54
import { permittedToReadRun } from "~/services/accessControl.server";
65
import { authenticateApiRequest } from "~/services/apiAuth.server";
76
import { logger } from "~/services/logger.server";
7+
import { realtimeClient } from "~/services/realtimeClient.server";
88
import { makeApiCors } from "~/utils/apiCors";
9-
import { longPollingFetch } from "~/utils/longPollingFetch";
109

1110
const ParamsSchema = z.object({
1211
runId: z.string(),
@@ -53,17 +52,12 @@ export async function loader({ request, params }: ActionFunctionArgs) {
5352
return apiCors(json({ error: "Task Run not found" }, { status: 404 }));
5453
}
5554

56-
const url = new URL(request.url);
57-
const originUrl = new URL(`${env.ELECTRIC_ORIGIN}/v1/shape/public."TaskRun"`);
58-
url.searchParams.forEach((value, key) => {
59-
originUrl.searchParams.set(key, value);
60-
});
61-
62-
originUrl.searchParams.set("where", `"id"='${run.id}'`);
63-
64-
const finalUrl = originUrl.toString();
65-
66-
return longPollingFetch(finalUrl);
55+
return realtimeClient.streamRunsWhere(
56+
request.url,
57+
authenticationResult.environment,
58+
`"id"='${run.id}'`,
59+
apiCors
60+
);
6761
} catch (error) {
6862
if (error instanceof Response) {
6963
// Error responses from longPollingFetch

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { DefaultStatefulContext, Namespace, Cache as UnkeyCache, createCache } from "@unkey/cache";
1+
import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache";
22
import { MemoryStore } from "@unkey/cache/stores";
33
import { Ratelimit } from "@upstash/ratelimit";
44
import { Request as ExpressRequest, Response as ExpressResponse, NextFunction } from "express";
55
import { RedisOptions } from "ioredis";
66
import { createHash } from "node:crypto";
77
import { z } from "zod";
88
import { env } from "~/env.server";
9-
import { authenticateApiKey, authenticateAuthorizationHeader } from "./apiAuth.server";
9+
import { authenticateAuthorizationHeader } from "./apiAuth.server";
1010
import { logger } from "./logger.server";
1111
import { createRedisRateLimitClient, Duration, RateLimiter } from "./rateLimiter.server";
1212
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
@@ -153,6 +153,7 @@ export function authorizationRateLimitMiddleware({
153153
},
154154
});
155155

156+
// This cache holds the rate limit configuration for each org, so we don't have to fetch it every request
156157
const cache = createCache({
157158
limiter: new Namespace<RateLimiterConfig>(ctx, {
158159
stores: [memory, redisCacheStore],

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,75 @@
1-
import { BillingClient, Limits, SetPlanBody, UsageSeriesParams } from "@trigger.dev/platform/v3";
21
import { Organization, Project } from "@trigger.dev/database";
2+
import {
3+
BillingClient,
4+
Limits,
5+
SetPlanBody,
6+
UsageSeriesParams,
7+
UsageResult,
8+
} from "@trigger.dev/platform/v3";
9+
import { createCache, DefaultStatefulContext, Namespace } from "@unkey/cache";
10+
import { MemoryStore } from "@unkey/cache/stores";
311
import { redirect } from "remix-typedjson";
412
import { $replica } from "~/db.server";
513
import { env } from "~/env.server";
614
import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server";
715
import { createEnvironment } from "~/models/organization.server";
816
import { logger } from "~/services/logger.server";
917
import { newProjectPath, organizationBillingPath } from "~/utils/pathBuilder";
18+
import { singleton } from "~/utils/singleton";
19+
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
20+
21+
function initializeClient() {
22+
if (isCloud() && process.env.BILLING_API_URL && process.env.BILLING_API_KEY) {
23+
const client = new BillingClient({
24+
url: process.env.BILLING_API_URL,
25+
apiKey: process.env.BILLING_API_KEY,
26+
});
27+
console.log(`🤑 Billing client initialized: ${process.env.BILLING_API_URL}`);
28+
return client;
29+
} else {
30+
console.log(`🤑 Billing client not initialized`);
31+
}
32+
}
33+
34+
const client = singleton("billingClient", initializeClient);
35+
36+
function initializePlatformCache() {
37+
const ctx = new DefaultStatefulContext();
38+
const memory = new MemoryStore({ persistentMap: new Map() });
39+
const redisCacheStore = new RedisCacheStore({
40+
connection: {
41+
keyPrefix: `cache:platform:v3:`,
42+
port: env.REDIS_PORT,
43+
host: env.REDIS_HOST,
44+
username: env.REDIS_USERNAME,
45+
password: env.REDIS_PASSWORD,
46+
enableAutoPipelining: true,
47+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
48+
},
49+
});
50+
51+
// This cache holds the limits fetched from the platform service
52+
const cache = createCache({
53+
limits: new Namespace<number>(ctx, {
54+
stores: [memory, redisCacheStore],
55+
fresh: 60_000 * 5, // 5 minutes
56+
stale: 60_000 * 10, // 10 minutes
57+
}),
58+
usage: new Namespace<UsageResult>(ctx, {
59+
stores: [memory, redisCacheStore],
60+
fresh: 60_000 * 5, // 5 minutes
61+
stale: 60_000 * 10, // 10 minutes
62+
}),
63+
});
64+
65+
return cache;
66+
}
67+
68+
const platformCache = singleton("platformCache", initializePlatformCache);
1069

1170
export async function getCurrentPlan(orgId: string) {
12-
const client = getClient();
1371
if (!client) return undefined;
72+
1473
try {
1574
const result = await client.currentPlan(orgId);
1675

@@ -60,8 +119,8 @@ export async function getCurrentPlan(orgId: string) {
60119
}
61120

62121
export async function getLimits(orgId: string) {
63-
const client = getClient();
64122
if (!client) return undefined;
123+
65124
try {
66125
const result = await client.currentPlan(orgId);
67126
if (!result.success) {
@@ -87,9 +146,15 @@ export async function getLimit(orgId: string, limit: keyof Limits, fallback: num
87146
return fallback;
88147
}
89148

149+
export async function getCachedLimit(orgId: string, limit: keyof Limits, fallback: number) {
150+
return platformCache.limits.swr(`${orgId}:${limit}`, async () => {
151+
return getLimit(orgId, limit, fallback);
152+
});
153+
}
154+
90155
export async function customerPortalUrl(orgId: string, orgSlug: string) {
91-
const client = getClient();
92156
if (!client) return undefined;
157+
93158
try {
94159
return client.createPortalSession(orgId, {
95160
returnUrl: `${env.APP_ORIGIN}${organizationBillingPath({ slug: orgSlug })}`,
@@ -101,8 +166,8 @@ export async function customerPortalUrl(orgId: string, orgSlug: string) {
101166
}
102167

103168
export async function getPlans() {
104-
const client = getClient();
105169
if (!client) return undefined;
170+
106171
try {
107172
const result = await client.plans();
108173
if (!result.success) {
@@ -122,7 +187,6 @@ export async function setPlan(
122187
callerPath: string,
123188
plan: SetPlanBody
124189
) {
125-
const client = getClient();
126190
if (!client) {
127191
throw redirectWithErrorMessage(callerPath, request, "Error setting plan");
128192
}
@@ -178,8 +242,8 @@ export async function setPlan(
178242
}
179243

180244
export async function getUsage(organizationId: string, { from, to }: { from: Date; to: Date }) {
181-
const client = getClient();
182245
if (!client) return undefined;
246+
183247
try {
184248
const result = await client.usage(organizationId, { from, to });
185249
if (!result.success) {
@@ -193,9 +257,27 @@ export async function getUsage(organizationId: string, { from, to }: { from: Dat
193257
}
194258
}
195259

260+
export async function getCachedUsage(
261+
organizationId: string,
262+
{ from, to }: { from: Date; to: Date }
263+
) {
264+
if (!client) return undefined;
265+
266+
const result = await platformCache.usage.swr(
267+
`${organizationId}:${from.toISOString()}:${to.toISOString()}`,
268+
async () => {
269+
const usageResponse = await getUsage(organizationId, { from, to });
270+
271+
return usageResponse;
272+
}
273+
);
274+
275+
return result.val;
276+
}
277+
196278
export async function getUsageSeries(organizationId: string, params: UsageSeriesParams) {
197-
const client = getClient();
198279
if (!client) return undefined;
280+
199281
try {
200282
const result = await client.usageSeries(organizationId, params);
201283
if (!result.success) {
@@ -214,8 +296,8 @@ export async function reportInvocationUsage(
214296
costInCents: number,
215297
additionalData?: Record<string, any>
216298
) {
217-
const client = getClient();
218299
if (!client) return undefined;
300+
219301
try {
220302
const result = await client.reportInvocationUsage({
221303
organizationId,
@@ -234,8 +316,8 @@ export async function reportInvocationUsage(
234316
}
235317

236318
export async function reportComputeUsage(request: Request) {
237-
const client = getClient();
238319
if (!client) return undefined;
320+
239321
return fetch(`${process.env.BILLING_API_URL}/api/v1/usage/ingest/compute`, {
240322
method: "POST",
241323
headers: request.headers,
@@ -244,8 +326,8 @@ export async function reportComputeUsage(request: Request) {
244326
}
245327

246328
export async function getEntitlement(organizationId: string) {
247-
const client = getClient();
248329
if (!client) return undefined;
330+
249331
try {
250332
const result = await client.getEntitlement(organizationId);
251333
if (!result.success) {
@@ -275,19 +357,6 @@ export async function projectCreated(organization: Organization, project: Projec
275357
}
276358
}
277359

278-
function getClient() {
279-
if (isCloud() && process.env.BILLING_API_URL && process.env.BILLING_API_KEY) {
280-
const client = new BillingClient({
281-
url: process.env.BILLING_API_URL,
282-
apiKey: process.env.BILLING_API_KEY,
283-
});
284-
console.log(`Billing client initialized: ${process.env.BILLING_API_URL}`);
285-
return client;
286-
} else {
287-
console.log(`Billing client not initialized`);
288-
}
289-
}
290-
291360
function isCloud(): boolean {
292361
const acceptableHosts = [
293362
"https://cloud.trigger.dev",

0 commit comments

Comments
 (0)