Skip to content

Commit bc7bbd4

Browse files
authored
v2 IngestSendEvent rate limit (#1134)
* Easier to create a rate limiter, use it in the ApiRateLimiter. Upgraded the Upstash package * Always prefix any rate limiter in Redis with “ratelimit:” * By default log when the rate limit is hit * Added rate limiting to IngestSendEvent * Log out the EventRecord id * Increase events.deliverScheduled attempts * INGEST_EVENT_RATE_LIMIT_MAX is optional * Removed old API rate limit code * IngestSendEvent rate limiter is optional. Moved outside of the DB transaction * Log a message out when the rate limiter is created * Return undefined if the rate limit has been crossed
1 parent 5fe23e4 commit bc7bbd4

File tree

7 files changed

+164
-80
lines changed

7 files changed

+164
-80
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ const EnvironmentSchema = z.object({
100100
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
101101
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
102102

103+
//Ingesting event rate limit
104+
INGEST_EVENT_RATE_LIMIT_WINDOW: z.string().default("60s"),
105+
INGEST_EVENT_RATE_LIMIT_MAX: z.coerce.number().int().optional(),
106+
103107
//v3
104108
V3_ENABLED: z.string().default("false"),
105109
PROVIDER_SECRET: z.string().default("provider-secret"),

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 15 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,21 @@
11
import { Ratelimit } from "@upstash/ratelimit";
22
import { Request as ExpressRequest, Response as ExpressResponse, NextFunction } from "express";
3-
import Redis, { RedisOptions } from "ioredis";
3+
import { RedisOptions } from "ioredis";
44
import { createHash } from "node:crypto";
55
import { env } from "~/env.server";
66
import { logger } from "./logger.server";
7-
8-
function createRedisRateLimitClient(
9-
redisOptions: RedisOptions
10-
): ConstructorParameters<typeof Ratelimit>[0]["redis"] {
11-
const redis = new Redis(redisOptions);
12-
13-
return {
14-
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
15-
return redis.sadd(key, members as (string | number | Buffer)[]);
16-
},
17-
eval: <TArgs extends unknown[], TData = unknown>(
18-
...args: [script: string, keys: string[], args: TArgs]
19-
): Promise<TData> => {
20-
const script = args[0];
21-
const keys = args[1];
22-
const argsArray = args[2];
23-
return redis.eval(
24-
script,
25-
keys.length,
26-
...keys,
27-
...(argsArray as (string | Buffer | number)[])
28-
) as Promise<TData>;
29-
},
30-
};
31-
}
7+
import { Duration, Limiter, RateLimiter, createRedisRateLimitClient } from "./rateLimiter.server";
328

339
type Options = {
10+
redis?: RedisOptions;
11+
keyPrefix: string;
12+
pathMatchers: (RegExp | string)[];
13+
pathWhiteList?: (RegExp | string)[];
14+
limiter: Limiter;
3415
log?: {
3516
requests?: boolean;
3617
rejections?: boolean;
3718
};
38-
redis: RedisOptions;
39-
keyPrefix: string;
40-
pathMatchers: (RegExp | string)[];
41-
pathWhiteList?: (RegExp | string)[];
42-
limiter: ConstructorParameters<typeof Ratelimit>[0]["limiter"];
4319
};
4420

4521
//returns an Express middleware that rate limits using the Bearer token in the Authorization header
@@ -54,12 +30,12 @@ export function authorizationRateLimitMiddleware({
5430
requests: true,
5531
},
5632
}: Options) {
57-
const rateLimiter = new Ratelimit({
58-
redis: createRedisRateLimitClient(redis),
59-
limiter: limiter,
60-
ephemeralCache: new Map(),
61-
analytics: false,
62-
prefix: keyPrefix,
33+
const rateLimiter = new RateLimiter({
34+
redis,
35+
keyPrefix,
36+
limiter,
37+
logSuccess: log.requests,
38+
logFailure: log.rejections,
6339
});
6440

6541
return async (req: ExpressRequest, res: ExpressResponse, next: NextFunction) => {
@@ -135,27 +111,9 @@ export function authorizationRateLimitMiddleware({
135111
res.set("x-ratelimit-reset", reset.toString());
136112

137113
if (success) {
138-
if (log.requests) {
139-
logger.info(`RateLimiter (${keyPrefix}): under rate limit`, {
140-
limit,
141-
reset,
142-
remaining,
143-
hashedAuthorizationValue,
144-
});
145-
}
146114
return next();
147115
}
148116

149-
if (log.rejections) {
150-
logger.warn(`RateLimiter (${keyPrefix}): rate limit exceeded`, {
151-
limit,
152-
reset,
153-
remaining,
154-
pending,
155-
hashedAuthorizationValue,
156-
});
157-
}
158-
159117
res.setHeader("Content-Type", "application/problem+json");
160118
const secondsUntilReset = Math.max(0, (reset - new Date().getTime()) / 1000);
161119
return res.status(429).send(
@@ -167,6 +125,7 @@ export function authorizationRateLimitMiddleware({
167125
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
168126
reset,
169127
limit,
128+
remaining,
170129
secondsUntilReset,
171130
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
172131
},
@@ -177,18 +136,8 @@ export function authorizationRateLimitMiddleware({
177136
};
178137
}
179138

180-
type Duration = Parameters<typeof Ratelimit.slidingWindow>[1];
181-
182139
export const apiRateLimiter = authorizationRateLimitMiddleware({
183-
keyPrefix: "ratelimit:api",
184-
redis: {
185-
port: env.REDIS_PORT,
186-
host: env.REDIS_HOST,
187-
username: env.REDIS_USERNAME,
188-
password: env.REDIS_PASSWORD,
189-
enableAutoPipelining: true,
190-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
191-
},
140+
keyPrefix: "api",
192141
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
193142
pathMatchers: [/^\/api/],
194143
// Allow /api/v1/tasks/:id/callback/:secret

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { workerQueue } from "~/services/worker.server";
55
import { logger } from "../logger.server";
66
import { EventRecord, ExternalAccount } from "@trigger.dev/database";
7+
import { Duration, RateLimiter } from "../rateLimiter.server";
8+
import { Ratelimit } from "@upstash/ratelimit";
9+
import { env } from "~/env.server";
710

811
type UpdateEventInput = {
912
tx: PrismaClientOrTransaction;
@@ -31,9 +34,19 @@ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000; // 5 seconds
3134

3235
export class IngestSendEvent {
3336
#prismaClient: PrismaClientOrTransaction;
37+
#rateLimiter: RateLimiter | undefined;
3438

3539
constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
3640
this.#prismaClient = prismaClient;
41+
this.#rateLimiter = env.INGEST_EVENT_RATE_LIMIT_MAX
42+
? new RateLimiter({
43+
keyPrefix: "ingestsendevent",
44+
limiter: Ratelimit.slidingWindow(
45+
env.INGEST_EVENT_RATE_LIMIT_MAX,
46+
env.INGEST_EVENT_RATE_LIMIT_WINDOW as Duration
47+
),
48+
})
49+
: undefined;
3750
}
3851

3952
#calculateDeliverAt(options?: SendEventOptions) {
@@ -65,7 +78,7 @@ export class IngestSendEvent {
6578
return;
6679
}
6780

68-
return await $transaction(this.#prismaClient, async (tx) => {
81+
const createdEvent = await $transaction(this.#prismaClient, async (tx) => {
6982
const externalAccount = options?.accountId
7083
? await tx.externalAccount.upsert({
7184
where: {
@@ -106,6 +119,23 @@ export class IngestSendEvent {
106119

107120
return eventLog;
108121
});
122+
123+
if (!createdEvent) return;
124+
125+
//rate limit
126+
const result = await this.#rateLimiter?.limit(environment.organizationId);
127+
if (result && !result.success) {
128+
logger.info("IngestSendEvent: Rate limit exceeded", {
129+
eventRecordId: createdEvent.id,
130+
organizationId: environment.organizationId,
131+
reset: result.reset,
132+
limit: result.limit,
133+
});
134+
return;
135+
}
136+
137+
await this.enqueueWorkerEvent(this.#prismaClient, createdEvent);
138+
return createdEvent;
109139
} catch (error) {
110140
const prismaError = PrismaErrorSchema.safeParse(error);
111141

@@ -151,8 +181,6 @@ export class IngestSendEvent {
151181
},
152182
});
153183

154-
await this.enqueueWorkerEvent(tx, eventLog);
155-
156184
return eventLog;
157185
}
158186

@@ -177,8 +205,6 @@ export class IngestSendEvent {
177205
},
178206
});
179207

180-
await this.enqueueWorkerEvent(tx, updatedEventLog);
181-
182208
return updatedEventLog;
183209
}
184210

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { Ratelimit } from "@upstash/ratelimit";
2+
import Redis, { RedisOptions } from "ioredis";
3+
import { env } from "~/env.server";
4+
import { logger } from "./logger.server";
5+
6+
type Options = {
7+
redis?: RedisOptions;
8+
keyPrefix: string;
9+
limiter: Limiter;
10+
logSuccess?: boolean;
11+
logFailure?: boolean;
12+
};
13+
14+
export type Limiter = ConstructorParameters<typeof Ratelimit>[0]["limiter"];
15+
export type Duration = Parameters<typeof Ratelimit.slidingWindow>[1];
16+
export type RateLimitResponse = Awaited<ReturnType<Ratelimit["limit"]>>;
17+
18+
export class RateLimiter {
19+
#ratelimit: Ratelimit;
20+
21+
constructor(private readonly options: Options) {
22+
const { redis, keyPrefix, limiter } = options;
23+
const prefix = `ratelimit:${keyPrefix}`;
24+
this.#ratelimit = new Ratelimit({
25+
redis: createRedisRateLimitClient(
26+
redis ?? {
27+
port: env.REDIS_PORT,
28+
host: env.REDIS_HOST,
29+
username: env.REDIS_USERNAME,
30+
password: env.REDIS_PASSWORD,
31+
enableAutoPipelining: true,
32+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
33+
}
34+
),
35+
limiter,
36+
ephemeralCache: new Map(),
37+
analytics: false,
38+
prefix,
39+
});
40+
41+
logger.info(`RateLimiter (${keyPrefix}): initialized`, {
42+
keyPrefix,
43+
redisKeyspace: prefix,
44+
});
45+
}
46+
47+
async limit(identifier: string, rate = 1): Promise<RateLimitResponse> {
48+
const result = this.#ratelimit.limit(identifier, { rate });
49+
const { success, limit, reset, remaining } = await result;
50+
51+
if (success && this.options.logSuccess) {
52+
logger.info(`RateLimiter (${this.options.keyPrefix}): under rate limit`, {
53+
limit,
54+
reset,
55+
remaining,
56+
identifier,
57+
});
58+
}
59+
60+
//log these by default
61+
if (!success && this.options.logFailure !== false) {
62+
logger.info(`RateLimiter (${this.options.keyPrefix}): rate limit exceeded`, {
63+
limit,
64+
reset,
65+
remaining,
66+
identifier,
67+
});
68+
}
69+
70+
return result;
71+
}
72+
}
73+
74+
export function createRedisRateLimitClient(
75+
redisOptions: RedisOptions
76+
): ConstructorParameters<typeof Ratelimit>[0]["redis"] {
77+
const redis = new Redis(redisOptions);
78+
79+
return {
80+
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
81+
return redis.sadd(key, members as (string | number | Buffer)[]);
82+
},
83+
hset: <TValue>(
84+
key: string,
85+
obj: {
86+
[key: string]: TValue;
87+
}
88+
): Promise<number> => {
89+
return redis.hset(key, obj);
90+
},
91+
eval: <TArgs extends unknown[], TData = unknown>(
92+
...args: [script: string, keys: string[], args: TArgs]
93+
): Promise<TData> => {
94+
const script = args[0];
95+
const keys = args[1];
96+
const argsArray = args[2];
97+
return redis.eval(
98+
script,
99+
keys.length,
100+
...keys,
101+
...(argsArray as (string | Buffer | number)[])
102+
) as Promise<TData>;
103+
},
104+
};
105+
}

apps/webapp/app/services/worker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ function getWorkerQueue() {
309309
},
310310
"events.deliverScheduled": {
311311
priority: 0, // smaller number = higher priority
312-
maxAttempts: 5,
312+
maxAttempts: 8,
313313
handler: async ({ id, payload }, job) => {
314314
const service = new DeliverScheduledEventService();
315315

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101
"@trigger.dev/yalt": "workspace:*",
102102
"@types/pg": "8.6.6",
103103
"@uiw/react-codemirror": "^4.19.5",
104-
"@upstash/ratelimit": "^1.0.1",
104+
"@upstash/ratelimit": "^1.1.3",
105105
"@whatwg-node/fetch": "^0.9.14",
106106
"assert-never": "^1.2.1",
107107
"aws4fetch": "^1.0.18",

pnpm-lock.yaml

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)