Skip to content

Commit 4a0cbbf

Browse files
committed
Skip triggering scheduled tasks if the task isn’t found in the current deployment
Also fixes an issue when editing the environments of a schedule
1 parent fce2f38 commit 4a0cbbf

File tree

2 files changed

+69
-26
lines changed

2 files changed

+69
-26
lines changed

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ import { BaseService } from "./baseService.server";
33
import { workerQueue } from "~/services/worker.server";
44
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
55
import { TriggerTaskService } from "./triggerTask.server";
6-
import { logger, stringifyIO } from "@trigger.dev/core/v3";
6+
import { stringifyIO } from "@trigger.dev/core/v3";
77
import { nextScheduledTimestamps } from "../utils/calculateNextSchedule.server";
8+
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
9+
import { logger } from "~/services/logger.server";
810

911
export class TriggerScheduledTaskService extends BaseService {
1012
public async call(instanceId: string) {
@@ -49,6 +51,40 @@ export class TriggerScheduledTaskService extends BaseService {
4951
shouldTrigger = false;
5052
}
5153

54+
if (instance.environment.type !== "DEVELOPMENT") {
55+
// Get the current backgroundWorker for this environment
56+
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
57+
58+
if (!currentWorkerDeployment) {
59+
logger.debug("No current worker deployment found, skipping task trigger", {
60+
instanceId,
61+
scheduleId: instance.taskSchedule.friendlyId,
62+
environmentId: instance.environment.id,
63+
});
64+
65+
shouldTrigger = false;
66+
} else if (
67+
!currentWorkerDeployment.worker ||
68+
!currentWorkerDeployment.worker.tasks.some(
69+
(t) => t.id === instance.taskSchedule.taskIdentifier
70+
)
71+
) {
72+
logger.debug(
73+
"Current worker deployment does not contain the scheduled task identifier, skipping task trigger",
74+
{
75+
instanceId,
76+
scheduleId: instance.taskSchedule.friendlyId,
77+
environmentId: instance.environment.id,
78+
workerDeploymentId: currentWorkerDeployment.id,
79+
workerId: currentWorkerDeployment.worker?.id,
80+
taskIdentifier: instance.taskSchedule.taskIdentifier,
81+
}
82+
);
83+
84+
shouldTrigger = false;
85+
}
86+
}
87+
5288
const registerNextService = new RegisterNextTaskScheduleInstanceService();
5389

5490
if (shouldTrigger) {

apps/webapp/app/v3/services/upsertTaskSchedule.server.ts

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -186,26 +186,15 @@ export class UpsertTaskScheduleService extends BaseService {
186186
});
187187

188188
// create the new instances
189-
let instances: InstanceWithEnvironment[] = [];
189+
const newInstances: InstanceWithEnvironment[] = [];
190+
const updatingInstances: InstanceWithEnvironment[] = [];
190191

191192
for (const environmentId of options.environments) {
192193
const existingInstance = existingInstances.find((i) => i.environmentId === environmentId);
193194

194195
if (existingInstance) {
195-
if (!existingInstance.active) {
196-
// If the instance is not active, we need to activate it
197-
await tx.taskScheduleInstance.update({
198-
where: {
199-
id: existingInstance.id,
200-
},
201-
data: {
202-
active: true,
203-
},
204-
});
205-
}
206-
207196
// Update the existing instance
208-
instances.push({ ...existingInstance, active: true });
197+
updatingInstances.push(existingInstance);
209198
} else {
210199
// Create a new instance
211200
const instance = await tx.taskScheduleInstance.create({
@@ -226,35 +215,53 @@ export class UpsertTaskScheduleService extends BaseService {
226215
},
227216
});
228217

229-
instances.push(instance);
218+
newInstances.push(instance);
230219
}
231220
}
232221

233222
// find the instances that need to be removed
234-
const instancesToDeactivate = existingInstances.filter(
223+
const instancesToDeleted = existingInstances.filter(
235224
(i) => !options.environments.includes(i.environmentId)
236225
);
237226

238-
// deactivate the instances
239-
for (const instance of instancesToDeactivate) {
240-
await tx.taskScheduleInstance.update({
227+
// delete the instances no longer selected
228+
for (const instance of instancesToDeleted) {
229+
await tx.taskScheduleInstance.delete({
241230
where: {
242231
id: instance.id,
243232
},
244-
data: {
245-
active: false,
246-
},
247233
});
248234
}
249235

250-
if (scheduleHasChanged) {
251-
const registerService = new RegisterNextTaskScheduleInstanceService(tx);
236+
const registerService = new RegisterNextTaskScheduleInstanceService(tx);
237+
238+
for (const instance of newInstances) {
239+
await registerService.call(instance.id);
240+
}
252241

253-
for (const instance of existingInstances) {
242+
if (scheduleHasChanged) {
243+
for (const instance of updatingInstances) {
254244
await registerService.call(instance.id);
255245
}
256246
}
257247

248+
const instances = await tx.taskScheduleInstance.findMany({
249+
where: {
250+
taskScheduleId: scheduleRecord.id,
251+
},
252+
include: {
253+
environment: {
254+
include: {
255+
orgMember: {
256+
include: {
257+
user: true,
258+
},
259+
},
260+
},
261+
},
262+
},
263+
});
264+
258265
return { scheduleRecord, instances };
259266
}
260267

0 commit comments

Comments
 (0)