Skip to content

Commit 3c8fb96

Browse files
committed
Complete waitpoint before getting affected runs. Added more logging
1 parent 1b1ad16 commit 3c8fb96

File tree

1 file changed

+73
-22
lines changed

1 file changed

+73
-22
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,7 @@ export class WaitpointSystem {
6666
isError: boolean;
6767
};
6868
}): Promise<Waitpoint> {
69-
// 1. Find the TaskRuns blocked by this waitpoint
70-
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
71-
where: { waitpointId: id },
72-
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
73-
});
74-
75-
if (affectedTaskRuns.length === 0) {
76-
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
77-
waitpointId: id,
78-
});
79-
}
80-
69+
// 1. Complete the Waitpoint (if not completed)
8170
let [waitpointError, waitpoint] = await tryCatch(
8271
this.$.prisma.waitpoint.update({
8372
where: { id, status: "PENDING" },
@@ -109,15 +98,41 @@ export class WaitpointSystem {
10998
throw new Error(`Waitpoint ${id} not found`);
11099
}
111100

112-
//schedule trying to continue the runs
101+
if (waitpoint.status !== "COMPLETED") {
102+
throw new Error(`Waitpoint ${id} is not completed`);
103+
}
104+
105+
// 2. Find the TaskRuns blocked by this waitpoint
106+
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
107+
where: { waitpointId: id },
108+
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
109+
});
110+
111+
if (affectedTaskRuns.length === 0) {
112+
this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, {
113+
waitpointId: id,
114+
});
115+
}
116+
117+
// 3. Schedule trying to continue the runs
113118
for (const run of affectedTaskRuns) {
119+
const jobId = `continueRunIfUnblocked:${run.taskRunId}`;
120+
//50ms in the future
121+
const availableAt = new Date(Date.now() + 50);
122+
123+
this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, {
124+
waitpointId: id,
125+
runId: run.taskRunId,
126+
jobId,
127+
availableAt,
128+
});
129+
114130
await this.$.worker.enqueue({
115131
//this will debounce the call
116-
id: `continueRunIfUnblocked:${run.taskRunId}`,
132+
id: jobId,
117133
job: "continueRunIfUnblocked",
118134
payload: { runId: run.taskRunId },
119-
//50ms in the future
120-
availableAt: new Date(Date.now() + 50),
135+
availableAt,
121136
});
122137

123138
// emit an event to complete associated cached runs
@@ -469,6 +484,10 @@ export class WaitpointSystem {
469484
}
470485

471486
public async continueRunIfUnblocked({ runId }: { runId: string }) {
487+
this.$.logger.debug(`continueRunIfUnblocked: start`, {
488+
runId,
489+
});
490+
472491
// 1. Get the any blocking waitpoints
473492
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
474493
where: { taskRunId: runId },
@@ -483,6 +502,10 @@ export class WaitpointSystem {
483502

484503
// 2. There are blockers still, so do nothing
485504
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
505+
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
506+
runId,
507+
blockingWaitpoints,
508+
});
486509
return;
487510
}
488511

@@ -505,15 +528,18 @@ export class WaitpointSystem {
505528
});
506529

507530
if (!run) {
508-
throw new Error(`#continueRunIfUnblocked: run not found: ${runId}`);
531+
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
532+
runId,
533+
});
534+
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
509535
}
510536

511537
//4. Continue the run whether it's executing or not
512538
await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
513539
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
514540

515541
if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
516-
this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, {
542+
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
517543
runId,
518544
snapshot,
519545
});
@@ -555,6 +581,15 @@ export class WaitpointSystem {
555581

556582
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
557583

584+
this.$.logger.debug(
585+
`continueRunIfUnblocked: run was still executing, sending notification`,
586+
{
587+
runId,
588+
snapshot,
589+
newSnapshot,
590+
}
591+
);
592+
558593
await sendNotificationToWorker({
559594
runId,
560595
snapshot: newSnapshot,
@@ -563,7 +598,7 @@ export class WaitpointSystem {
563598
} else {
564599
// Because we cannot reacquire the concurrency, we need to enqueue the run again
565600
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
566-
await this.enqueueSystem.enqueueRun({
601+
const newSnapshot = await this.enqueueSystem.enqueueRun({
567602
run,
568603
env: run.runtimeEnvironment,
569604
snapshot: {
@@ -577,21 +612,27 @@ export class WaitpointSystem {
577612
index: b.batchIndex ?? undefined,
578613
})),
579614
});
615+
616+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
617+
runId,
618+
snapshot,
619+
newSnapshot,
620+
});
580621
}
581622
} else {
582623
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
583624
// TODO: We're screwed, should probably fail the run immediately
584-
this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, {
625+
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
585626
runId: run.id,
586627
snapshot,
587628
blockingWaitpoints,
588629
});
589-
throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
630+
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
590631
}
591632

592633
//put it back in the queue, with the original timestamp (w/ priority)
593634
//this prioritizes dequeuing waiting runs over new runs
594-
await this.enqueueSystem.enqueueRun({
635+
const newSnapshot = await this.enqueueSystem.enqueueRun({
595636
run,
596637
env: run.runtimeEnvironment,
597638
snapshot: {
@@ -604,6 +645,12 @@ export class WaitpointSystem {
604645
})),
605646
checkpointId: snapshot.checkpointId ?? undefined,
606647
});
648+
649+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
650+
runId,
651+
snapshot,
652+
newSnapshot,
653+
});
607654
}
608655
});
609656

@@ -613,6 +660,10 @@ export class WaitpointSystem {
613660
taskRunId: runId,
614661
},
615662
});
663+
664+
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
665+
runId,
666+
});
616667
}
617668

618669
public async createRunAssociatedWaitpoint(

0 commit comments

Comments
 (0)