Skip to content

v2 IngestSendEvent rate limit #1134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ const EnvironmentSchema = z.object({
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),

//Ingesting event rate limit
INGEST_EVENT_RATE_LIMIT_WINDOW: z.string().default("60s"),
INGEST_EVENT_RATE_LIMIT_MAX: z.coerce.number().int().optional(),

//v3
V3_ENABLED: z.string().default("false"),
PROVIDER_SECRET: z.string().default("provider-secret"),
Expand Down
81 changes: 15 additions & 66 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,21 @@
import { Ratelimit } from "@upstash/ratelimit";
import { Request as ExpressRequest, Response as ExpressResponse, NextFunction } from "express";
import Redis, { RedisOptions } from "ioredis";
import { RedisOptions } from "ioredis";
import { createHash } from "node:crypto";
import { env } from "~/env.server";
import { logger } from "./logger.server";

function createRedisRateLimitClient(
redisOptions: RedisOptions
): ConstructorParameters<typeof Ratelimit>[0]["redis"] {
const redis = new Redis(redisOptions);

return {
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
return redis.sadd(key, members as (string | number | Buffer)[]);
},
eval: <TArgs extends unknown[], TData = unknown>(
...args: [script: string, keys: string[], args: TArgs]
): Promise<TData> => {
const script = args[0];
const keys = args[1];
const argsArray = args[2];
return redis.eval(
script,
keys.length,
...keys,
...(argsArray as (string | Buffer | number)[])
) as Promise<TData>;
},
};
}
import { Duration, Limiter, RateLimiter, createRedisRateLimitClient } from "./rateLimiter.server";

type Options = {
redis?: RedisOptions;
keyPrefix: string;
pathMatchers: (RegExp | string)[];
pathWhiteList?: (RegExp | string)[];
limiter: Limiter;
log?: {
requests?: boolean;
rejections?: boolean;
};
redis: RedisOptions;
keyPrefix: string;
pathMatchers: (RegExp | string)[];
pathWhiteList?: (RegExp | string)[];
limiter: ConstructorParameters<typeof Ratelimit>[0]["limiter"];
};

//returns an Express middleware that rate limits using the Bearer token in the Authorization header
Expand All @@ -54,12 +30,12 @@ export function authorizationRateLimitMiddleware({
requests: true,
},
}: Options) {
const rateLimiter = new Ratelimit({
redis: createRedisRateLimitClient(redis),
limiter: limiter,
ephemeralCache: new Map(),
analytics: false,
prefix: keyPrefix,
const rateLimiter = new RateLimiter({
redis,
keyPrefix,
limiter,
logSuccess: log.requests,
logFailure: log.rejections,
});

return async (req: ExpressRequest, res: ExpressResponse, next: NextFunction) => {
Expand Down Expand Up @@ -135,27 +111,9 @@ export function authorizationRateLimitMiddleware({
res.set("x-ratelimit-reset", reset.toString());

if (success) {
if (log.requests) {
logger.info(`RateLimiter (${keyPrefix}): under rate limit`, {
limit,
reset,
remaining,
hashedAuthorizationValue,
});
}
return next();
}

if (log.rejections) {
logger.warn(`RateLimiter (${keyPrefix}): rate limit exceeded`, {
limit,
reset,
remaining,
pending,
hashedAuthorizationValue,
});
}

res.setHeader("Content-Type", "application/problem+json");
const secondsUntilReset = Math.max(0, (reset - new Date().getTime()) / 1000);
return res.status(429).send(
Expand All @@ -167,6 +125,7 @@ export function authorizationRateLimitMiddleware({
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
reset,
limit,
remaining,
secondsUntilReset,
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
},
Expand All @@ -177,18 +136,8 @@ export function authorizationRateLimitMiddleware({
};
}

type Duration = Parameters<typeof Ratelimit.slidingWindow>[1];

export const apiRateLimiter = authorizationRateLimitMiddleware({
keyPrefix: "ratelimit:api",
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
keyPrefix: "api",
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
pathMatchers: [/^\/api/],
// Allow /api/v1/tasks/:id/callback/:secret
Expand Down
36 changes: 31 additions & 5 deletions apps/webapp/app/services/events/ingestSendEvent.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { workerQueue } from "~/services/worker.server";
import { logger } from "../logger.server";
import { EventRecord, ExternalAccount } from "@trigger.dev/database";
import { Duration, RateLimiter } from "../rateLimiter.server";
import { Ratelimit } from "@upstash/ratelimit";
import { env } from "~/env.server";

type UpdateEventInput = {
tx: PrismaClientOrTransaction;
Expand Down Expand Up @@ -31,9 +34,19 @@ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000; // 5 seconds

export class IngestSendEvent {
#prismaClient: PrismaClientOrTransaction;
#rateLimiter: RateLimiter | undefined;

constructor(prismaClient: PrismaClientOrTransaction = prisma, private deliverEvents = true) {
this.#prismaClient = prismaClient;
this.#rateLimiter = env.INGEST_EVENT_RATE_LIMIT_MAX
? new RateLimiter({
keyPrefix: "ingestsendevent",
limiter: Ratelimit.slidingWindow(
env.INGEST_EVENT_RATE_LIMIT_MAX,
env.INGEST_EVENT_RATE_LIMIT_WINDOW as Duration
),
})
: undefined;
}

#calculateDeliverAt(options?: SendEventOptions) {
Expand Down Expand Up @@ -65,7 +78,7 @@ export class IngestSendEvent {
return;
}

return await $transaction(this.#prismaClient, async (tx) => {
const createdEvent = await $transaction(this.#prismaClient, async (tx) => {
const externalAccount = options?.accountId
? await tx.externalAccount.upsert({
where: {
Expand Down Expand Up @@ -106,6 +119,23 @@ export class IngestSendEvent {

return eventLog;
});

if (!createdEvent) return;

//rate limit
const result = await this.#rateLimiter?.limit(environment.organizationId);
if (result && !result.success) {
logger.info("IngestSendEvent: Rate limit exceeded", {
eventRecordId: createdEvent.id,
organizationId: environment.organizationId,
reset: result.reset,
limit: result.limit,
});
return;
}

await this.enqueueWorkerEvent(this.#prismaClient, createdEvent);
return createdEvent;
} catch (error) {
const prismaError = PrismaErrorSchema.safeParse(error);

Expand Down Expand Up @@ -151,8 +181,6 @@ export class IngestSendEvent {
},
});

await this.enqueueWorkerEvent(tx, eventLog);

return eventLog;
}

Expand All @@ -177,8 +205,6 @@ export class IngestSendEvent {
},
});

await this.enqueueWorkerEvent(tx, updatedEventLog);

return updatedEventLog;
}

Expand Down
105 changes: 105 additions & 0 deletions apps/webapp/app/services/rateLimiter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { Ratelimit } from "@upstash/ratelimit";
import Redis, { RedisOptions } from "ioredis";
import { env } from "~/env.server";
import { logger } from "./logger.server";

type Options = {
redis?: RedisOptions;
keyPrefix: string;
limiter: Limiter;
logSuccess?: boolean;
logFailure?: boolean;
};

export type Limiter = ConstructorParameters<typeof Ratelimit>[0]["limiter"];
export type Duration = Parameters<typeof Ratelimit.slidingWindow>[1];
export type RateLimitResponse = Awaited<ReturnType<Ratelimit["limit"]>>;

export class RateLimiter {
#ratelimit: Ratelimit;

constructor(private readonly options: Options) {
const { redis, keyPrefix, limiter } = options;
const prefix = `ratelimit:${keyPrefix}`;
this.#ratelimit = new Ratelimit({
redis: createRedisRateLimitClient(
redis ?? {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
}
),
limiter,
ephemeralCache: new Map(),
analytics: false,
prefix,
});

logger.info(`RateLimiter (${keyPrefix}): initialized`, {
keyPrefix,
redisKeyspace: prefix,
});
}

async limit(identifier: string, rate = 1): Promise<RateLimitResponse> {
const result = this.#ratelimit.limit(identifier, { rate });
const { success, limit, reset, remaining } = await result;

if (success && this.options.logSuccess) {
logger.info(`RateLimiter (${this.options.keyPrefix}): under rate limit`, {
limit,
reset,
remaining,
identifier,
});
}

//log these by default
if (!success && this.options.logFailure !== false) {
logger.info(`RateLimiter (${this.options.keyPrefix}): rate limit exceeded`, {
limit,
reset,
remaining,
identifier,
});
}

return result;
}
}

export function createRedisRateLimitClient(
redisOptions: RedisOptions
): ConstructorParameters<typeof Ratelimit>[0]["redis"] {
const redis = new Redis(redisOptions);

return {
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
return redis.sadd(key, members as (string | number | Buffer)[]);
},
hset: <TValue>(
key: string,
obj: {
[key: string]: TValue;
}
): Promise<number> => {
return redis.hset(key, obj);
},
eval: <TArgs extends unknown[], TData = unknown>(
...args: [script: string, keys: string[], args: TArgs]
): Promise<TData> => {
const script = args[0];
const keys = args[1];
const argsArray = args[2];
return redis.eval(
script,
keys.length,
...keys,
...(argsArray as (string | Buffer | number)[])
) as Promise<TData>;
},
};
}
2 changes: 1 addition & 1 deletion apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ function getWorkerQueue() {
},
"events.deliverScheduled": {
priority: 0, // smaller number = higher priority
maxAttempts: 5,
maxAttempts: 8,
handler: async ({ id, payload }, job) => {
const service = new DeliverScheduledEventService();

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
"@trigger.dev/yalt": "workspace:*",
"@types/pg": "8.6.6",
"@uiw/react-codemirror": "^4.19.5",
"@upstash/ratelimit": "^1.0.1",
"@upstash/ratelimit": "^1.1.3",
"@whatwg-node/fetch": "^0.9.14",
"assert-never": "^1.2.1",
"aws4fetch": "^1.0.18",
Expand Down
14 changes: 7 additions & 7 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading