Skip to content

Commit 44e1b87

Browse files
authored
v3: various schedule fixes (#1040)
* Improve the SDK function types and expose a new APIError instead of the APIResult type * Skip triggering scheduled tasks if the task isn’t found in the current deployment Also fixes an issue when editing the environments of a schedule
1 parent 71b0ef8 commit 44e1b87

File tree

20 files changed

+549
-288
lines changed

20 files changed

+549
-288
lines changed

.changeset/ninety-pets-travel.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Improve the SDK function types and expose a new APIError instead of the APIResult type

apps/webapp/app/routes/api.v1.schedules.$scheduleId.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { prisma } from "~/db.server";
66
import { ViewSchedulePresenter } from "~/presenters/v3/ViewSchedulePresenter.server";
77
import { authenticateApiRequest } from "~/services/apiAuth.server";
88
import { UpsertSchedule } from "~/v3/schedules";
9+
import { ServiceValidationError } from "~/v3/services/baseService.server";
910
import { UpsertTaskScheduleService } from "~/v3/services/upsertTaskSchedule.server";
1011

1112
const ParamsSchema = z.object({
@@ -87,6 +88,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
8788

8889
return json(responseObject, { status: 200 });
8990
} catch (error) {
91+
if (error instanceof ServiceValidationError) {
92+
return json({ error: error.message }, { status: 422 });
93+
}
94+
9095
return json(
9196
{ error: error instanceof Error ? error.message : "Internal Server Error" },
9297
{ status: 500 }

apps/webapp/app/routes/api.v1.schedules.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { z } from "zod";
55
import { ScheduleListPresenter } from "~/presenters/v3/ScheduleListPresenter.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
77
import { UpsertSchedule } from "~/v3/schedules";
8+
import { ServiceValidationError } from "~/v3/services/baseService.server";
89
import { UpsertTaskScheduleService } from "~/v3/services/upsertTaskSchedule.server";
910

1011
const SearchParamsSchema = z.object({
@@ -63,6 +64,10 @@ export async function action({ request }: ActionFunctionArgs) {
6364

6465
return json(responseObject, { status: 200 });
6566
} catch (error) {
67+
if (error instanceof ServiceValidationError) {
68+
return json({ error: error.message }, { status: 422 });
69+
}
70+
6671
return json(
6772
{ error: error instanceof Error ? error.message : "Internal Server Error" },
6873
{ status: 500 }

apps/webapp/app/v3/services/baseService.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,10 @@ export abstract class BaseService {
3232
);
3333
}
3434
}
35+
36+
export class ServiceValidationError extends Error {
37+
constructor(message: string) {
38+
super(message);
39+
this.name = "ServiceValidationError";
40+
}
41+
}

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ import { BaseService } from "./baseService.server";
33
import { workerQueue } from "~/services/worker.server";
44
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
55
import { TriggerTaskService } from "./triggerTask.server";
6-
import { logger, stringifyIO } from "@trigger.dev/core/v3";
6+
import { stringifyIO } from "@trigger.dev/core/v3";
77
import { nextScheduledTimestamps } from "../utils/calculateNextSchedule.server";
8+
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
9+
import { logger } from "~/services/logger.server";
810

911
export class TriggerScheduledTaskService extends BaseService {
1012
public async call(instanceId: string) {
@@ -49,6 +51,40 @@ export class TriggerScheduledTaskService extends BaseService {
4951
shouldTrigger = false;
5052
}
5153

54+
if (instance.environment.type !== "DEVELOPMENT") {
55+
// Get the current backgroundWorker for this environment
56+
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
57+
58+
if (!currentWorkerDeployment) {
59+
logger.debug("No current worker deployment found, skipping task trigger", {
60+
instanceId,
61+
scheduleId: instance.taskSchedule.friendlyId,
62+
environmentId: instance.environment.id,
63+
});
64+
65+
shouldTrigger = false;
66+
} else if (
67+
!currentWorkerDeployment.worker ||
68+
!currentWorkerDeployment.worker.tasks.some(
69+
(t) => t.id === instance.taskSchedule.taskIdentifier
70+
)
71+
) {
72+
logger.debug(
73+
"Current worker deployment does not contain the scheduled task identifier, skipping task trigger",
74+
{
75+
instanceId,
76+
scheduleId: instance.taskSchedule.friendlyId,
77+
environmentId: instance.environment.id,
78+
workerDeploymentId: currentWorkerDeployment.id,
79+
workerId: currentWorkerDeployment.worker?.id,
80+
taskIdentifier: instance.taskSchedule.taskIdentifier,
81+
}
82+
);
83+
84+
shouldTrigger = false;
85+
}
86+
}
87+
5288
const registerNextService = new RegisterNextTaskScheduleInstanceService();
5389

5490
if (shouldTrigger) {

apps/webapp/app/v3/services/upsertTaskSchedule.server.ts

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { ZodError } from "zod";
44
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
55
import { generateFriendlyId } from "../friendlyIdentifiers";
66
import { CronPattern, UpsertSchedule } from "../schedules";
7-
import { BaseService } from "./baseService.server";
7+
import { BaseService, ServiceValidationError } from "./baseService.server";
88
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
99
import cronstrue from "cronstrue";
1010
import { calculateNextScheduledTimestamp } from "../utils/calculateNextSchedule.server";
@@ -32,10 +32,10 @@ export class UpsertTaskScheduleService extends BaseService {
3232
CronPattern.parse(schedule.cron);
3333
} catch (e) {
3434
if (e instanceof ZodError) {
35-
throw new Error(`Invalid cron expression: ${e.issues[0].message}`);
35+
throw new ServiceValidationError(`Invalid cron expression: ${e.issues[0].message}`);
3636
}
3737

38-
throw new Error(
38+
throw new ServiceValidationError(
3939
`Invalid cron expression: ${e instanceof Error ? e.message : JSON.stringify(e)}`
4040
);
4141
}
@@ -186,26 +186,15 @@ export class UpsertTaskScheduleService extends BaseService {
186186
});
187187

188188
// create the new instances
189-
let instances: InstanceWithEnvironment[] = [];
189+
const newInstances: InstanceWithEnvironment[] = [];
190+
const updatingInstances: InstanceWithEnvironment[] = [];
190191

191192
for (const environmentId of options.environments) {
192193
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
193194

194195
if (existingInstance) {
195-
if (!existingInstance.active) {
196-
// If the instance is not active, we need to activate it
197-
await tx.taskScheduleInstance.update({
198-
where: {
199-
id: existingInstance.id,
200-
},
201-
data: {
202-
active: true,
203-
},
204-
});
205-
}
206-
207196
// Update the existing instance
208-
instances.push({ ...existingInstance, active: true });
197+
updatingInstances.push(existingInstance);
209198
} else {
210199
// Create a new instance
211200
const instance = await tx.taskScheduleInstance.create({
@@ -226,35 +215,53 @@ export class UpsertTaskScheduleService extends BaseService {
226215
},
227216
});
228217

229-
instances.push(instance);
218+
newInstances.push(instance);
230219
}
231220
}
232221

233222
// find the instances that need to be removed
234-
const instancesToDeactivate = existingInstances.filter(
223+
const instancesToDeleted = existingInstances.filter(
235224
(i) => !options.environments.includes(i.environmentId)
236225
);
237226

238-
// deactivate the instances
239-
for (const instance of instancesToDeactivate) {
240-
await tx.taskScheduleInstance.update({
227+
// delete the instances no longer selected
228+
for (const instance of instancesToDeleted) {
229+
await tx.taskScheduleInstance.delete({
241230
where: {
242231
id: instance.id,
243232
},
244-
data: {
245-
active: false,
246-
},
247233
});
248234
}
249235

250-
if (scheduleHasChanged) {
251-
const registerService = new RegisterNextTaskScheduleInstanceService(tx);
236+
const registerService = new RegisterNextTaskScheduleInstanceService(tx);
237+
238+
for (const instance of newInstances) {
239+
await registerService.call(instance.id);
240+
}
252241

253-
for (const instance of existingInstances) {
242+
if (scheduleHasChanged) {
243+
for (const instance of updatingInstances) {
254244
await registerService.call(instance.id);
255245
}
256246
}
257247

248+
const instances = await tx.taskScheduleInstance.findMany({
249+
where: {
250+
taskScheduleId: scheduleRecord.id,
251+
},
252+
include: {
253+
environment: {
254+
include: {
255+
orgMember: {
256+
include: {
257+
user: true,
258+
},
259+
},
260+
},
261+
},
262+
},
263+
});
264+
258265
return { scheduleRecord, instances };
259266
}
260267

docs/v3-openapi.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
"400": {
4141
"description": "Invalid request parameters"
4242
},
43+
"422": {
44+
"description": "Unprocessable Entity"
45+
},
4346
"401": {
4447
"description": "Unauthorized"
4548
}
@@ -216,6 +219,9 @@
216219
},
217220
"404": {
218221
"description": "Resource not found"
222+
},
223+
"422": {
224+
"description": "Unprocessable Entity"
219225
}
220226
},
221227
"tags": [

packages/core/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@
7676
"superjson": "^2.2.1",
7777
"ulidx": "^2.2.1",
7878
"zod": "3.22.3",
79-
"zod-error": "1.5.0"
79+
"zod-error": "1.5.0",
80+
"zod-validation-error": "^1.5.0"
8081
},
8182
"devDependencies": {
8283
"@trigger.dev/tsconfig": "workspace:*",

packages/core/src/v3/apiClient/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { context, propagation } from "@opentelemetry/api";
2-
import { ZodFetchOptions, zodfetch } from "../../zodfetch";
2+
import { ZodFetchOptions, zodfetch } from "../zodfetch";
33
import {
44
BatchTriggerTaskRequestBody,
55
BatchTriggerTaskResponse,
@@ -25,7 +25,7 @@ export type TriggerOptions = {
2525

2626
const zodFetchOptions: ZodFetchOptions = {
2727
retry: {
28-
maxAttempts: 5,
28+
maxAttempts: 3,
2929
minTimeoutInMs: 1000,
3030
maxTimeoutInMs: 30_000,
3131
factor: 2,

0 commit comments

Comments
 (0)