Skip to content

Commit f2bb0d0

Browse files
committed
Merge branch 'main' into v3/self-hosting-latest
2 parents 4d06805 + b5cdb0c commit f2bb0d0

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+791
-154
lines changed

apps/coordinator/src/index.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,49 @@ class TaskCoordinator {
162162

163163
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
164164
},
165+
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
166+
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167+
168+
if (!taskSocket) {
169+
logger.log("Socket for attempt not found", {
170+
attemptFriendlyId: message.attemptFriendlyId,
171+
});
172+
return {
173+
success: false,
174+
error: {
175+
name: "SocketNotFoundError",
176+
message: "Socket for attempt not found",
177+
},
178+
};
179+
}
180+
181+
//if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
182+
if (taskSocket.data.requiresCheckpointResumeWithMessage) {
183+
logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
184+
socketData: taskSocket.data,
185+
});
186+
187+
return {
188+
success: false,
189+
error: {
190+
name: "CheckpointMessagePresentError",
191+
message:
192+
"Checkpoint message is present, so we need to kill the process and resume from the queue.",
193+
},
194+
};
195+
}
196+
197+
await chaosMonkey.call();
198+
199+
// In case the task resumed faster than we could checkpoint
200+
this.#cancelCheckpoint(message.runId);
201+
202+
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
203+
204+
return {
205+
success: true,
206+
};
207+
},
165208
RESUME_AFTER_DURATION: async (message) => {
166209
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167210

@@ -792,6 +835,18 @@ class TaskCoordinator {
792835
return;
793836
}
794837

838+
logger.log("WAIT_FOR_TASK checkpoint created", {
839+
checkpoint,
840+
socketData: socket.data,
841+
});
842+
843+
//setting this means we can only resume from a checkpoint
844+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
845+
logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", {
846+
checkpoint,
847+
socketData: socket.data,
848+
});
849+
795850
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
796851
version: "v1",
797852
attemptFriendlyId: message.attemptFriendlyId,
@@ -804,6 +859,7 @@ class TaskCoordinator {
804859
});
805860

806861
if (ack?.keepRunAlive) {
862+
socket.data.requiresCheckpointResumeWithMessage = undefined;
807863
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
808864
return;
809865
}
@@ -862,6 +918,18 @@ class TaskCoordinator {
862918
return;
863919
}
864920

921+
logger.log("WAIT_FOR_BATCH checkpoint created", {
922+
checkpoint,
923+
socketData: socket.data,
924+
});
925+
926+
//setting this means we can only resume from a checkpoint
927+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
928+
logger.log("WAIT_FOR_BATCH set checkpoint", {
929+
checkpoint,
930+
socketData: socket.data,
931+
});
932+
865933
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
866934
version: "v1",
867935
attemptFriendlyId: message.attemptFriendlyId,
@@ -875,6 +943,7 @@ class TaskCoordinator {
875943
});
876944

877945
if (ack?.keepRunAlive) {
946+
socket.data.requiresCheckpointResumeWithMessage = undefined;
878947
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
879948
return;
880949
}

apps/webapp/app/hooks/useSearchParam.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ export function useSearchParams() {
1818
}
1919

2020
if (typeof value === "string") {
21-
search.set(param, encodeURIComponent(value));
21+
search.set(param, value);
2222
continue;
2323
}
2424

2525
search.delete(param);
2626
for (const v of value) {
27-
search.append(param, encodeURIComponent(v));
27+
search.append(param, v);
2828
}
2929
}
3030
},

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -725,20 +725,47 @@ export class SharedQueueConsumer {
725725
}
726726

727727
try {
728-
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
729-
runId: resumableAttempt.taskRunId,
730-
attemptId: resumableAttempt.id,
731-
});
732-
733-
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
734-
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
735-
version: "v1",
728+
const resumeMessage = {
729+
version: "v1" as const,
736730
runId: resumableAttempt.taskRunId,
737731
attemptId: resumableAttempt.id,
738732
attemptFriendlyId: resumableAttempt.friendlyId,
739733
completions,
740734
executions,
735+
};
736+
737+
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });
738+
739+
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
740+
const responses = await socketIo.coordinatorNamespace
741+
.timeout(10_000)
742+
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);
743+
744+
logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
745+
resumeMessage,
746+
responses,
747+
message,
741748
});
749+
750+
if (responses.length === 0) {
751+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
752+
resumeMessage,
753+
message,
754+
});
755+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
756+
return;
757+
}
758+
759+
const hasSuccess = responses.some((response) => response.success);
760+
if (!hasSuccess) {
761+
logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
762+
resumeMessage,
763+
responses,
764+
message,
765+
});
766+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
767+
return;
768+
}
742769
} catch (e) {
743770
if (e instanceof Error) {
744771
this._currentSpan?.recordException(e);
@@ -748,7 +775,12 @@ export class SharedQueueConsumer {
748775

749776
this._endSpanInNextIteration = true;
750777

751-
await this.#nackAndDoMoreWork(message.messageId);
778+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK threw, nacking with delay", {
779+
message,
780+
error: e,
781+
});
782+
783+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
752784
return;
753785
}
754786

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ export class ResumeBatchRunService extends BaseService {
122122
// When the checkpoint is created, it will continue the run
123123
logger.error("ResumeBatchRunService: attempt is paused but there's no checkpoint event", {
124124
batchRunId: batchRun.id,
125-
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
125+
dependentTaskAttempt: batchRun.dependentTaskAttempt,
126+
checkpointEventId: batchRun.checkpointEventId,
127+
hasCheckpointEvent: !!batchRun.checkpointEventId,
126128
});
127129
return;
128130
}
@@ -132,7 +134,9 @@ export class ResumeBatchRunService extends BaseService {
132134
if (wasUpdated) {
133135
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
134136
batchRunId: batchRun.id,
135-
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
137+
dependentTaskAttempt: batchRun.dependentTaskAttempt,
138+
checkpointEventId: batchRun.checkpointEventId,
139+
hasCheckpointEvent: !!batchRun.checkpointEventId,
136140
});
137141
await marqs?.replaceMessage(dependentRun.id, {
138142
type: "RESUME",

docs/examples/dall-e3-generate-image.mdx

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ export const generateContent = task({
2929
maxAttempts: 3, // Retry up to 3 times
3030
},
3131
run: async ({ theme, description }: Payload) => {
32-
3332
// Generate text
3433
const textResult = await openai.chat.completions.create({
3534
model: "gpt-4o",
@@ -64,4 +63,15 @@ function generateTextPrompt(theme: string, description: string): any {
6463
function generateImagePrompt(theme: string, description: string): any {
6564
return `Theme: ${theme}\n\nDescription: ${description}`;
6665
}
67-
```
66+
```
67+
68+
## Testing your task
69+
70+
To test this task in the dashboard, you can use the following payload:
71+
72+
```json
73+
{
74+
"theme": "A beautiful sunset",
75+
"description": "A sunset over the ocean with a tiny yacht in the distance."
76+
}
77+
```

0 commit comments

Comments
 (0)