Skip to content

Commit 76a0f61

Browse files
committed
v3: improve schedule reliability by ensuring next tick is always scheduled
1 parent d934feb commit 76a0f61

File tree

2 files changed

+114
-101
lines changed

2 files changed

+114
-101
lines changed

apps/webapp/app/services/worker.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,11 @@ function getWorkerQueue() {
590590
},
591591
"v3.triggerScheduledTask": {
592592
priority: 0,
593-
maxAttempts: 3,
593+
maxAttempts: 3, // total delay of 30 seconds
594594
handler: async (payload, job) => {
595595
const service = new TriggerScheduledTaskService();
596596

597-
return await service.call(payload.instanceId);
597+
return await service.call(payload.instanceId, job.attempts === job.max_attempts);
598598
},
599599
},
600600
"v3.performTaskAttemptAlerts": {

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

Lines changed: 112 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
99
import { logger } from "~/services/logger.server";
1010

1111
export class TriggerScheduledTaskService extends BaseService {
12-
public async call(instanceId: string) {
12+
public async call(instanceId: string, finalAttempt: boolean) {
13+
const registerNextService = new RegisterNextTaskScheduleInstanceService();
14+
1315
const instance = await this._prisma.taskScheduleInstance.findUnique({
1416
where: {
1517
id: instanceId,
@@ -30,124 +32,135 @@ export class TriggerScheduledTaskService extends BaseService {
3032
return;
3133
}
3234

33-
let shouldTrigger = true;
34-
35-
if (!instance.active) {
36-
shouldTrigger = false;
37-
}
35+
try {
36+
let shouldTrigger = true;
3837

39-
if (!instance.taskSchedule.active) {
40-
shouldTrigger = false;
41-
}
38+
if (!instance.active) {
39+
shouldTrigger = false;
40+
}
4241

43-
if (!instance.nextScheduledTimestamp) {
44-
shouldTrigger = false;
45-
}
42+
if (!instance.taskSchedule.active) {
43+
shouldTrigger = false;
44+
}
4645

47-
if (
48-
instance.environment.type === "DEVELOPMENT" &&
49-
(!instance.environment.currentSession || instance.environment.currentSession.disconnectedAt)
50-
) {
51-
shouldTrigger = false;
52-
}
46+
if (!instance.nextScheduledTimestamp) {
47+
shouldTrigger = false;
48+
}
5349

54-
if (instance.environment.type !== "DEVELOPMENT") {
55-
// Get the current backgroundWorker for this environment
56-
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
50+
if (
51+
instance.environment.type === "DEVELOPMENT" &&
52+
(!instance.environment.currentSession || instance.environment.currentSession.disconnectedAt)
53+
) {
54+
shouldTrigger = false;
55+
}
5756

58-
if (!currentWorkerDeployment) {
59-
logger.debug("No current worker deployment found, skipping task trigger", {
60-
instanceId,
61-
scheduleId: instance.taskSchedule.friendlyId,
62-
environmentId: instance.environment.id,
63-
});
57+
if (instance.environment.type !== "DEVELOPMENT") {
58+
// Get the current backgroundWorker for this environment
59+
const currentWorkerDeployment = await findCurrentWorkerDeployment(instance.environment.id);
6460

65-
shouldTrigger = false;
66-
} else if (
67-
!currentWorkerDeployment.worker ||
68-
!currentWorkerDeployment.worker.tasks.some(
69-
(t) => t.slug === instance.taskSchedule.taskIdentifier
70-
)
71-
) {
72-
logger.debug(
73-
"Current worker deployment does not contain the scheduled task identifier, skipping task trigger",
74-
{
61+
if (!currentWorkerDeployment) {
62+
logger.debug("No current worker deployment found, skipping task trigger", {
7563
instanceId,
7664
scheduleId: instance.taskSchedule.friendlyId,
7765
environmentId: instance.environment.id,
78-
workerDeploymentId: currentWorkerDeployment.id,
79-
workerId: currentWorkerDeployment.worker?.id,
80-
taskIdentifier: instance.taskSchedule.taskIdentifier,
81-
}
66+
});
67+
68+
shouldTrigger = false;
69+
} else if (
70+
!currentWorkerDeployment.worker ||
71+
!currentWorkerDeployment.worker.tasks.some(
72+
(t) => t.slug === instance.taskSchedule.taskIdentifier
73+
)
74+
) {
75+
logger.debug(
76+
"Current worker deployment does not contain the scheduled task identifier, skipping task trigger",
77+
{
78+
instanceId,
79+
scheduleId: instance.taskSchedule.friendlyId,
80+
environmentId: instance.environment.id,
81+
workerDeploymentId: currentWorkerDeployment.id,
82+
workerId: currentWorkerDeployment.worker?.id,
83+
taskIdentifier: instance.taskSchedule.taskIdentifier,
84+
}
85+
);
86+
87+
shouldTrigger = false;
88+
}
89+
}
90+
91+
if (shouldTrigger) {
92+
// Enqueue triggering the task
93+
const triggerTask = new TriggerTaskService();
94+
95+
const payload = {
96+
scheduleId: instance.taskSchedule.friendlyId,
97+
timestamp: instance.nextScheduledTimestamp,
98+
lastTimestamp: instance.lastScheduledTimestamp ?? undefined,
99+
externalId: instance.taskSchedule.externalId ?? undefined,
100+
timezone: instance.taskSchedule.timezone,
101+
upcoming: nextScheduledTimestamps(
102+
instance.taskSchedule.generatorExpression,
103+
instance.taskSchedule.timezone,
104+
instance.nextScheduledTimestamp!,
105+
10
106+
),
107+
};
108+
109+
const payloadPacket = await stringifyIO(payload);
110+
111+
logger.debug("Triggering scheduled task", {
112+
instance,
113+
payloadPacket,
114+
});
115+
116+
const run = await triggerTask.call(
117+
instance.taskSchedule.taskIdentifier,
118+
instance.environment,
119+
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
120+
{ customIcon: "scheduled" }
82121
);
83122

84-
shouldTrigger = false;
123+
if (!run) {
124+
logger.error("Failed to trigger task", {
125+
instanceId,
126+
scheduleId: instance.taskSchedule.friendlyId,
127+
payloadPacket,
128+
});
129+
} else {
130+
await this._prisma.taskRun.update({
131+
where: {
132+
id: run.id,
133+
},
134+
data: {
135+
scheduleId: instance.taskSchedule.id,
136+
scheduleInstanceId: instance.id,
137+
},
138+
});
139+
}
85140
}
86-
}
87141

88-
const registerNextService = new RegisterNextTaskScheduleInstanceService();
89-
90-
if (shouldTrigger) {
91-
// Enqueue triggering the task
92-
const triggerTask = new TriggerTaskService();
93-
94-
const payload = {
95-
scheduleId: instance.taskSchedule.friendlyId,
96-
timestamp: instance.nextScheduledTimestamp,
97-
lastTimestamp: instance.lastScheduledTimestamp ?? undefined,
98-
externalId: instance.taskSchedule.externalId ?? undefined,
99-
timezone: instance.taskSchedule.timezone,
100-
upcoming: nextScheduledTimestamps(
101-
instance.taskSchedule.generatorExpression,
102-
instance.taskSchedule.timezone,
103-
instance.nextScheduledTimestamp!,
104-
10
105-
),
106-
};
107-
108-
const payloadPacket = await stringifyIO(payload);
109-
110-
logger.debug("Triggering scheduled task", {
111-
instance,
112-
payloadPacket,
142+
await this._prisma.taskScheduleInstance.update({
143+
where: {
144+
id: instanceId,
145+
},
146+
data: {
147+
lastScheduledTimestamp: instance.nextScheduledTimestamp,
148+
},
113149
});
114150

115-
const run = await triggerTask.call(
116-
instance.taskSchedule.taskIdentifier,
117-
instance.environment,
118-
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
119-
{ customIcon: "scheduled" }
120-
);
121-
122-
if (!run) {
123-
logger.error("Failed to trigger task", {
151+
await registerNextService.call(instanceId);
152+
} catch (e) {
153+
if (finalAttempt) {
154+
logger.error("Failed to trigger scheduled task, rescheduling the next run", {
124155
instanceId,
125-
scheduleId: instance.taskSchedule.friendlyId,
126-
payloadPacket,
156+
error: e,
127157
});
158+
159+
await registerNextService.call(instanceId);
128160
} else {
129-
await this._prisma.taskRun.update({
130-
where: {
131-
id: run.id,
132-
},
133-
data: {
134-
scheduleId: instance.taskSchedule.id,
135-
scheduleInstanceId: instance.id,
136-
},
137-
});
161+
throw e;
138162
}
139163
}
140-
141-
await this._prisma.taskScheduleInstance.update({
142-
where: {
143-
id: instanceId,
144-
},
145-
data: {
146-
lastScheduledTimestamp: instance.nextScheduledTimestamp,
147-
},
148-
});
149-
150-
await registerNextService.call(instanceId);
151164
}
152165

153166
public static async enqueue(instanceId: string, runAt: Date, tx?: PrismaClientOrTransaction) {

0 commit comments

Comments
 (0)