Skip to content

perf: migrate to graphile worker v0.16.6 #1097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ Worker.init().catch((error) => {

/**
 * Writes an error to the console and flags a known graphile-worker failure.
 *
 * The error is always printed via `console.error`. When it is an `Error`
 * whose message begins with "There are locked jobs present", an extra warning
 * line is printed so the migration problem stands out in the logs.
 *
 * @param error - The value that was thrown or rejected; may be of any type.
 * @param request - Accepted for signature compatibility; not read here.
 */
function logError(error: unknown, request?: Request) {
  console.error(error);

  const isLockedJobsError =
    error instanceof Error && error.message.startsWith("There are locked jobs present");

  // Anything other than the locked-jobs failure needs no extra hint.
  if (!isLockedJobsError) {
    return;
  }

  console.log("⚠️ graphile-worker migration issue detected!");
}

const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer);
Expand Down
81 changes: 61 additions & 20 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import type {
CronItem,
CronItemOptions,
Job as GraphileJob,
DbJob as GraphileJob,
Runner as GraphileRunner,
JobHelpers,
RunnerOptions,
Task,
TaskList,
TaskSpec,
WorkerUtils,
} from "graphile-worker";
import { run as graphileRun, parseCronItems } from "graphile-worker";
import { run as graphileRun, makeWorkerUtils, parseCronItems } from "graphile-worker";
import { SpanKind, trace } from "@opentelemetry/api";

import omit from "lodash.omit";
import { z } from "zod";
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { PgListenService } from "~/services/db/pgListen.server";
import { workerLogger as logger } from "~/services/logger.server";
import { flattenAttributes } from "@trigger.dev/core/v3";
Expand All @@ -34,8 +35,8 @@ const RawCronPayloadSchema = z.object({

const GraphileJobSchema = z.object({
id: z.coerce.string(),
queue_name: z.string().nullable(),
task_identifier: z.string(),
job_queue_id: z.number().nullable(),
task_id: z.number(),
payload: z.unknown(),
priority: z.number(),
run_at: z.coerce.date(),
Expand Down Expand Up @@ -72,7 +73,7 @@ type RecurringTaskPayload = {

export type ZodRecurringTasks = {
[key: string]: {
pattern: string;
match: string;
options?: CronItemOptions;
handler: (payload: RecurringTaskPayload, job: GraphileJob) => Promise<void>;
};
Expand Down Expand Up @@ -129,6 +130,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
#rateLimiter?: ZodWorkerRateLimiter;
#shutdownTimeoutInMs?: number;
#shuttingDown = false;
#workerUtils?: WorkerUtils;

constructor(options: ZodWorkerOptions<TMessageCatalog>) {
this.#name = options.name;
Expand Down Expand Up @@ -158,6 +160,8 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

const parsedCronItems = parseCronItems(this.#createCronItemsFromRecurringTasks());

this.#workerUtils = await makeWorkerUtils(this.#runnerOptions);

this.#runner = await graphileRun({
...this.#runnerOptions,
noHandleSignals: true,
Expand Down Expand Up @@ -188,7 +192,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
this.#logDebug("Detected incoming migration", { latestMigration });

if (latestMigration > 10) {
// already migrated past v0.14 - nothing to do
this.#logDebug("Already migrated past v0.14 - nothing to do", { latestMigration });
return;
}

Expand Down Expand Up @@ -263,6 +267,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

/**
 * Stops the runner (if one was started) and releases the worker utils
 * (if they were created).
 *
 * The release happens in a `finally` block so the worker utils are not
 * leaked when stopping the runner rejects; the original rejection still
 * propagates to the caller.
 */
public async stop() {
  try {
    await this.#runner?.stop();
  } finally {
    await this.#workerUtils?.release();
  }
}

public async enqueue<K extends keyof TMessageCatalog>(
Expand Down Expand Up @@ -442,12 +447,29 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
return taskList;
}

/**
 * Looks up a queue's name from its numeric id.
 *
 * Reads from the `_private_job_queues` table in the graphile worker schema
 * using the `$replica` client.
 *
 * @param queueId - Id of the job queue, or `null` when the job has no queue.
 * @returns The queue name, or `undefined` when `queueId` is `null` or no row
 *   matches.
 */
async #getQueueName(queueId: number | null) {
  // A job without a queue has nothing to resolve.
  if (queueId === null) {
    return;
  }

  const rows = await $replica.$queryRawUnsafe(
    `SELECT queue_name FROM ${this.graphileWorkerSchema}._private_job_queues WHERE id = $1`,
    queueId
  );

  // Validate the raw result shape before reading from it.
  const [firstRow] = z.array(z.object({ queue_name: z.string() })).parse(rows);

  return firstRow?.queue_name;
}

async #rescheduleTask(payload: unknown, helpers: JobHelpers) {
this.#logDebug("Rescheduling task", { payload, job: helpers.job });

await this.enqueue(helpers.job.task_identifier, payload, {
runAt: helpers.job.run_at,
queueName: helpers.job.queue_name ?? undefined,
queueName: await this.#getQueueName(helpers.job.job_queue_id),
priority: helpers.job.priority,
jobKey: helpers.job.key ?? undefined,
flags: Object.keys(helpers.job.flags ?? []),
Expand All @@ -460,7 +482,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

if (this.#cleanup) {
cronItems.push({
pattern: this.#cleanup.frequencyExpression,
match: this.#cleanup.frequencyExpression,
identifier: CLEANUP_TASK_NAME,
task: CLEANUP_TASK_NAME,
options: this.#cleanup.taskOptions,
Expand All @@ -469,7 +491,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

if (this.#reporter) {
cronItems.push({
pattern: "50 * * * *", // Every hour at 50 minutes past the hour
match: "50 * * * *", // Every hour at 50 minutes past the hour
identifier: REPORTER_TASK_NAME,
task: REPORTER_TASK_NAME,
});
Expand All @@ -481,7 +503,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {

for (const [key, task] of Object.entries(this.#recurringTasks)) {
const cronItem: CronItem = {
pattern: task.pattern,
match: task.match,
identifier: key,
task: key,
options: task.options,
Expand Down Expand Up @@ -529,7 +551,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...(job.job_queue_id ? { "job.queue_id": job.job_queue_id } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
Expand Down Expand Up @@ -599,7 +621,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
attributes: {
"job.task_identifier": job.task_identifier,
"job.id": job.id,
...(job.queue_name ? { "job.queue_name": job.queue_name } : {}),
...(job.job_queue_id ? { "job.queue_id": job.job_queue_id } : {}),
...flattenAttributes(job.payload as Record<string, unknown>, "job.payload"),
"job.priority": job.priority,
"job.run_at": job.run_at.toISOString(),
Expand Down Expand Up @@ -638,6 +660,10 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
return;
}

if (!this.#workerUtils) {
throw new Error("WorkerUtils need to be initialized before running job cleanup.");
}

const job = helpers.job;

logger.debug("Received cleanup task", {
Expand All @@ -663,23 +689,38 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
payload,
});

const rawResults = await this.#prisma.$queryRawUnsafe(
`WITH rows AS (SELECT id FROM ${this.graphileWorkerSchema}.jobs WHERE run_at < $1 AND locked_at IS NULL AND max_attempts = attempts LIMIT $2 FOR UPDATE) DELETE FROM ${this.graphileWorkerSchema}.jobs WHERE id IN (SELECT id FROM rows) RETURNING id`,
const rawResults = await $replica.$queryRawUnsafe(
`SELECT id
FROM ${this.graphileWorkerSchema}.jobs
WHERE run_at < $1
AND locked_at IS NULL
AND max_attempts = attempts
LIMIT $2`,
expirationDate,
this.#cleanup.maxCount
);

const results = Array.isArray(rawResults) ? rawResults : [];
const results = z
.array(
z.object({
id: z.coerce.string(),
})
)
.parse(rawResults);

const completedJobs = await this.#workerUtils.completeJobs(results.map((job) => job.id));

logger.debug("Cleaned up old jobs", {
count: results.length,
found: results.length,
deleted: completedJobs.length,
expirationDate,
payload,
});

if (this.#reporter) {
await this.#reporter("cleanup_stats", {
count: results.length,
found: results.length,
deleted: completedJobs.length,
expirationDate,
ts: payload._cron.ts,
});
Expand Down Expand Up @@ -711,7 +752,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const schema = z.array(z.object({ count: z.coerce.number() }));

// Count the number of jobs that have been added since the startAt date and before the payload._cron.ts date
const rawAddedResults = await this.#prisma.$queryRawUnsafe(
const rawAddedResults = await $replica.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs WHERE created_at > $1 AND created_at < $2`,
startAt,
payload._cron.ts
Expand All @@ -720,7 +761,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const addedCountResults = schema.parse(rawAddedResults)[0];

// Count the total number of jobs in the jobs table
const rawTotalResults = await this.#prisma.$queryRawUnsafe(
const rawTotalResults = await $replica.$queryRawUnsafe(
`SELECT COUNT(*) FROM ${this.graphileWorkerSchema}.jobs`
);

Expand Down
95 changes: 95 additions & 0 deletions apps/webapp/app/services/db/graphileMigrationHelper.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { runMigrations } from "graphile-worker";
import { PrismaClient, prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { PgNotifyService } from "./pgNotify.server";
import { z } from "zod";

/**
 * Runs graphile-worker database migrations, first checking whether an
 * existing installation still has to cross the v0.14.0 migration boundary.
 *
 * When such a pending migration is detected, a `trigger:graphile:migrate`
 * notification is sent and startup is delayed before the migrations run.
 */
export class GraphileMigrationHelperService {
  #prismaClient: PrismaClient;

  /**
   * @param prismaClient - Client used for the schema and migration lookups;
   *   defaults to the shared `prisma` instance.
   */
  constructor(prismaClient: PrismaClient = prisma) {
    this.#prismaClient = prismaClient;
  }

  /**
   * Performs the pre-migration check (which may notify and wait), then runs
   * the graphile-worker migrations against `env.DATABASE_URL` in the
   * `env.WORKER_SCHEMA` schema.
   */
  public async call() {
    this.#logDebug("GraphileMigrationHelperService.call");

    await this.#detectAndPrepareForMigrations();

    await runMigrations({
      connectionString: env.DATABASE_URL,
      schema: env.WORKER_SCHEMA,
    });
  }

  /** Debug-logs a message with the `[migrationHelper]` prefix. */
  #logDebug(message: string, args?: any) {
    logger.debug(`[migrationHelper] ${message}`, args);
  }

  /**
   * Reads the highest migration id from the worker schema's `migrations`
   * table.
   *
   * @returns The latest migration id, or `-1` when the table has no rows.
   */
  async #getLatestMigration() {
    const migrationQueryResult = await this.#prismaClient.$queryRawUnsafe(`
      SELECT id FROM ${env.WORKER_SCHEMA}.migrations
      ORDER BY id DESC LIMIT 1
    `);

    const MigrationQueryResultSchema = z.array(z.object({ id: z.number() }));

    const migrationResults = MigrationQueryResultSchema.parse(migrationQueryResult);

    if (!migrationResults.length) {
      // no migrations applied yet
      return -1;
    }

    return migrationResults[0].id;
  }

  /**
   * Checks whether the schema named by `env.WORKER_SCHEMA` exists.
   *
   * @returns `true` when exactly one matching schema row is found.
   */
  async #graphileSchemaExists() {
    // Use $queryRaw so the returned rows can be counted directly. $executeRaw
    // is meant for statements that do not return records, so relying on its
    // "affected rows" number for a SELECT is fragile.
    const schemaRows = await this.#prismaClient.$queryRaw`
      SELECT schema_name FROM information_schema.schemata
      WHERE schema_name = ${env.WORKER_SCHEMA}
    `;

    return Array.isArray(schemaRows) && schemaRows.length === 1;
  }

  /** Helper for graphile-worker v0.14.0 migration. No-op if already migrated. */
  async #detectAndPrepareForMigrations() {
    if (!(await this.#graphileSchemaExists())) {
      // no schema yet, likely first start
      return;
    }

    const latestMigration = await this.#getLatestMigration();

    if (latestMigration < 0) {
      // no migrations found
      return;
    }

    // the first v0.14.0 migration has ID 11
    if (latestMigration > 10) {
      // already migrated
      return;
    }

    // add 15s to graceful shutdown timeout, just to be safe
    const migrationDelayInMs = env.GRACEFUL_SHUTDOWN_TIMEOUT + 15000;

    this.#logDebug("Delaying worker startup due to pending migration", {
      latestMigration,
      migrationDelayInMs,
    });

    console.log(`⚠️ detected pending graphile migration`);
    console.log(`⚠️ notifying running workers`);

    // Announce the pending migration on the "trigger:graphile:migrate" channel
    // before waiting.
    const pgNotify = new PgNotifyService();
    await pgNotify.call("trigger:graphile:migrate", { latestMigration });

    console.log(`⚠️ delaying worker startup by ${migrationDelayInMs}ms`);

    // Wait out the delay before the caller proceeds to run the migrations.
    await new Promise((resolve) => setTimeout(resolve, migrationDelayInMs));
  }
}
12 changes: 6 additions & 6 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -211,9 +212,8 @@ if (env.NODE_ENV === "production") {
}

export async function init() {
// const pgNotify = new PgNotifyService();
// await pgNotify.call("trigger:graphile:migrate", { latestMigration: 10 });
// await new Promise((resolve) => setTimeout(resolve, 10000))
const migrationHelper = new GraphileMigrationHelperService();
await migrationHelper.call();

if (env.WORKER_ENABLED === "true") {
await workerQueue.initialize();
Expand Down Expand Up @@ -250,7 +250,7 @@ function getWorkerQueue() {
recurringTasks: {
// Run this every 5 minutes
autoIndexProductionEndpoints: {
pattern: "*/5 * * * *",
match: "*/5 * * * *",
handler: async (payload, job) => {
const service = new RecurringEndpointIndexService();

Expand All @@ -259,7 +259,7 @@ function getWorkerQueue() {
},
// Run this every hour
purgeOldIndexings: {
pattern: "0 * * * *",
match: "0 * * * *",
handler: async (payload, job) => {
// Delete indexings that are older than 7 days
await prisma.endpointIndex.deleteMany({
Expand All @@ -273,7 +273,7 @@ function getWorkerQueue() {
},
// Run this every hour at the 47 minute mark
purgeOldTaskEvents: {
pattern: "47 * * * *",
match: "47 * * * *",
handler: async (payload, job) => {
await eventRepository.truncateEvents();
},
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
"evt": "^2.4.13",
"express": "^4.18.1",
"framer-motion": "^10.12.11",
"graphile-worker": "^0.13.0",
"graphile-worker": "0.16.6",
"highlight.run": "^7.3.4",
"humanize-duration": "^3.27.3",
"intl-parse-accept-language": "^1.0.0",
Expand Down
Loading