Skip to content

v2: Graphile auto-cleanup and auto-endpoint disabling #1103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ const EnvironmentSchema = z.object({

ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),

MAX_SEQUENTIAL_INDEX_FAILURE_COUNT: z.coerce.number().default(96),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,17 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
span.recordException(new Error(String(error)));
}

if (job.attempts >= job.max_attempts) {
logger.error("Job failed after max attempts", {
job,
attempts: job.attempts,
max_attempts: job.max_attempts,
error: error instanceof Error ? error.message : error,
});

return;
}

throw error;
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export class TriggerEndpointIndexHookService {
},
});

if (!endpoint) {
if (!endpoint || !endpoint.url) {
throw new Error("Endpoint not found");
}

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
pathMatchers: [/^\/api/],
// Allow /api/v1/tasks/:id/callback/:secret
pathWhiteList: [
"/api/internal/stripe_webhooks",
"/api/v1/authorization-code",
"/api/v1/token",
/^\/api\/v1\/tasks\/[^\/]+\/callback\/[^\/]+$/, // /api/v1/tasks/$id/callback/$secret
Expand Down
154 changes: 124 additions & 30 deletions apps/webapp/app/services/endpoints/performEndpointIndexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { IndexEndpointStats } from "@trigger.dev/core";
import { RegisterHttpEndpointService } from "../triggers/registerHttpEndpoint.server";
import { RegisterWebhookService } from "../triggers/registerWebhook.server";
import { EndpointIndex } from "@trigger.dev/database";
import { env } from "~/env.server";

const MAX_SEQUENTIAL_FAILURE_COUNT = env.MAX_SEQUENTIAL_INDEX_FAILURE_COUNT;

export class PerformEndpointIndexService {
#prismaClient: PrismaClient;
Expand Down Expand Up @@ -56,9 +59,16 @@ export class PerformEndpointIndexService {

if (!endpointIndex.endpoint.url) {
logger.debug("Endpoint URL is not set", endpointIndex);
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: "Endpoint URL is not set",
});

return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: "Endpoint URL is not set",
},
false
);
}

// Make a request to the endpoint to fetch a list of jobs
Expand All @@ -69,9 +79,15 @@ export class PerformEndpointIndexService {
const { response, parser, headerParser, errorParser } = await client.indexEndpoint();

if (!response) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (isRedirect(response.status)) {
Expand All @@ -83,15 +99,27 @@ export class PerformEndpointIndexService {
const location = response.headers.get("location");

if (!location) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting but no location header is present`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting but no location header is present`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (redirectCount > 5) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting too many times`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Endpoint ${endpointIndex.endpoint.url} is redirecting too many times`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

await this.#prismaClient.endpoint.update({
Expand All @@ -111,20 +139,38 @@ export class PerformEndpointIndexService {
const body = await safeBodyFromResponse(response, errorParser);

if (body) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: body.message,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: body.message,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

return updateEndpointIndexWithError(this.#prismaClient, id, {
message: "Trigger API key is invalid",
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: "Trigger API key is invalid",
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

if (!response.ok) {
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}. Status code: ${response.status}`,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: `Could not connect to endpoint ${endpointIndex.endpoint.url}. Status code: ${response.status}`,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const anyBody = await response.json();
Expand Down Expand Up @@ -152,21 +198,33 @@ export class PerformEndpointIndexService {
}).message;
}

return updateEndpointIndexWithError(this.#prismaClient, id, {
message: friendlyError,
raw: fromZodError(bodyResult.error).message,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: friendlyError,
raw: fromZodError(bodyResult.error).message,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const headerResult = headerParser.safeParse(Object.fromEntries(response.headers.entries()));
if (!headerResult.success) {
const friendlyError = fromZodError(headerResult.error, {
prefix: "Your headers are invalid",
});
return updateEndpointIndexWithError(this.#prismaClient, id, {
message: friendlyError.message,
raw: headerResult.error.issues,
});
return updateEndpointIndexWithError(
this.#prismaClient,
id,
endpointIndex.endpoint.id,
{
message: friendlyError.message,
raw: headerResult.error.issues,
},
endpointIndex.endpoint.environment.type !== "DEVELOPMENT"
);
}

const { jobs, sources, dynamicTriggers, dynamicSchedules, httpEndpoints, webhooks } =
Expand Down Expand Up @@ -407,8 +465,44 @@ export class PerformEndpointIndexService {
async function updateEndpointIndexWithError(
prismaClient: PrismaClient,
id: string,
error: EndpointIndexError
endpointId: string,
error: EndpointIndexError,
checkDisabling = true
) {
// Check here to see if this endpoint has only failed for the last 50 times
// And if so, we disable the endpoint by setting the url to null
if (checkDisabling) {
const recentIndexes = await prismaClient.endpointIndex.findMany({
where: {
endpointId,
id: {
not: id,
},
},
orderBy: {
createdAt: "desc",
},
take: MAX_SEQUENTIAL_FAILURE_COUNT - 1,
select: {
status: true,
},
});

if (
recentIndexes.length === MAX_SEQUENTIAL_FAILURE_COUNT - 1 &&
recentIndexes.every((index) => index.status === "FAILURE")
) {
await prismaClient.endpoint.update({
where: {
id: endpointId,
},
data: {
url: null,
},
});
}
}

return await prismaClient.endpointIndex.update({
where: {
id,
Expand Down
7 changes: 4 additions & 3 deletions apps/webapp/app/services/sources/handleHttpSource.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ export class HandleHttpSourceService {
return { status: 200 };
}

if (!triggerSource.endpoint.url) {
return { status: 404 };
}

if (!triggerSource.organization.runsEnabled) {
logger.debug("HandleHttpSourceService: Runs are disabled for this organization", {
organizationId: triggerSource.organization.id,
});
return { status: 404 };
}

Expand Down
5 changes: 0 additions & 5 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,6 @@ function getWorkerQueue() {
return new ZodWorker({
name: "workerQueue",
prisma,
cleanup: {
frequencyExpression: "13,27,43 * * * *",
ttl: env.WORKER_CLEANUP_TTL_DAYS * 24 * 60 * 60 * 1000, // X days
maxCount: 1000,
},
runnerOptions: {
connectionString: env.DATABASE_URL,
concurrency: env.WORKER_CONCURRENCY,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/tracer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ function getTracer() {
if (env.INTERNAL_OTEL_TRACE_EXPORTER_URL) {
const exporter = new OTLPTraceExporter({
url: env.INTERNAL_OTEL_TRACE_EXPORTER_URL,
timeoutMillis: 1000,
timeoutMillis: 10_000,
headers:
env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_NAME &&
env.INTERNAL_OTEL_TRACE_EXPORTER_AUTH_HEADER_VALUE
Expand Down
Loading