Skip to content

Commit b43bab0

Browse files
committed
Merge remote-tracking branch 'origin/main' into fix/dev-engine-url
2 parents 1b042dd + 863ecf4 commit b43bab0

File tree

7 files changed

+138
-76
lines changed

7 files changed

+138
-76
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -571,17 +571,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
571571

572572
//triggered all the runs
573573
if (updatedBatch.runIds.length === updatedBatch.runCount) {
574-
//unblock the parent run from the batch
575-
//this prevents the parent continuing before all the runs are created
576-
if (parentRunId && resumeParentOnCompletion) {
577-
await this._engine.unblockRunForCreatedBatch({
578-
runId: RunId.fromFriendlyId(parentRunId),
579-
batchId: batch.id,
580-
environmentId: environment.id,
581-
projectId: environment.projectId,
582-
});
583-
}
584-
585574
//if all the runs were idempotent, it's possible the batch is already completed
586575
await this._engine.tryCompleteBatch({ batchId: batch.id });
587576
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ export class RunEngine {
290290

291291
this.batchSystem = new BatchSystem({
292292
resources,
293+
waitpointSystem: this.waitpointSystem,
293294
});
294295

295296
this.runAttemptSystem = new RunAttemptSystem({
@@ -905,43 +906,6 @@ export class RunEngine {
905906
}
906907
}
907908

908-
/**
909-
* This is called when all the runs for a batch have been created.
910-
* This does NOT mean that all the runs for the batch are completed.
911-
*/
912-
async unblockRunForCreatedBatch({
913-
runId,
914-
batchId,
915-
tx,
916-
}: {
917-
runId: string;
918-
batchId: string;
919-
environmentId: string;
920-
projectId: string;
921-
tx?: PrismaClientOrTransaction;
922-
}): Promise<void> {
923-
const prisma = tx ?? this.prisma;
924-
925-
const waitpoint = await prisma.waitpoint.findFirst({
926-
where: {
927-
completedByBatchId: batchId,
928-
},
929-
});
930-
931-
if (!waitpoint) {
932-
this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", {
933-
runId,
934-
batchId,
935-
});
936-
throw new ServiceValidationError("Waitpoint not found for batch", 404);
937-
}
938-
939-
await this.completeWaitpoint({
940-
id: waitpoint.id,
941-
output: { value: "Batch waitpoint completed", isError: false },
942-
});
943-
}
944-
945909
async tryCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
946910
return this.batchSystem.scheduleCompleteBatch({ batchId });
947911
}

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
import { startSpan } from "@internal/tracing";
22
import { isFinalRunStatus } from "../statuses.js";
33
import { SystemResources } from "./systems.js";
4+
import { WaitpointSystem } from "./waitpointSystem.js";
45

56
export type BatchSystemOptions = {
67
resources: SystemResources;
8+
waitpointSystem: WaitpointSystem;
79
};
810

911
export class BatchSystem {
1012
private readonly $: SystemResources;
13+
private readonly waitpointSystem: WaitpointSystem;
1114

1215
constructor(private readonly options: BatchSystemOptions) {
1316
this.$ = options.resources;
17+
this.waitpointSystem = options.waitpointSystem;
1418
}
1519

1620
public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
@@ -19,8 +23,8 @@ export class BatchSystem {
1923
id: `tryCompleteBatch:${batchId}`,
2024
job: "tryCompleteBatch",
2125
payload: { batchId: batchId },
22-
//2s in the future
23-
availableAt: new Date(Date.now() + 2_000),
26+
//200ms in the future
27+
availableAt: new Date(Date.now() + 200),
2428
});
2529
}
2630

@@ -75,6 +79,28 @@ export class BatchSystem {
7579
status: "COMPLETED",
7680
},
7781
});
82+
83+
//get waitpoint (if there is one)
84+
const waitpoint = await this.$.prisma.waitpoint.findFirst({
85+
where: {
86+
completedByBatchId: batchId,
87+
},
88+
});
89+
90+
if (!waitpoint) {
91+
this.$.logger.debug(
92+
"RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints",
93+
{
94+
batchId,
95+
}
96+
);
97+
return;
98+
}
99+
100+
await this.waitpointSystem.completeWaitpoint({
101+
id: waitpoint.id,
102+
output: { value: "Batch waitpoint completed", isError: false },
103+
});
78104
} else {
79105
this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId });
80106
}

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,6 @@ describe("RunEngine batchTriggerAndWait", () => {
191191
expect(batchWaitpoint?.waitpoint.type).toBe("BATCH");
192192
expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id);
193193

194-
await engine.unblockRunForCreatedBatch({
195-
runId: parentRun.id,
196-
batchId: batch.id,
197-
environmentId: authenticatedEnvironment.id,
198-
projectId: authenticatedEnvironment.projectId,
199-
});
200-
201194
//dequeue and start the 1st child
202195
const dequeuedChild = await engine.dequeueFromMasterQueue({
203196
consumerId: "test_12345",
@@ -303,7 +296,7 @@ describe("RunEngine batchTriggerAndWait", () => {
303296
expect(child2WaitpointAfter?.status).toBe("COMPLETED");
304297
expect(child2WaitpointAfter?.output).toBe('{"baz":"qux"}');
305298

306-
await setTimeout(500);
299+
await setTimeout(1_000);
307300

308301
const runWaitpointsAfterSecondChild = await prisma.taskRunWaitpoint.findMany({
309302
where: {
@@ -497,13 +490,6 @@ describe("RunEngine batchTriggerAndWait", () => {
497490
expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
498491
expect(parentAfterBatchChild.batch?.id).toBe(batch.id);
499492

500-
await engine.unblockRunForCreatedBatch({
501-
runId: parentRun.id,
502-
batchId: batch.id,
503-
environmentId: authenticatedEnvironment.id,
504-
projectId: authenticatedEnvironment.projectId,
505-
});
506-
507493
//dequeue and start the batch child
508494
const dequeuedBatchChild = await engine.dequeueFromMasterQueue({
509495
consumerId: "test_12345",

internal-packages/run-engine/src/engine/tests/checkpoints.test.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,13 +1166,6 @@ describe("RunEngine checkpoints", () => {
11661166
expect(batchWaitpoint?.waitpoint.type).toBe("BATCH");
11671167
expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id);
11681168

1169-
await engine.unblockRunForCreatedBatch({
1170-
runId: parentRun.id,
1171-
batchId: batch.id,
1172-
environmentId: authenticatedEnvironment.id,
1173-
projectId: authenticatedEnvironment.projectId,
1174-
});
1175-
11761169
// Create a checkpoint
11771170
const checkpointResult = await engine.createCheckpoint({
11781171
runId: parentRun.id,

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export type RunQueueOptions = {
4949
keys: RunQueueKeyProducer;
5050
queueSelectionStrategy: RunQueueSelectionStrategy;
5151
verbose?: boolean;
52-
logger: Logger;
52+
logger?: Logger;
5353
retryOptions?: RetryOptions;
5454
};
5555

@@ -88,7 +88,7 @@ export class RunQueue {
8888
});
8989
},
9090
});
91-
this.logger = options.logger;
91+
this.logger = options.logger ?? new Logger("RunQueue", "warn");
9292

9393
this.keys = options.keys;
9494
this.queueSelectionStrategy = options.queueSelectionStrategy;
@@ -404,11 +404,17 @@ export class RunQueue {
404404
tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array
405405
}
406406

407+
// Track if we successfully dequeued any message in a complete cycle
408+
let successfulDequeueInCycle = false;
409+
407410
// Continue until we've hit max count or all tenants have empty queue lists
408411
while (
409412
messages.length < maxCount &&
410413
Object.values(tenantQueues).some((queues) => queues.length > 0)
411414
) {
415+
// Reset the success flag at the start of each cycle
416+
successfulDequeueInCycle = false;
417+
412418
for (const env of envQueues) {
413419
attemptedEnvs++;
414420

@@ -428,6 +434,7 @@ export class RunQueue {
428434

429435
if (message) {
430436
messages.push(message);
437+
successfulDequeueInCycle = true;
431438
// Re-add this queue at the end, since it might have more messages
432439
tenantQueues[env.envId].push(queue);
433440
}
@@ -438,6 +445,14 @@ export class RunQueue {
438445
break;
439446
}
440447
}
448+
449+
// If we completed a full cycle through all tenants with no successful dequeues,
450+
// exit early as we're likely hitting concurrency limits or have no ready messages
451+
if (!successfulDequeueInCycle) {
452+
// IMPORTANT: Keep this log message as it's used in tests
453+
this.logger.log("No successful dequeues in a full cycle, exiting...");
454+
break;
455+
}
441456
}
442457

443458
span.setAttributes({

internal-packages/run-engine/src/run-queue/tests/dequeueMessageFromMasterQueue.test.ts

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { redisTest } from "@internal/testcontainers";
22
import { trace } from "@internal/tracing";
3-
import { Logger } from "@trigger.dev/core/logger";
43
import { describe } from "node:test";
54
import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js";
65
import { RunQueue } from "../index.js";
@@ -12,7 +11,6 @@ const testOptions = {
1211
tracer: trace.getTracer("rq"),
1312
workers: 1,
1413
defaultEnvConcurrency: 25,
15-
logger: new Logger("RunQueue", "warn"),
1614
retryOptions: {
1715
maxAttempts: 5,
1816
factor: 1.1,
@@ -264,4 +262,95 @@ describe("RunQueue.dequeueMessageFromMasterQueue", () => {
264262
}
265263
}
266264
);
265+
266+
redisTest(
267+
"should exit early when no messages can be dequeued in a full cycle",
268+
async ({ redisContainer }) => {
269+
const mockLogger = {
270+
log: vi.fn(),
271+
error: vi.fn(),
272+
warn: vi.fn(),
273+
debug: vi.fn(),
274+
name: "test-logger",
275+
level: "debug",
276+
filteredKeys: [],
277+
additionalFields: {},
278+
setLevel: vi.fn(),
279+
setFilteredKeys: vi.fn(),
280+
setAdditionalFields: vi.fn(),
281+
child: vi.fn(),
282+
};
283+
284+
const queue = new RunQueue({
285+
...testOptions,
286+
queueSelectionStrategy: new FairQueueSelectionStrategy({
287+
redis: {
288+
keyPrefix: "runqueue:test:",
289+
host: redisContainer.getHost(),
290+
port: redisContainer.getPort(),
291+
},
292+
keys: testOptions.keys,
293+
}),
294+
redis: {
295+
keyPrefix: "runqueue:test:",
296+
host: redisContainer.getHost(),
297+
port: redisContainer.getPort(),
298+
},
299+
// @ts-expect-error
300+
logger: mockLogger,
301+
});
302+
303+
try {
304+
const envMasterQueue = `env:${authenticatedEnvDev.id}`;
305+
const queueCount = 10; // Reduced for simplicity
306+
307+
// First, create all queues and enqueue initial messages
308+
for (let i = 0; i < queueCount; i++) {
309+
const queueName = `${messageDev.queue}_${i}`;
310+
// Set each queue's concurrency limit to 0 (this guarantees dequeue will fail)
311+
await queue.updateQueueConcurrencyLimits(authenticatedEnvDev, queueName, 0);
312+
313+
// Enqueue a message to each queue
314+
await queue.enqueueMessage({
315+
env: authenticatedEnvDev,
316+
message: { ...messageDev, runId: `r${4321 + i}`, queue: queueName },
317+
masterQueues: ["main", envMasterQueue],
318+
});
319+
}
320+
321+
// Try to dequeue messages - this should exit early due to concurrency limits
322+
const startTime = Date.now();
323+
const dequeued = await queue.dequeueMessageFromMasterQueue(
324+
"test_12345",
325+
envMasterQueue,
326+
queueCount
327+
);
328+
const endTime = Date.now();
329+
330+
// Verify no messages were dequeued
331+
expect(dequeued.length).toBe(0);
332+
333+
// Verify the operation completed quickly (under 1000ms)
334+
const duration = endTime - startTime;
335+
expect(duration).toBeLessThan(1000);
336+
337+
// Verify we only logged one early exit message
338+
expect(mockLogger.log).toHaveBeenCalledWith(
339+
expect.stringContaining("No successful dequeues in a full cycle, exiting")
340+
);
341+
expect(mockLogger.log.mock.calls.length).toBeLessThanOrEqual(2);
342+
343+
// Verify all messages are still in queues
344+
let totalRemaining = 0;
345+
for (let i = 0; i < queueCount; i++) {
346+
const queueName = `${messageDev.queue}_${i}`;
347+
const length = await queue.lengthOfQueue(authenticatedEnvDev, queueName);
348+
totalRemaining += length;
349+
}
350+
expect(totalRemaining).toBe(queueCount);
351+
} finally {
352+
await queue.quit();
353+
}
354+
}
355+
);
267356
});

0 commit comments

Comments
 (0)