Skip to content

batchTriggerAndWait checkpoint race condition when at max concurrency #1296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ apps/**/public/build
.trigger
.tshy*
.yarn
*.tsbuildinfo
*.tsbuildinfo
/packages/cli-v3/src/package.json
4 changes: 2 additions & 2 deletions apps/webapp/app/presenters/TeamPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getTeamMembersAndInvites } from "~/models/member.server";
import { BasePresenter } from "./v3/basePresenter.server";
import { getLimit } from "~/services/platform.v3.server";
import { BasePresenter } from "./v3/basePresenter.server";

export class TeamPresenter extends BasePresenter {
public async call({ userId, organizationId }: { userId: string; organizationId: string }) {
Expand All @@ -13,7 +13,7 @@ export class TeamPresenter extends BasePresenter {
return;
}

const limit = await getLimit(organizationId, "teamMembers", 25);
const limit = await getLimit(organizationId, "teamMembers", 100_000_000);

return {
...result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class AlertChannelListPresenter extends BasePresenter {
throw new Error(`Project not found: ${projectId}`);
}

const limit = await getLimit(organization.organizationId, "alerts", 25);
const limit = await getLimit(organization.organizationId, "alerts", 100_000_000);

return {
alertChannels: await Promise.all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export class ScheduleListPresenter extends BasePresenter {
};
});

const limit = await getLimit(project.organizationId, "schedules", 500);
const limit = await getLimit(project.organizationId, "schedules", 100_000_000);

return {
currentPage: page,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/checkSchedule.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class CheckScheduleService extends BaseService {
throw new ServiceValidationError("Project not found");
}

const limit = await getLimit(project.organizationId, "schedules", 500);
const limit = await getLimit(project.organizationId, "schedules", 100_000_000);
const schedulesCount = await this._prisma.taskSchedule.count({
where: {
projectId,
Expand Down
10 changes: 10 additions & 0 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ export class CreateCheckpointService extends BaseService {
};
}

//if there's a message in the queue, we make sure the checkpoint event is on it
await marqs?.replaceMessage(
attempt.taskRun.id,
{
checkpointEventId: checkpointEvent.id,
},
undefined,
true
);

await ResumeBatchRunService.enqueue(batchRun.id, this._prisma);

return {
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/v3/services/initializeDeployment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ export class InitializeDeploymentService extends BaseService {
const nextVersion = calculateNextBuildVersion(latestDeployment?.version);

// Try and create a depot build and get back the external build data
const externalBuildData = !!payload.selfHosted
? await createRemoteImageBuild(environment.project)
: undefined;
const externalBuildData = payload.selfHosted
? undefined
: await createRemoteImageBuild(environment.project);

const triggeredBy = payload.userId
? await this._prisma.user.findUnique({
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/v3/services/resumeTaskDependency.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ export class ResumeTaskDependencyService extends BaseService {
const dependentRun = dependency.dependentAttempt.taskRun;

if (dependency.dependentAttempt.status === "PAUSED" && dependency.checkpointEventId) {
logger.debug(
"Task dependency resume: Attempt is paused and there's a checkpoint. Enqueuing resume with checkpoint.",
{
attemptId: dependency.id,
dependentAttempt: dependency.dependentAttempt,
checkpointEventId: dependency.checkpointEventId,
hasCheckpointEvent: !!dependency.checkpointEventId,
runId: dependentRun.id,
}
);
await marqs?.enqueueMessage(
dependency.taskRun.runtimeEnvironment,
dependentRun.queue,
Expand All @@ -61,6 +71,7 @@ export class ResumeTaskDependencyService extends BaseService {
dependentAttempt: dependency.dependentAttempt,
checkpointEventId: dependency.checkpointEventId,
hasCheckpointEvent: !!dependency.checkpointEventId,
runId: dependentRun.id,
});

if (dependency.dependentAttempt.status === "PAUSED" && !dependency.checkpointEventId) {
Expand Down
36 changes: 35 additions & 1 deletion references/v3-catalog/src/trigger/checkpoints.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, task, wait } from "@trigger.dev/sdk/v3";
import { logger, queue, task, wait } from "@trigger.dev/sdk/v3";

type Payload = {
count?: number;
Expand Down Expand Up @@ -70,6 +70,7 @@ export const nestedDependencies = task({
maxDepth,
waitSeconds,
failAttemptChance,
batchSize,
});
logger.log(`Triggered complete ${i + 1}/${batchSize}`);

Expand Down Expand Up @@ -153,3 +154,36 @@ export const bulkPermanentlyFrozen = task({
);
},
});

const oneAtATime = queue({
name: "race-condition",
concurrencyLimit: 1,
});

export const raceConditionCheckpointDequeue = task({
id: "race-condition-checkpoint-dequeue",
queue: oneAtATime,
run: async ({ isBatch = true }: { isBatch?: boolean }) => {
await holdConcurrency.trigger({ waitSeconds: 45 });

if (isBatch) {
await fixedLengthTask.batchTriggerAndWait(
Array.from({ length: 1 }, (_, i) => ({
payload: { waitSeconds: 5 },
}))
);
} else {
await fixedLengthTask.triggerAndWait({ waitSeconds: 5 });
}

logger.log(`Successfully completed task`);
},
});

export const holdConcurrency = task({
id: "hold-concurrency",
queue: oneAtATime,
run: async ({ waitSeconds = 60 }: { waitSeconds?: number }) => {
await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
},
});
Loading