Skip to content

Commit 6db88f2

Browse files
committed
more wip
1 parent 0e1c8ff commit 6db88f2

32 files changed

+124
-100
lines changed

internal-packages/redis-worker/package.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@
66
"types": "./src/index.ts",
77
"type": "module",
88
"dependencies": {
9-
"@opentelemetry/api": "^1.9.0",
9+
"@internal/tracing": "workspace:*",
1010
"@internal/redis": "workspace:*",
1111
"@trigger.dev/core": "workspace:*",
12-
"ioredis": "^5.3.2",
1312
"lodash.omit": "^4.5.0",
1413
"nanoid": "^5.0.7",
1514
"p-limit": "^6.2.0",
@@ -24,4 +23,4 @@
2423
"typecheck": "tsc --noEmit",
2524
"test": "vitest --no-file-parallelism"
2625
}
27-
}
26+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export * from "./queue";
2-
export * from "./worker";
1+
export * from "./queue.js";
2+
export * from "./worker.js";

internal-packages/redis-worker/src/queue.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1-
import { createRedisClient } from "@internal/redis";
1+
import {
2+
createRedisClient,
3+
type Redis,
4+
type Callback,
5+
type RedisOptions,
6+
type Result,
7+
} from "@internal/redis";
28
import { Logger } from "@trigger.dev/core/logger";
3-
import Redis, { type Callback, type RedisOptions, type Result } from "ioredis";
49
import { nanoid } from "nanoid";
510
import { z } from "zod";
611

@@ -436,7 +441,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
436441
}
437442
}
438443

439-
declare module "ioredis" {
444+
declare module "@internal/redis" {
440445
interface RedisCommander<Context> {
441446
enqueueItem(
442447
//keys

internal-packages/redis-worker/src/telemetry.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

internal-packages/redis-worker/src/worker.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { describe } from "node:test";
44
import { expect } from "vitest";
55
import { z } from "zod";
66
import { Worker } from "./worker.js";
7-
import Redis from "ioredis";
87
import { createRedisClient } from "@internal/redis";
98

109
describe("Worker", () => {

internal-packages/redis-worker/src/worker.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
import { SpanKind, trace, Tracer } from "@opentelemetry/api";
1+
import { SpanKind, startSpan, trace, Tracer } from "@internal/tracing";
22
import { Logger } from "@trigger.dev/core/logger";
33
import { calculateNextRetryDelay } from "@trigger.dev/core/v3";
44
import { type RetryOptions } from "@trigger.dev/core/v3/schemas";
5-
import { type RedisOptions } from "ioredis";
5+
import { Redis, type RedisOptions } from "@internal/redis";
66
import { z } from "zod";
77
import { AnyQueueItem, SimpleQueue } from "./queue.js";
8-
import Redis from "ioredis";
98
import { nanoid } from "nanoid";
10-
import { startSpan } from "./telemetry.js";
119
import pLimit from "p-limit";
1210
import { createRedisClient } from "@internal/redis";
1311

internal-packages/redis-worker/tsconfig.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"compilerOptions": {
33
"target": "ES2019",
44
"lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
5-
"module": "CommonJS",
6-
"moduleResolution": "Node",
5+
"module": "Node16",
6+
"moduleResolution": "Node16",
77
"moduleDetection": "force",
88
"verbatimModuleSyntax": false,
99
"types": ["vitest/globals"],

internal-packages/redis/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Redis, RedisOptions } from "ioredis";
22
import { Logger } from "@trigger.dev/core/logger";
33

4-
export { Redis, type Callback, type RedisOptions, type Result } from "ioredis";
4+
export { Redis, type Callback, type RedisOptions, type Result, type RedisCommander } from "ioredis";
55

66
const defaultOptions: Partial<RedisOptions> = {
77
retryStrategy: (times: number) => {

internal-packages/run-engine/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
"@trigger.dev/core": "workspace:*",
1313
"@trigger.dev/database": "workspace:*",
1414
"assert-never": "^1.2.1",
15-
"ioredis": "^5.3.2",
1615
"nanoid": "^3.3.4",
1716
"redlock": "5.0.0-beta.2",
1817
"zod": "3.23.8",

internal-packages/run-engine/src/engine/eventBus.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { TaskRunExecutionStatus, TaskRunStatus } from "@trigger.dev/database";
2-
import { AuthenticatedEnvironment } from "../shared";
2+
import { AuthenticatedEnvironment } from "../shared/index.js";
33
import { FlushedRunMetadata, TaskRunError } from "@trigger.dev/core/v3";
44

55
export type EventBusEvents = {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { createRedisClient } from "@internal/redis";
1+
import { createRedisClient, Redis } from "@internal/redis";
22
import { Worker } from "@internal/redis-worker";
3-
import { Attributes, Span, SpanKind, trace, Tracer } from "@opentelemetry/api";
3+
import { Attributes, Span, SpanKind, trace, Tracer } from "@internal/tracing";
44
import { assertExhaustive } from "@trigger.dev/core";
55
import { Logger } from "@trigger.dev/core/logger";
66
import {
@@ -48,29 +48,29 @@ import {
4848
TaskRunStatus,
4949
Waitpoint,
5050
} from "@trigger.dev/database";
51-
import assertNever from "assert-never";
52-
import { Redis } from "ioredis";
51+
import { assertNever } from "assert-never";
5352
import { nanoid } from "nanoid";
5453
import { EventEmitter } from "node:events";
5554
import { z } from "zod";
56-
import { RunQueue } from "../run-queue";
57-
import { SimpleWeightedChoiceStrategy } from "../run-queue/simpleWeightedPriorityStrategy";
58-
import { MinimalAuthenticatedEnvironment } from "../shared";
59-
import { MAX_TASK_RUN_ATTEMPTS } from "./consts";
60-
import { getRunWithBackgroundWorkerTasks } from "./db/worker";
61-
import { runStatusFromError } from "./errors";
62-
import { EventBusEvents } from "./eventBus";
63-
import { executionResultFromSnapshot, getLatestExecutionSnapshot } from "./executionSnapshots";
64-
import { RunLocker } from "./locking";
65-
import { getMachinePreset } from "./machinePresets";
55+
import { RunQueue } from "../run-queue/index.js";
56+
import { FairDequeuingStrategy } from "../run-queue/fairDequeuingStrategy.js";
57+
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
58+
import { MAX_TASK_RUN_ATTEMPTS } from "./consts.js";
59+
import { getRunWithBackgroundWorkerTasks } from "./db/worker.js";
60+
import { runStatusFromError } from "./errors.js";
61+
import { EventBusEvents } from "./eventBus.js";
62+
import { executionResultFromSnapshot, getLatestExecutionSnapshot } from "./executionSnapshots.js";
63+
import { RunLocker } from "./locking.js";
64+
import { getMachinePreset } from "./machinePresets.js";
6665
import {
6766
isCheckpointable,
6867
isDequeueableExecutionStatus,
6968
isExecuting,
7069
isFinalRunStatus,
7170
isPendingExecuting,
72-
} from "./statuses";
73-
import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types";
71+
} from "./statuses.js";
72+
import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
73+
import { RunQueueShortKeyProducer } from "../run-queue/keyProducer.js";
7474
import { retryOutcomeFromCompletion } from "./retrying";
7575

7676
const workerCatalog = {
@@ -153,11 +153,16 @@ export class RunEngine {
153153
);
154154
this.runLock = new RunLocker({ redis: this.runLockRedis });
155155

156+
const keys = new RunQueueShortKeyProducer("rq:");
157+
156158
this.runQueue = new RunQueue({
157159
name: "rq",
158160
tracer: trace.getTracer("rq"),
159-
queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 36 }),
160-
envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 12 }),
161+
keys,
162+
queuePriorityStrategy: new FairDequeuingStrategy({
163+
keys,
164+
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
165+
}),
161166
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
162167
logger: new Logger("RunQueue", "debug"),
163168
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import Redis from "ioredis";
21
import Redlock, { RedlockAbortSignal } from "redlock";
32
import { AsyncLocalStorage } from "async_hooks";
3+
import { Redis } from "@internal/redis";
44

55
interface LockContext {
66
resources: string;

internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
setupAuthenticatedEnvironment,
44
setupBackgroundWorker,
55
} from "@internal/testcontainers";
6-
import { trace } from "@opentelemetry/api";
6+
import { trace } from "@internal/tracing";
77
import { generateFriendlyId } from "@trigger.dev/core/v3/apps";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupAuthenticatedEnvironment,
55
setupBackgroundWorker,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect, describe } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "node:timers/promises";

internal-packages/run-engine/src/engine/tests/cancelling.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupBackgroundWorker,
55
assertNonNullable,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/tests/checkpoints.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
setupBackgroundWorker,
66
assertNonNullable,
77
} from "@internal/testcontainers";
8-
import { trace } from "@opentelemetry/api";
8+
import { trace } from "@internal/tracing";
99
import { expect } from "vitest";
1010
import { RunEngine } from "../index.js";
1111
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/tests/delays.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupBackgroundWorker,
55
assertNonNullable,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/tests/dequeuing.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
setupAuthenticatedEnvironment,
44
setupBackgroundWorker,
55
} from "@internal/testcontainers";
6-
import { trace } from "@opentelemetry/api";
6+
import { trace } from "@internal/tracing";
77
import { generateFriendlyId } from "@trigger.dev/core/v3/apps";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupBackgroundWorker,
55
assertNonNullable,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect, describe } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/tests/notDeployed.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupBackgroundWorker,
55
assertNonNullable,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/tests/priority.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
setupAuthenticatedEnvironment,
44
setupBackgroundWorker,
55
} from "@internal/testcontainers";
6-
import { trace } from "@opentelemetry/api";
6+
import { trace } from "@internal/tracing";
77
import { generateFriendlyId } from "@trigger.dev/core/v3/apps";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";

internal-packages/run-engine/src/engine/tests/trigger.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupAuthenticatedEnvironment,
55
setupBackgroundWorker,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { EventBusEventArgs } from "../eventBus.js";
1010
import { RunEngine } from "../index.js";

internal-packages/run-engine/src/engine/tests/triggerAndWait.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupAuthenticatedEnvironment,
55
setupBackgroundWorker,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "node:timers/promises";

internal-packages/run-engine/src/engine/tests/ttl.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupBackgroundWorker,
55
assertNonNullable,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";

internal-packages/run-engine/src/engine/types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { type WorkerConcurrencyOptions } from "@internal/redis-worker";
2-
import { Tracer } from "@opentelemetry/api";
2+
import { Tracer } from "@internal/tracing";
33
import { MachinePreset, MachinePresetName, QueueOptions, RetryOptions } from "@trigger.dev/core/v3";
44
import { PrismaClient } from "@trigger.dev/database";
5-
import { type RedisOptions } from "ioredis";
6-
import { MinimalAuthenticatedEnvironment } from "../shared";
5+
import { type RedisOptions } from "@internal/redis";
6+
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
77

88
export type RunEngineOptions = {
99
prisma: PrismaClient;

0 commit comments

Comments
 (0)