Skip to content

Commit af7074d

Browse files
committed
Implement test for realtime client using testcontainers
also updated electric to latest version
1 parent 259c905 commit af7074d

File tree

21 files changed

+412
-129
lines changed

21 files changed

+412
-129
lines changed

.vscode/extensions.json

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
{
2-
"recommendations": [
3-
"denoland.vscode-deno"
4-
],
5-
"unwantedRecommendations": [
6-
7-
]
2+
"recommendations": ["bierner.comment-tagged-templates"],
3+
"unwantedRecommendations": []
84
}

apps/webapp/app/routes/_app.orgs.$organizationSlug/route.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,12 @@ export const shouldRevalidate: ShouldRevalidateFunction = (params) => {
4545
}
4646
}
4747

48+
// This prevents revalidation when there are search params changes
49+
// IMPORTANT: If the loader function depends on search params, this should be updated
4850
return params.currentUrl.pathname !== params.nextUrl.pathname;
4951
};
5052

53+
// IMPORTANT: Make sure to update shouldRevalidate if this loader depends on search params
5154
export const loader = async ({ request, params }: LoaderFunctionArgs) => {
5255
const userId = await requireUserId(request);
5356
const impersonationId = await getImpersonationId(request);
@@ -69,6 +72,9 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
6972
const firstDayOfMonth = new Date();
7073
firstDayOfMonth.setUTCDate(1);
7174
firstDayOfMonth.setUTCHours(0, 0, 0, 0);
75+
76+
// Using the 1st day of next month means we get the usage for the current month
77+
// and the cache key for getCachedUsage is stable over the month
7278
const firstDayOfNextMonth = new Date();
7379
firstDayOfNextMonth.setUTCMonth(firstDayOfNextMonth.getUTCMonth() + 1);
7480
firstDayOfNextMonth.setUTCDate(1);

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { $replica } from "~/db.server";
44
import { permittedToReadBatch } from "~/services/accessControl.server";
55
import { authenticateApiRequest } from "~/services/apiAuth.server";
66
import { logger } from "~/services/logger.server";
7-
import { realtimeClient } from "~/services/realtimeClient.server";
7+
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
88
import { makeApiCors } from "~/utils/apiCors";
99

1010
const ParamsSchema = z.object({

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { $replica } from "~/db.server";
44
import { permittedToReadRun } from "~/services/accessControl.server";
55
import { authenticateApiRequest } from "~/services/apiAuth.server";
66
import { logger } from "~/services/logger.server";
7-
import { realtimeClient } from "~/services/realtimeClient.server";
7+
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
88
import { makeApiCors } from "~/utils/apiCors";
99

1010
const ParamsSchema = z.object({

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ function initializePlatformCache() {
3838
const memory = new MemoryStore({ persistentMap: new Map() });
3939
const redisCacheStore = new RedisCacheStore({
4040
connection: {
41-
keyPrefix: `cache:platform:v3:`,
41+
keyPrefix: "tr:cache:platform:v3",
4242
port: env.REDIS_PORT,
4343
host: env.REDIS_HOST,
4444
username: env.REDIS_USERNAME,
Lines changed: 56 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,47 @@
1+
import { json } from "@remix-run/server-runtime";
12
import Redis, { Callback, Result, type RedisOptions } from "ioredis";
3+
import { randomUUID } from "node:crypto";
24
import { longPollingFetch } from "~/utils/longPollingFetch";
3-
import { getCachedLimit } from "./platform.v3.server";
45
import { logger } from "./logger.server";
5-
import { AuthenticatedEnvironment } from "./apiAuth.server";
6-
import { json } from "@remix-run/server-runtime";
7-
import { env } from "~/env.server";
8-
import { singleton } from "~/utils/singleton";
6+
7+
export interface CachedLimitProvider {
8+
getCachedLimit: (organizationId: string, defaultValue: number) => Promise<number | undefined>;
9+
}
910

1011
export type RealtimeClientOptions = {
1112
electricOrigin: string;
1213
redis: RedisOptions;
14+
cachedLimitProvider: CachedLimitProvider;
1315
keyPrefix: string;
14-
expiryTime?: number;
16+
expiryTimeInSeconds?: number;
17+
};
18+
19+
export type RealtimeEnvironment = {
20+
id: string;
21+
organizationId: string;
1522
};
1623

1724
export class RealtimeClient {
1825
private redis: Redis;
19-
private expiryTime: number;
26+
private expiryTimeInSeconds: number;
27+
private cachedLimitProvider: CachedLimitProvider;
2028

2129
constructor(private options: RealtimeClientOptions) {
2230
this.redis = new Redis(options.redis);
23-
this.expiryTime = options.expiryTime ?? 3600; // default to 1 hour
31+
this.expiryTimeInSeconds = options.expiryTimeInSeconds ?? 60 * 5; // default to 5 minutes
32+
this.cachedLimitProvider = options.cachedLimitProvider;
2433
this.#registerCommands();
2534
}
2635

2736
async streamRunsWhere(
2837
url: URL | string,
29-
authenticatedEnv: AuthenticatedEnvironment,
38+
environment: RealtimeEnvironment,
3039
whereClause: string,
3140
responseWrapper?: (response: Response) => Promise<Response>
3241
) {
3342
const electricUrl = this.#constructElectricUrl(url, whereClause);
3443

35-
return this.#performElectricRequest(electricUrl, authenticatedEnv, responseWrapper);
44+
return this.#performElectricRequest(electricUrl, environment, responseWrapper);
3645
}
3746

3847
#constructElectricUrl(url: URL | string, whereClause: string): URL {
@@ -51,7 +60,7 @@ export class RealtimeClient {
5160

5261
async #performElectricRequest(
5362
url: URL,
54-
authenticatedEnv: AuthenticatedEnvironment,
63+
environment: RealtimeEnvironment,
5564
responseWrapper: (response: Response) => Promise<Response> = (r) => Promise.resolve(r)
5665
) {
5766
const shapeId = extractShapeId(url);
@@ -67,40 +76,40 @@ export class RealtimeClient {
6776
return longPollingFetch(url.toString());
6877
}
6978

79+
const requestId = randomUUID();
80+
7081
// We now need to wrap the longPollingFetch in a concurrency tracker
71-
const concurrencyLimitResult = await getCachedLimit(
72-
authenticatedEnv.organizationId,
73-
"realtimeConcurrentConnections",
82+
const concurrencyLimit = await this.cachedLimitProvider.getCachedLimit(
83+
environment.organizationId,
7484
100_000
7585
);
7686

77-
if (!concurrencyLimitResult.val) {
87+
if (!concurrencyLimit) {
7888
logger.error("Failed to get concurrency limit", {
79-
organizationId: authenticatedEnv.organizationId,
80-
concurrencyLimitResult,
89+
organizationId: environment.organizationId,
8190
});
8291

8392
return responseWrapper(json({ error: "Failed to get concurrency limit" }, { status: 500 }));
8493
}
8594

86-
const concurrencyLimit = concurrencyLimitResult.val;
87-
8895
logger.debug("[realtimeClient] increment and check", {
8996
concurrencyLimit,
9097
shapeId,
91-
authenticatedEnv: {
92-
id: authenticatedEnv.id,
93-
organizationId: authenticatedEnv.organizationId,
98+
requestId,
99+
environment: {
100+
id: environment.id,
101+
organizationId: environment.organizationId,
94102
},
95103
});
96104

97-
const canProceed = await this.#incrementAndCheck(
98-
authenticatedEnv.id,
99-
shapeId,
100-
concurrencyLimit
101-
);
105+
const canProceed = await this.#incrementAndCheck(environment.id, requestId, concurrencyLimit);
102106

103107
if (!canProceed) {
108+
logger.debug("[realtimeClient] too many concurrent requests", {
109+
requestId,
110+
environmentId: environment.id,
111+
});
112+
104113
return responseWrapper(json({ error: "Too many concurrent requests" }, { status: 429 }));
105114
}
106115

@@ -109,41 +118,42 @@ export class RealtimeClient {
109118
const response = await longPollingFetch(url.toString());
110119

111120
// Decrement the counter after the long polling request is complete
112-
await this.#decrementConcurrency(authenticatedEnv.id, shapeId);
121+
await this.#decrementConcurrency(environment.id, requestId);
113122

114123
return response;
115124
} catch (error) {
116125
// Decrement the counter if the request fails
117-
await this.#decrementConcurrency(authenticatedEnv.id, shapeId);
126+
await this.#decrementConcurrency(environment.id, requestId);
118127

119128
throw error;
120129
}
121130
}
122131

123-
async #incrementAndCheck(environmentId: string, shapeId: string, limit: number) {
132+
async #incrementAndCheck(environmentId: string, requestId: string, limit: number) {
124133
const key = this.#getKey(environmentId);
125-
const now = Date.now().toString();
134+
const now = Date.now();
126135

127136
const result = await this.redis.incrementAndCheckConcurrency(
128137
key,
129-
now,
130-
shapeId,
131-
this.expiryTime.toString(),
138+
now.toString(),
139+
requestId,
140+
this.expiryTimeInSeconds.toString(), // expiry time
141+
(now - this.expiryTimeInSeconds * 1000).toString(), // cutoff time
132142
limit.toString()
133143
);
134144

135145
return result === 1;
136146
}
137147

138-
async #decrementConcurrency(environmentId: string, shapeId: string) {
148+
async #decrementConcurrency(environmentId: string, requestId: string) {
139149
logger.debug("[realtimeClient] decrement", {
140-
shapeId,
150+
requestId,
141151
environmentId,
142152
});
143153

144154
const key = this.#getKey(environmentId);
145155

146-
await this.redis.zrem(key, shapeId);
156+
await this.redis.zrem(key, requestId);
147157
}
148158

149159
#getKey(environmentId: string): string {
@@ -153,13 +163,17 @@ export class RealtimeClient {
153163
#registerCommands() {
154164
this.redis.defineCommand("incrementAndCheckConcurrency", {
155165
numberOfKeys: 1,
156-
lua: `
166+
lua: /* lua */ `
157167
local concurrencyKey = KEYS[1]
158168
159-
local timestamp = ARGV[1]
169+
local timestamp = tonumber(ARGV[1])
160170
local requestId = ARGV[2]
161-
local expiryTime = ARGV[3]
162-
local limit = tonumber(ARGV[4])
171+
local expiryTime = tonumber(ARGV[3])
172+
local cutoffTime = tonumber(ARGV[4])
173+
local limit = tonumber(ARGV[5])
174+
175+
-- Remove expired entries
176+
redis.call('ZREMRANGEBYSCORE', concurrencyKey, '-inf', cutoffTime)
163177
164178
-- Add the new request to the sorted set
165179
redis.call('ZADD', concurrencyKey, timestamp, requestId)
@@ -199,25 +213,9 @@ declare module "ioredis" {
199213
timestamp: string,
200214
requestId: string,
201215
expiryTime: string,
216+
cutoffTime: string,
202217
limit: string,
203218
callback?: Callback<number>
204219
): Result<number, Context>;
205220
}
206221
}
207-
208-
function initializeRealtimeClient() {
209-
return new RealtimeClient({
210-
electricOrigin: env.ELECTRIC_ORIGIN,
211-
keyPrefix: `tr:realtime:concurrency`,
212-
redis: {
213-
port: env.REDIS_PORT,
214-
host: env.REDIS_HOST,
215-
username: env.REDIS_USERNAME,
216-
password: env.REDIS_PASSWORD,
217-
enableAutoPipelining: true,
218-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
219-
},
220-
});
221-
}
222-
223-
export const realtimeClient = singleton("realtimeClient", initializeRealtimeClient);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { env } from "~/env.server";
2+
import { singleton } from "~/utils/singleton";
3+
import { RealtimeClient } from "./realtimeClient.server";
4+
import { getCachedLimit } from "./platform.v3.server";
5+
6+
function initializeRealtimeClient() {
7+
return new RealtimeClient({
8+
electricOrigin: env.ELECTRIC_ORIGIN,
9+
keyPrefix: "tr:realtime:concurrency",
10+
redis: {
11+
port: env.REDIS_PORT,
12+
host: env.REDIS_HOST,
13+
username: env.REDIS_USERNAME,
14+
password: env.REDIS_PASSWORD,
15+
enableAutoPipelining: true,
16+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
17+
},
18+
cachedLimitProvider: {
19+
async getCachedLimit(organizationId, defaultValue) {
20+
const result = await getCachedLimit(
21+
organizationId,
22+
"realtimeConcurrentConnections",
23+
defaultValue
24+
);
25+
26+
return result.val;
27+
},
28+
},
29+
});
30+
}
31+
32+
export const realtimeClient = singleton("realtimeClient", initializeRealtimeClient);

apps/webapp/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@
185185
"zod-validation-error": "^1.5.0"
186186
},
187187
"devDependencies": {
188+
"@internal/testcontainers": "workspace:*",
188189
"@remix-run/dev": "2.1.0",
189190
"@remix-run/eslint-config": "2.1.0",
190191
"@remix-run/testing": "^2.1.0",
@@ -243,9 +244,10 @@
243244
"ts-node": "^10.7.0",
244245
"tsconfig-paths": "^3.14.1",
245246
"typescript": "^5.1.6",
247+
"vite-tsconfig-paths": "^4.0.5",
246248
"vitest": "^1.4.0"
247249
},
248250
"engines": {
249251
"node": ">=16.0.0"
250252
}
251-
}
253+
}

apps/webapp/test/placeholder.test.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)