Skip to content

Commit 3ce5397

Browse files
authored
Implement the disable job feature backend when indexing endpoints (#382)
1 parent ee79ab3 commit 3ce5397

File tree

25 files changed

+511
-409
lines changed

25 files changed

+511
-409
lines changed

.changeset/calm-crabs-kick.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/cli": patch
3+
---
4+
5+
Added the send-event command

.changeset/honest-dingos-tap.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Support disabling jobs using the `enabled` flag

apps/webapp/app/services/endpoints/indexEndpoint.server.ts

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import { logger } from "../logger.server";
77
import { RegisterSourceService } from "../sources/registerSource.server";
88
import { RegisterDynamicScheduleService } from "../triggers/registerDynamicSchedule.server";
99
import { RegisterDynamicTriggerService } from "../triggers/registerDynamicTrigger.server";
10+
import { DisableJobService } from "../jobs/disableJob.server";
1011

1112
export class IndexEndpointService {
1213
#prismaClient: PrismaClient;
1314
#registerJobService = new RegisterJobService();
15+
#disableJobService = new DisableJobService();
1416
#registerSourceService = new RegisterSourceService();
1517
#registerDynamicTriggerService = new RegisterDynamicTriggerService();
1618
#registerDynamicScheduleService = new RegisterDynamicScheduleService();
@@ -57,23 +59,95 @@ export class IndexEndpointService {
5759
sources: 0,
5860
dynamicTriggers: 0,
5961
dynamicSchedules: 0,
62+
disabledJobs: 0,
6063
};
6164

65+
const existingJobs = await this.#prismaClient.job.findMany({
66+
where: {
67+
projectId: endpoint.projectId,
68+
},
69+
include: {
70+
aliases: {
71+
where: {
72+
name: "latest",
73+
environmentId: endpoint.environmentId,
74+
},
75+
include: {
76+
version: true,
77+
},
78+
take: 1,
79+
},
80+
},
81+
});
82+
6283
for (const job of jobs) {
6384
if (!job.enabled) {
64-
continue;
85+
const disabledJob = await this.#disableJobService
86+
.call(endpoint, { slug: job.id, version: job.version })
87+
.catch((error) => {
88+
logger.error("Failed to disable job", {
89+
endpointId: endpoint.id,
90+
job,
91+
error,
92+
});
93+
94+
return;
95+
});
96+
97+
if (disabledJob) {
98+
indexStats.disabledJobs++;
99+
}
100+
} else {
101+
try {
102+
await this.#registerJobService.call(endpoint, job);
103+
104+
indexStats.jobs++;
105+
} catch (error) {
106+
logger.error("Failed to register job", {
107+
endpointId: endpoint.id,
108+
job,
109+
error,
110+
});
111+
}
65112
}
113+
}
66114

67-
try {
68-
await this.#registerJobService.call(endpoint, job);
115+
// TODO: we need to do this for sources, dynamic triggers, and dynamic schedules
116+
const missingJobs = existingJobs.filter((job) => {
117+
return !jobs.find((j) => j.id === job.slug);
118+
});
69119

70-
indexStats.jobs++;
71-
} catch (error) {
72-
logger.error("Failed to register job", {
73-
endpointId: endpoint.id,
74-
job,
75-
error,
76-
});
120+
if (missingJobs.length > 0) {
121+
logger.debug("Disabling missing jobs", {
122+
endpointId: endpoint.id,
123+
missingJobIds: missingJobs.map((job) => job.slug),
124+
});
125+
126+
for (const job of missingJobs) {
127+
const latestVersion = job.aliases[0]?.version;
128+
129+
if (!latestVersion) {
130+
continue;
131+
}
132+
133+
const disabledJob = await this.#disableJobService
134+
.call(endpoint, {
135+
slug: job.slug,
136+
version: latestVersion.version,
137+
})
138+
.catch((error) => {
139+
logger.error("Failed to disable job", {
140+
endpointId: endpoint.id,
141+
job,
142+
error,
143+
});
144+
145+
return;
146+
});
147+
148+
if (disabledJob) {
149+
indexStats.disabledJobs++;
150+
}
77151
}
78152
}
79153

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import type { JobVersion } from "@trigger.dev/database";
2+
import type { PrismaClient } from "~/db.server";
3+
import { prisma } from "~/db.server";
4+
import { ExtendedEndpoint, findEndpoint } from "~/models/endpoint.server";
5+
import type { AuthenticatedEnvironment } from "../apiAuth.server";
6+
import { DisableScheduleSourceService } from "../schedules/disableScheduleSource.server";
7+
8+
export type DisableJobServiceOptions = {
9+
slug: string;
10+
version: string;
11+
};
12+
13+
export class DisableJobService {
14+
#prismaClient: PrismaClient;
15+
16+
constructor(prismaClient: PrismaClient = prisma) {
17+
this.#prismaClient = prismaClient;
18+
}
19+
20+
public async call(
21+
endpointIdOrEndpoint: string | ExtendedEndpoint,
22+
options: DisableJobServiceOptions
23+
) {
24+
const endpoint =
25+
typeof endpointIdOrEndpoint === "string"
26+
? await findEndpoint(endpointIdOrEndpoint)
27+
: endpointIdOrEndpoint;
28+
29+
return this.#disableJob(endpoint.environment, options);
30+
}
31+
32+
async #disableJob(
33+
environment: AuthenticatedEnvironment,
34+
options: DisableJobServiceOptions
35+
): Promise<JobVersion | undefined> {
36+
// Find the job
37+
const job = await this.#prismaClient.job.findUnique({
38+
where: {
39+
projectId_slug: {
40+
projectId: environment.projectId,
41+
slug: options.slug,
42+
},
43+
},
44+
});
45+
46+
if (!job) {
47+
return;
48+
}
49+
50+
const jobVersion = await this.#prismaClient.jobVersion.findUnique({
51+
where: {
52+
jobId_version_environmentId: {
53+
jobId: job.id,
54+
version: options.version,
55+
environmentId: environment.id,
56+
},
57+
},
58+
});
59+
60+
if (!jobVersion) {
61+
return;
62+
}
63+
64+
if (jobVersion.status === "DISABLED") {
65+
return;
66+
}
67+
68+
// Upsert the JobVersion
69+
const updatedJobVersion = await this.#prismaClient.jobVersion.update({
70+
where: {
71+
id: jobVersion.id,
72+
},
73+
data: {
74+
status: "DISABLED",
75+
},
76+
});
77+
78+
await this.#disableEventDispatcher(updatedJobVersion);
79+
80+
return updatedJobVersion;
81+
}
82+
83+
async #disableEventDispatcher(jobVersion: JobVersion) {
84+
const eventDispatcher = await this.#prismaClient.eventDispatcher.update({
85+
where: {
86+
dispatchableId_environmentId: {
87+
dispatchableId: jobVersion.jobId,
88+
environmentId: jobVersion.environmentId,
89+
},
90+
},
91+
data: {
92+
enabled: false,
93+
},
94+
});
95+
96+
const service = new DisableScheduleSourceService();
97+
98+
await service.call({
99+
key: jobVersion.jobId,
100+
dispatcher: eventDispatcher,
101+
});
102+
}
103+
}

apps/webapp/app/services/jobs/registerJob.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,10 @@ export class RegisterJobService {
236236
eventSpecification,
237237
preprocessRuns: metadata.preprocessRuns,
238238
startPosition: "LATEST",
239+
status: "ACTIVE",
239240
},
240241
update: {
242+
status: "ACTIVE",
241243
startPosition: "LATEST",
242244
eventSpecification,
243245
preprocessRuns: metadata.preprocessRuns,
@@ -401,6 +403,7 @@ export class RegisterJobService {
401403
type: "JOB_VERSION",
402404
id: jobVersion.id,
403405
},
406+
enabled: true,
404407
},
405408
});
406409

@@ -444,6 +447,7 @@ export class RegisterJobService {
444447
type: "JOB_VERSION",
445448
id: jobVersion.id,
446449
},
450+
enabled: true,
447451
},
448452
});
449453

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import type { EventDispatcher } from "@trigger.dev/database";
2+
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
3+
import { workerQueue } from "../worker.server";
4+
5+
export class DisableScheduleSourceService {
6+
#prismaClient: PrismaClientOrTransaction;
7+
8+
constructor(prismaClient: PrismaClientOrTransaction = prisma) {
9+
this.#prismaClient = prismaClient;
10+
}
11+
12+
public async call({ key, dispatcher }: { key: string; dispatcher: EventDispatcher }) {
13+
const scheduleSourceExists = await this.#prismaClient.scheduleSource.findUnique({
14+
where: {
15+
key_environmentId: {
16+
key,
17+
environmentId: dispatcher.environmentId,
18+
},
19+
},
20+
});
21+
22+
if (!scheduleSourceExists) {
23+
return;
24+
}
25+
26+
return await $transaction(this.#prismaClient, async (tx) => {
27+
const scheduleSource = await this.#prismaClient.scheduleSource.update({
28+
where: {
29+
key_environmentId: {
30+
key,
31+
environmentId: dispatcher.environmentId,
32+
},
33+
},
34+
data: {
35+
active: false,
36+
},
37+
});
38+
39+
await workerQueue.dequeue(`scheduled:${scheduleSource.id}`, { tx });
40+
41+
return scheduleSource;
42+
});
43+
}
44+
}

apps/webapp/app/services/schedules/nextScheduledEvent.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ export class NextScheduledEventService {
5656
},
5757
{
5858
runAt: scheduleTime,
59-
queueName: `scheduler:${scheduleSource.environmentId}`,
6059
tx,
6160
jobKey: `scheduled:${scheduleSource.id}`,
6261
}

apps/webapp/app/services/schedules/registerScheduleSource.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export class RegisterScheduleSourceService {
7070
},
7171
metadata: schedule.metadata ?? {},
7272
externalAccountId: externalAccount ? externalAccount.id : undefined,
73+
active: environment.autoEnableInternalSources,
7374
},
7475
});
7576

apps/webapp/app/services/schedules/unregisterSchedule.server.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
import { RegisterScheduleBody } from "@trigger.dev/core";
2-
import { $transaction, PrismaClient } from "~/db.server";
3-
import { prisma } from "~/db.server";
1+
import { PrismaClient, prisma } from "~/db.server";
42
import { AuthenticatedEnvironment } from "../apiAuth.server";
5-
import { RegisterScheduleSourceService } from "./registerScheduleSource.server";
63

74
export class UnregisterScheduleService {
85
#prismaClient: PrismaClient;

docs/documentation/guides/cli.mdx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,23 @@ yarn dlx @trigger.dev/cli@latest whoami
149149
```
150150

151151
</CodeGroup>
152+
153+
## send-event Command
154+
155+
The `send-event` command will send an event to your Trigger.dev project. This is useful for testing your Trigger.dev project locally.
156+
157+
<CodeGroup>
158+
159+
```bash npm
160+
npx @trigger.dev/cli@latest send-event -n "event.name" -p "{ \"key\": \"value\" }"
161+
```
162+
163+
```bash pnpm
164+
pnpm dlx @trigger.dev/cli@latest send-event -n "event.name" -p "{ \"key\": \"value\" }"
165+
```
166+
167+
```bash yarn
168+
yarn dlx @trigger.dev/cli@latest send-event -n "event.name" -p "{ \"key\": \"value\" }"
169+
```
170+
171+
</CodeGroup>

examples/job-catalog/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"supabase": "nodemon --watch src/supabase.ts -r tsconfig-paths/register -r dotenv/config src/supabase.ts",
1111
"supabase:types": "npx supabase gen types typescript --project-id $SUPABASE_PROJECT_ID --schema public --schema auth --schema storage > src/supabase-types.ts",
1212
"events": "nodemon --watch src/events.ts -r tsconfig-paths/register -r dotenv/config src/events.ts",
13+
"schedules": "nodemon --watch src/schedules.ts -r tsconfig-paths/register -r dotenv/config src/schedules.ts",
1314
"stressTest": "nodemon --watch src/stressTest.ts -r tsconfig-paths/register -r dotenv/config src/stressTest.ts",
1415
"delays": "nodemon --watch src/delays.ts -r tsconfig-paths/register -r dotenv/config src/delays.ts",
1516
"dev:trigger": "trigger-cli dev --port 8080"

examples/job-catalog/src/events.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ client.defineJob({
1313
id: "event-example-1",
1414
name: "Event Example 1",
1515
version: "1.0.0",
16+
enabled: true,
1617
trigger: eventTrigger({
1718
name: "event.example",
1819
}),

0 commit comments

Comments
 (0)