Skip to content

Commit 93516c9

Browse files
committed
Added rate limiting to IngestSendEvent
1 parent a2993e3 commit 93516c9

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ const EnvironmentSchema = z.object({
100100
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
101101
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
102102

103+
//Ingesting event rate limit
104+
INGEST_EVENT_RATE_LIMIT_WINDOW: z.string().default("60s"),
105+
INGEST_EVENT_RATE_LIMIT_MAX: z.coerce.number().int().default(1_000),
106+
103107
//v3
104108
V3_ENABLED: z.string().default("false"),
105109
PROVIDER_SECRET: z.string().default("provider-secret"),

apps/webapp/app/services/events/ingestSendEvent.server.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { workerQueue } from "~/services/worker.server";
55
import { logger } from "../logger.server";
66
import { EventRecord, ExternalAccount } from "@trigger.dev/database";
7+
import { Duration, RateLimiter } from "../rateLimiter.server";
8+
import { Ratelimit } from "@upstash/ratelimit";
9+
import { env } from "~/env.server";
710

811
type UpdateEventInput = {
912
tx: PrismaClientOrTransaction;
@@ -31,9 +34,17 @@ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000; // 5 seconds
3134

3235
export class IngestSendEvent {
3336
#prismaClient: PrismaClientOrTransaction;
37+
#rateLimiter: RateLimiter;
3438

3539
constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
3640
this.#prismaClient = prismaClient;
41+
this.#rateLimiter = new RateLimiter({
42+
keyPrefix: "ingestsendevent",
43+
limiter: Ratelimit.slidingWindow(
44+
env.INGEST_EVENT_RATE_LIMIT_MAX,
45+
env.INGEST_EVENT_RATE_LIMIT_WINDOW as Duration
46+
),
47+
});
3748
}
3849

3950
#calculateDeliverAt(options?: SendEventOptions) {
@@ -104,6 +115,20 @@ export class IngestSendEvent {
104115
eventSource,
105116
}));
106117

118+
//rate limit
119+
const { success, reset, limit } = await this.#rateLimiter.limit(environment.organizationId);
120+
121+
if (success) {
122+
await this.enqueueWorkerEvent(tx, eventLog);
123+
} else {
124+
logger.info("IngestSendEvent: Rate limit exceeded", {
125+
organizationId: environment.organizationId,
126+
reset,
127+
limit,
128+
});
129+
return;
130+
}
131+
107132
return eventLog;
108133
});
109134
} catch (error) {
@@ -151,8 +176,6 @@ export class IngestSendEvent {
151176
},
152177
});
153178

154-
await this.enqueueWorkerEvent(tx, eventLog);
155-
156179
return eventLog;
157180
}
158181

@@ -177,8 +200,6 @@ export class IngestSendEvent {
177200
},
178201
});
179202

180-
await this.enqueueWorkerEvent(tx, updatedEventLog);
181-
182203
return updatedEventLog;
183204
}
184205

0 commit comments

Comments
 (0)