Skip to content

Support for logically separated redis instances #1647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,93 @@ const EnvironmentSchema = z.object({
REDIS_PASSWORD: z.string().optional(),
REDIS_TLS_DISABLED: z.string().optional(),

RATE_LIMIT_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
RATE_LIMIT_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
RATE_LIMIT_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
RATE_LIMIT_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
RATE_LIMIT_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
RATE_LIMIT_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
RATE_LIMIT_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),

CACHE_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
CACHE_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
CACHE_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
CACHE_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
CACHE_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
CACHE_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),

PUBSUB_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
PUBSUB_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
PUBSUB_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
PUBSUB_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
PUBSUB_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
PUBSUB_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),

DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS: z.coerce.number().int().positive().default(1),
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import { Duration } from "./rateLimiter.server";

export const apiRateLimiter = authorizationRateLimitMiddleware({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.RATE_LIMIT_REDIS_PORT,
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
keyPrefix: "api",
defaultLimiter: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ export function authorizationRateLimitMiddleware({

const redisClient = createRedisRateLimitClient(
redis ?? {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.RATE_LIMIT_REDIS_PORT,
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
}
);

Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/services/platform.v3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ function initializePlatformCache() {
const redisCacheStore = new RedisCacheStore({
connection: {
keyPrefix: "tr:cache:platform:v3",
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.CACHE_REDIS_PORT,
host: env.CACHE_REDIS_HOST,
username: env.CACHE_REDIS_USERNAME,
password: env.CACHE_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
});

Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/services/rateLimiter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ export class RateLimiter {
redisClient ??
createRedisRateLimitClient(
redis ?? {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.RATE_LIMIT_REDIS_PORT,
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
}
),
limiter,
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/services/realtimeClientGlobal.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ function initializeRealtimeClient() {
electricOrigin: env.ELECTRIC_ORIGIN,
keyPrefix: "tr:realtime:concurrency",
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.RATE_LIMIT_REDIS_PORT,
host: env.RATE_LIMIT_REDIS_HOST,
username: env.RATE_LIMIT_REDIS_USERNAME,
password: env.RATE_LIMIT_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.RATE_LIMIT_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
cachedLimitProvider: {
async getCachedLimit(organizationId, defaultValue) {
Expand Down
33 changes: 17 additions & 16 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -991,26 +991,24 @@ export class EventRepository {
async subscribeToTrace(traceId: string) {
const redis = new Redis(this._config.redis);

const channel = `events:${traceId}:*`;
const channel = `events:${traceId}`;

// Subscribe to the channel.
await redis.psubscribe(channel);
await redis.subscribe(channel);

// Increment the subscriber count.
this._subscriberCount++;

const eventEmitter = new EventEmitter();

// Define the message handler.
redis.on("pmessage", (pattern, channelReceived, message) => {
if (channelReceived.startsWith(`events:${traceId}:`)) {
eventEmitter.emit("message", message);
}
redis.on("message", (_, message) => {
eventEmitter.emit("message", message);
});

// Return a function that can be used to unsubscribe.
const unsubscribe = async () => {
await redis.punsubscribe(channel);
await redis.unsubscribe(channel);
redis.quit();
this._subscriberCount--;
};
Expand Down Expand Up @@ -1101,10 +1099,13 @@ export class EventRepository {

async #publishToRedis(events: CreatableEvent[]) {
if (events.length === 0) return;
const uniqueTraceSpans = new Set(events.map((e) => `events:${e.traceId}:${e.spanId}`));
for (const id of uniqueTraceSpans) {
await this._redisPublishClient.publish(id, new Date().toISOString());
}
const uniqueTraces = new Set(events.map((e) => `events:${e.traceId}`));

await Promise.allSettled(
Array.from(uniqueTraces).map((traceId) =>
this._redisPublishClient.publish(traceId, new Date().toISOString())
)
);
}

public generateTraceId() {
Expand Down Expand Up @@ -1142,12 +1143,12 @@ function initializeEventRepo() {
batchInterval: env.EVENTS_BATCH_INTERVAL,
retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.PUBSUB_REDIS_PORT,
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
});

Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/v3/marqs/devPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ export const devPubSub = singleton("devPubSub", initializeDevPubSub);
function initializeDevPubSub() {
const pubSub = new ZodPubSub({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.PUBSUB_REDIS_PORT,
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
schema: messageCatalog,
});
Expand Down
33 changes: 0 additions & 33 deletions apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export type FairDequeuingStrategyOptions = {
defaultOrgConcurrency: number;
defaultEnvConcurrency: number;
parentQueueLimit: number;
checkForDisabledOrgs: boolean;
tracer: Tracer;
seed?: string;
/**
Expand Down Expand Up @@ -88,7 +87,6 @@ const defaultBiases: FairDequeuingStrategyBiases = {
export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
private _cache: UnkeyCache<{
concurrencyLimit: number;
disabledConcurrency: boolean;
}>;

private _rng: seedrandom.PRNG;
Expand All @@ -107,11 +105,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value.
stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh.
}),
disabledConcurrency: new Namespace<boolean>(ctx, {
stores: [memory],
fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value.
stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh.
}),
});

this._rng = seedrandom(options.seed);
Expand Down Expand Up @@ -512,16 +505,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => {
span.setAttribute("org_id", orgId);

if (this.options.checkForDisabledOrgs) {
const isDisabled = await this.#getConcurrencyDisabled(orgId);

if (isDisabled) {
span.setAttribute("disabled", true);

return { current: 0, limit: 0 };
}
}

const [currentValue, limitValue] = await Promise.all([
this.#getOrgCurrentConcurrency(orgId),
this.#getOrgConcurrencyLimit(orgId),
Expand Down Expand Up @@ -587,22 +570,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
});
}

async #getConcurrencyDisabled(orgId: string) {
return await startSpan(this.options.tracer, "getConcurrencyDisabled", async (span) => {
span.setAttribute("org_id", orgId);

const key = this.options.keys.disabledConcurrencyLimitKey(orgId);

const result = await this._cache.disabledConcurrency.swr(key, async () => {
const value = await this.options.redis.exists(key);

return Boolean(value);
});

return typeof result.val === "boolean" ? result.val : false;
});
}

async #getOrgConcurrencyLimit(orgId: string) {
return await startSpan(this.options.tracer, "getOrgConcurrencyLimit", async (span) => {
span.setAttribute("org_id", orgId);
Expand Down
2 changes: 0 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,6 @@ function getMarQSClient() {
keys: keysProducer,
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
checkForDisabledOrgs: true,
biases: {
concurrencyLimitBias: env.MARQS_CONCURRENCY_LIMIT_BIAS,
availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS,
Expand All @@ -1635,7 +1634,6 @@ function getMarQSClient() {
keys: keysProducer,
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
checkForDisabledOrgs: false,
biases: {
concurrencyLimitBias: 0.0,
availableCapacityBias: 0.0,
Expand Down
1 change: 0 additions & 1 deletion apps/webapp/app/v3/marqs/v2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ function getMarQSClient() {
keys: new MarQSV2KeyProducer(KEY_PREFIX),
defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY,
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
checkForDisabledOrgs: true,
}),
envQueuePriorityStrategy: new NoopFairDequeuingStrategy(), // We don't use this in v2, since all queues go through the shared queue
workers: 0,
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/v3/services/projectPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ export const projectPubSub = singleton("projectPubSub", initializeProjectPubSub)
function initializeProjectPubSub() {
const pubSub = new ZodPubSub({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
port: env.PUBSUB_REDIS_PORT,
host: env.PUBSUB_REDIS_HOST,
username: env.PUBSUB_REDIS_USERNAME,
password: env.PUBSUB_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
...(env.PUBSUB_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
schema: messageCatalog,
});
Expand Down
Loading