Skip to content

Support redis/valkey cluster mode #1650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const EnvironmentSchema = z.object({
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
RATE_LIMIT_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

CACHE_REDIS_HOST: z
.string()
Expand Down Expand Up @@ -148,6 +149,7 @@ const EnvironmentSchema = z.object({
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

PUBSUB_REDIS_HOST: z
.string()
Expand Down Expand Up @@ -177,6 +179,7 @@ const EnvironmentSchema = z.object({
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
Expand Down
63 changes: 63 additions & 0 deletions apps/webapp/app/redis.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { Cluster, Redis, type ClusterNode, type ClusterOptions } from "ioredis";
import { logger } from "./services/logger.server";

export type RedisWithClusterOptions = {
host?: string;
port?: number;
username?: string;
password?: string;
tlsDisabled?: boolean;
clusterMode?: boolean;
clusterOptions?: Omit<ClusterOptions, "redisOptions">;
keyPrefix?: string;
};

export type RedisClient = Redis | Cluster;

export function createRedisClient(
connectionName: string,
options: RedisWithClusterOptions
): Redis | Cluster {
if (options.clusterMode) {
const nodes: ClusterNode[] = [
{
host: options.host,
port: options.port,
},
];
Comment on lines +22 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve cluster nodes configuration.

The cluster configuration assumes a single node, which doesn't fully utilize Redis cluster capabilities. Consider accepting multiple nodes through the options.

Enhance the cluster configuration:

-    const nodes: ClusterNode[] = [
-      {
-        host: options.host,
-        port: options.port,
-      },
-    ];
+    const nodes: ClusterNode[] = options.clusterOptions?.nodes ?? [
+      {
+        host: options.host,
+        port: options.port,
+      },
+    ];
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const nodes: ClusterNode[] = [
{
host: options.host,
port: options.port,
},
];
const nodes: ClusterNode[] = options.clusterOptions?.nodes ?? [
{
host: options.host,
port: options.port,
},
];


logger.debug("Creating a redis cluster client", {
connectionName,
host: options.host,
port: options.port,
});

return new Redis.Cluster(nodes, {
...options.clusterOptions,
redisOptions: {
connectionName,
keyPrefix: options.keyPrefix,
username: options.username,
password: options.password,
enableAutoPipelining: true,
...(options.tlsDisabled ? {} : { tls: {} }),
},
});
} else {
logger.debug("Creating a redis client", {
connectionName,
host: options.host,
port: options.port,
});

return new Redis({
connectionName,
host: options.host,
port: options.port,
username: options.username,
password: options.password,
enableAutoPipelining: true,
...(options.tlsDisabled ? {} : { tls: {} }),
});
}
}
4 changes: 2 additions & 2 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
},
keyPrefix: "api",
defaultLimiter: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { env } from "~/env.server";
import { logger } from "./logger.server";
import { createRedisRateLimitClient, Duration, RateLimiter } from "./rateLimiter.server";
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
import { RedisWithClusterOptions } from "~/redis.server";

const DurationSchema = z.custom<Duration>((value) => {
if (typeof value !== "string") {
Expand Down Expand Up @@ -54,7 +55,7 @@ export type RateLimiterConfig = z.infer<typeof RateLimiterConfig>;
type LimitConfigOverrideFunction = (authorizationValue: string) => Promise<unknown>;

type Options = {
redis?: RedisOptions;
redis?: RedisWithClusterOptions;
keyPrefix: string;
pathMatchers: (RegExp | string)[];
pathWhiteList?: (RegExp | string)[];
Expand Down Expand Up @@ -167,8 +168,8 @@ export function authorizationRateLimitMiddleware({
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
}
);

Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/services/platform.v3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ function initializePlatformCache() {
host: env.CACHE_REDIS_HOST,
username: env.CACHE_REDIS_USERNAME,
password: env.CACHE_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.CACHE_REDIS_TLS_DISABLED === "true",
clusterMode: env.CACHE_REDIS_CLUSTER_MODE_ENABLED === "1",
},
});

Expand Down
13 changes: 8 additions & 5 deletions apps/webapp/app/services/rateLimiter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Ratelimit } from "@upstash/ratelimit";
import Redis, { RedisOptions } from "ioredis";
import { RedisOptions } from "ioredis";
import { env } from "~/env.server";
import { createRedisClient, RedisWithClusterOptions } from "~/redis.server";
import { logger } from "./logger.server";

type Options = {
Expand Down Expand Up @@ -32,8 +33,8 @@ export class RateLimiter {
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
}
),
limiter,
Expand Down Expand Up @@ -70,8 +71,10 @@ export class RateLimiter {
}
}

export function createRedisRateLimitClient(redisOptions: RedisOptions): RateLimiterRedisClient {
const redis = new Redis(redisOptions);
export function createRedisRateLimitClient(
redisOptions: RedisWithClusterOptions
): RateLimiterRedisClient {
const redis = createRedisClient("trigger:rateLimiter", redisOptions);

return {
sadd: async <TData>(key: string, ...members: TData[]): Promise<number> => {
Expand Down
7 changes: 4 additions & 3 deletions apps/webapp/app/services/realtimeClient.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import Redis, { Callback, Result, type RedisOptions } from "ioredis";
import { randomUUID } from "node:crypto";
import { longPollingFetch } from "~/utils/longPollingFetch";
import { logger } from "./logger.server";
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";

export interface CachedLimitProvider {
getCachedLimit: (organizationId: string, defaultValue: number) => Promise<number | undefined>;
}

export type RealtimeClientOptions = {
electricOrigin: string;
redis: RedisOptions;
redis: RedisWithClusterOptions;
cachedLimitProvider: CachedLimitProvider;
keyPrefix: string;
expiryTimeInSeconds?: number;
Expand All @@ -26,12 +27,12 @@ export type RealtimeRunsParams = {
};

export class RealtimeClient {
private redis: Redis;
private redis: RedisClient;
private expiryTimeInSeconds: number;
private cachedLimitProvider: CachedLimitProvider;

constructor(private options: RealtimeClientOptions) {
this.redis = new Redis(options.redis);
this.redis = createRedisClient("trigger:realtime", options.redis);
this.expiryTimeInSeconds = options.expiryTimeInSeconds ?? 60 * 5; // default to 5 minutes
this.cachedLimitProvider = options.cachedLimitProvider;
this.#registerCommands();
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/services/realtimeClientGlobal.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ function initializeRealtimeClient() {
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
Comment on lines +15 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Standardize environment variable checks across the codebase.

The environment variable checks use inconsistent patterns:

  • RATE_LIMIT_REDIS_TLS_DISABLED === "true"
  • RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1"

This inconsistency could lead to confusion and potential errors.

Standardize the checks to use the same pattern across all Redis configuration files:

-      tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
-      clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
+      tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "1",
+      clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "true",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",
tlsDisabled: env.RATE_LIMIT_REDIS_TLS_DISABLED === "1",
clusterMode: env.RATE_LIMIT_REDIS_CLUSTER_MODE_ENABLED === "1",

},
cachedLimitProvider: {
async getCachedLimit(organizationId, defaultValue) {
Expand Down
13 changes: 6 additions & 7 deletions apps/webapp/app/services/unkey/redisCacheStore.server.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { Err, Ok, type Result } from "@unkey/error";
import type { Entry, Store } from "@unkey/cache/stores";
import type { RedisOptions } from "ioredis";
import { Redis } from "ioredis";
import { CacheError } from "@unkey/cache";
import type { Entry, Store } from "@unkey/cache/stores";
import { Err, Ok, type Result } from "@unkey/error";
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";

export type RedisCacheStoreConfig = {
connection: RedisOptions;
connection: RedisWithClusterOptions;
};

export class RedisCacheStore<TNamespace extends string, TValue = any>
implements Store<TNamespace, TValue>
{
public readonly name = "redis";
private readonly redis: Redis;
private readonly redis: RedisClient;

constructor(config: RedisCacheStoreConfig) {
this.redis = new Redis(config.connection);
this.redis = createRedisClient("trigger:cacheStore", config.connection);
}

private buildCacheKey(namespace: TNamespace, key: string): string {
Expand Down
14 changes: 7 additions & 7 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
unflattenAttributes,
} from "@trigger.dev/core/v3";
import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database";
import Redis, { RedisOptions } from "ioredis";
import { createHash } from "node:crypto";
import { EventEmitter } from "node:stream";
import { Gauge } from "prom-client";
Expand All @@ -32,6 +31,7 @@ import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
import { startActiveSpan } from "./tracer.server";
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";

const MAX_FLUSH_DEPTH = 5;

Expand Down Expand Up @@ -97,7 +97,7 @@ export type EventBuilder = {
export type EventRepoConfig = {
batchSize: number;
batchInterval: number;
redis: RedisOptions;
redis: RedisWithClusterOptions;
retentionInDays: number;
};

Expand Down Expand Up @@ -200,7 +200,7 @@ type TaskEventSummary = Pick<
export class EventRepository {
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
private _randomIdGenerator = new RandomIdGenerator();
private _redisPublishClient: Redis;
private _redisPublishClient: RedisClient;
private _subscriberCount = 0;

get subscriberCount() {
Expand All @@ -218,7 +218,7 @@ export class EventRepository {
callback: this.#flushBatch.bind(this),
});

this._redisPublishClient = new Redis(this._config.redis);
this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
}

async insert(event: CreatableEvent) {
Expand Down Expand Up @@ -989,7 +989,7 @@ export class EventRepository {
}

async subscribeToTrace(traceId: string) {
const redis = new Redis(this._config.redis);
const redis = createRedisClient("trigger:eventRepoSubscriber", this._config.redis);

const channel = `events:${traceId}`;

Expand Down Expand Up @@ -1147,8 +1147,8 @@ function initializeEventRepo() {
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
},
});

Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/marqs/devPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ function initializeDevPubSub() {
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
Comment on lines +28 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Standardize environment variable checks (same issue as in realtimeClientGlobal.server.ts).

The environment variable checks use inconsistent patterns:

  • PUBSUB_REDIS_TLS_DISABLED === "true"
  • PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1"

Standardize the checks:

-      tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
-      clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
+      tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "1",
+      clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "1",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",

},
schema: messageCatalog,
});
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/projectPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ function initializeProjectPubSub() {
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
Comment on lines +29 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Standardize environment variable checks (same issue as in other PubSub services).

The environment variable checks use inconsistent patterns:

  • PUBSUB_REDIS_TLS_DISABLED === "true"
  • PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1"

Standardize the checks:

-      tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
-      clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
+      tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "1",
+      clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "true",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
tlsDisabled: env.PUBSUB_REDIS_TLS_DISABLED === "1",
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",

},
schema: messageCatalog,
});
Expand Down
12 changes: 6 additions & 6 deletions apps/webapp/app/v3/utils/zodPubSub.server.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Logger } from "@trigger.dev/core/logger";
import { ZodMessageCatalogSchema, ZodMessageHandler } from "@trigger.dev/core/v3/zodMessageHandler";
import { Evt } from "evt";
import Redis, { RedisOptions } from "ioredis";
import { z } from "zod";
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
import { logger } from "~/services/logger.server";
import { safeJsonParse } from "~/utils/json";

export type ZodPubSubOptions<TMessageCatalog extends ZodMessageCatalogSchema> = {
redis: RedisOptions;
redis: RedisWithClusterOptions;
schema: TMessageCatalog;
};

Expand All @@ -23,7 +23,7 @@ export interface ZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
implements ZodSubscriber<TMessageCatalog>
{
private _subscriber: Redis;
private _subscriber: RedisClient;
private _listeners: Map<string, (payload: unknown) => Promise<void>> = new Map();
private _messageHandler: ZodMessageHandler<TMessageCatalog>;

Expand All @@ -36,7 +36,7 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
private readonly _options: ZodPubSubOptions<TMessageCatalog>,
private readonly _logger: Logger
) {
this._subscriber = new Redis(_options.redis);
this._subscriber = createRedisClient("trigger:zodSubscriber", _options.redis);
this._messageHandler = new ZodMessageHandler({
schema: _options.schema,
logger: this._logger,
Expand Down Expand Up @@ -104,7 +104,7 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
}

export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
private _publisher: Redis;
private _publisher: RedisClient;
private _logger = logger.child({ module: "ZodPubSub" });
private _subscriberCount = 0;

Expand All @@ -113,7 +113,7 @@ export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
}

constructor(private _options: ZodPubSubOptions<TMessageCatalog>) {
this._publisher = new Redis(_options.redis);
this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis);
}

public async publish<K extends keyof TMessageCatalog>(
Expand Down
Loading
Loading