Skip to content

feat: realtime #1402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
7d99012
Denormalize run tags, increase character limit to 128
ericallam Sep 29, 2024
394ef77
WIP realtime subscribing to runs
ericallam Sep 29, 2024
e01c9d2
extracted the stream stuff into core, made it more reusable
ericallam Sep 29, 2024
67dfa23
WIP tags
ericallam Sep 29, 2024
a32ba39
Remove tags for now because it’s not support in electric
ericallam Sep 30, 2024
70fd57b
Support async iterables, readable stream, and callback style subscrip…
ericallam Sep 30, 2024
cf212e2
Remove tags streaming endpoint
ericallam Sep 30, 2024
2a30af0
Add realtime rate limits and scope them to the /realtime path
ericallam Sep 30, 2024
de0de4d
WIP rate limt per org
ericallam Oct 1, 2024
8bb23bd
Introduce per org rate limits
ericallam Oct 1, 2024
e2ab984
WIP JWT auth
ericallam Oct 9, 2024
680ec68
Move migrations into new internal db package
ericallam Oct 9, 2024
0f179c5
Resolve pnpm lock file
ericallam Oct 9, 2024
04ab0dd
Authenticating to the realtime API with JWTs are working
ericallam Oct 9, 2024
480b192
realtime in the client
ericallam Oct 9, 2024
b7566e6
Created react-hooks package and starting to move stuff in there
ericallam Oct 10, 2024
49a39b9
Improve types for hooks
ericallam Oct 10, 2024
c492ca9
schema tasks
ericallam Oct 10, 2024
7937c4f
Added useBatch hook
ericallam Oct 10, 2024
502077c
build uploadthing/fal demo and change how run metadata is synced to t…
ericallam Oct 11, 2024
1fe0346
tweaks
ericallam Oct 12, 2024
2d69ffc
WIL realtime concurrency tracking
ericallam Oct 14, 2024
090c4a6
Implement test for realtime client using testcontainers
ericallam Oct 15, 2024
0d31dca
Allow customizing the expiration time of the automatic JWT created af…
ericallam Oct 15, 2024
279c0ac
Add support for subscribing to run tags
ericallam Oct 15, 2024
e21eadb
Improve auth types and API
ericallam Oct 16, 2024
44e68a4
finalize the realtime API
ericallam Oct 17, 2024
b687d28
Fixed some example stuff
ericallam Oct 17, 2024
63397ad
Allow up to 10 run tags
ericallam Oct 17, 2024
b22615e
Remove core from docker-provider tsconfig paths to prevent it from be…
ericallam Oct 17, 2024
0055dcf
do the same for the kubernetes provider
ericallam Oct 17, 2024
8f04737
Fixing some typecheck errors
ericallam Oct 17, 2024
5c8e7a1
Fix webapp type errors
ericallam Oct 18, 2024
ca01927
Update @trigger.dev/platform to 1.0.13
ericallam Oct 18, 2024
d274dde
Fix attw error
ericallam Oct 18, 2024
2145613
Remove from/to in subscribeToRuns query params
ericallam Oct 18, 2024
5cbc0e9
Add tests for the rate limit middleware and add custom JWT rate limits
ericallam Oct 18, 2024
63b9941
turn off webapp test parallelism
ericallam Oct 21, 2024
a5cc2fc
Finish renaming jwt -> publicAccessToken and automatically give the J…
ericallam Oct 21, 2024
a19baa0
Add changeset
ericallam Oct 21, 2024
d83c35d
Attempt to fix unit tests in CI
ericallam Oct 21, 2024
e69847d
Skip running the auth rate limit middleware tests for now
ericallam Oct 21, 2024
74c6447
Try a beefier machine
ericallam Oct 21, 2024
c7ada46
Try and run webapp tests separately
ericallam Oct 21, 2024
bc91b48
Setup env vars
ericallam Oct 21, 2024
85e8f47
Make sliding window test more reliabile
ericallam Oct 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/brave-forks-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/react-hooks": minor
"@trigger.dev/sdk": minor
"@trigger.dev/core": minor
---

Access run status updates in realtime, from your server or from your frontend
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ concurrency:
jobs:
release:
name: 🦋 Changesets Release
runs-on: buildjet-8vcpu-ubuntu-2204
runs-on: ubuntu-latest
if: github.repository == 'triggerdotdev/trigger.dev'
outputs:
published: ${{ steps.changesets.outputs.published }}
Expand Down
16 changes: 13 additions & 3 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
jobs:
unitTests:
name: "🧪 Unit Tests"
runs-on: buildjet-8vcpu-ubuntu-2204
runs-on: buildjet-16vcpu-ubuntu-2204
steps:
- name: ⬇️ Checkout repo
uses: actions/checkout@v4
Expand All @@ -30,5 +30,15 @@ jobs:
- name: 📀 Generate Prisma Client
run: pnpm run generate

- name: 🧪 Run Unit Tests
run: pnpm run test
- name: 🧪 Run Webapp Unit Tests
run: pnpm run test --filter webapp
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/postgres
DIRECT_URL: postgresql://postgres:postgres@localhost:5432/postgres
SESSION_SECRET: "secret"
MAGIC_LINK_SECRET: "secret"
ENCRYPTION_KEY: "secret"


- name: 🧪 Run Internal Unit Tests
run: pnpm run test --filter "@internal/*"
3 changes: 2 additions & 1 deletion .npmrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
link-workspace-packages=false
public-hoist-pattern[]=*prisma*
public-hoist-pattern[]=*prisma*
prefer-workspace-packages=true
8 changes: 2 additions & 6 deletions .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
{
"recommendations": [
"denoland.vscode-deno"
],
"unwantedRecommendations": [

]
"recommendations": ["bierner.comment-tagged-templates"],
"unwantedRecommendations": []
}
6 changes: 1 addition & 5 deletions apps/docker-provider/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"strict": true,
"skipLibCheck": true,
"paths": {
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"]
}
"skipLibCheck": true
}
}
6 changes: 1 addition & 5 deletions apps/kubernetes-provider/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"strict": true,
"skipLibCheck": true,
"paths": {
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"]
}
"skipLibCheck": true
}
}
21 changes: 20 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const EnvironmentSchema = z.object({
REMIX_APP_PORT: z.string().optional(),
LOGIN_ORIGIN: z.string().default("http://localhost:3030"),
APP_ORIGIN: z.string().default("http://localhost:3030"),
ELECTRIC_ORIGIN: z.string(),
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
APP_ENV: z.string().default(process.env.NODE_ENV),
SERVICE_NAME: z.string().default("trigger.dev webapp"),
SECRET_STORE: SecretStoreOptionsSchema.default("DATABASE"),
Expand Down Expand Up @@ -103,6 +103,25 @@ const EnvironmentSchema = z.object({
API_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(250), // refix 250 tokens every 10 seconds
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
API_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

API_RATE_LIMIT_JWT_WINDOW: z.string().default("1m"),
API_RATE_LIMIT_JWT_TOKENS: z.coerce.number().int().default(60),

//Realtime rate limiting
/**
* @example "60s"
* @example "1m"
* @example "1h"
* @example "1d"
* @example "1000ms"
* @example "1000s"
*/
REALTIME_RATE_LIMIT_WINDOW: z.string().default("1m"),
REALTIME_RATE_LIMIT_TOKENS: z.coerce.number().int().default(100),
REALTIME_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
REALTIME_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
REALTIME_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),

//Ingesting event rate limit
INGEST_EVENT_RATE_LIMIT_WINDOW: z.string().default("60s"),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/models/taskRunTag.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { prisma } from "~/db.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";

export const MAX_TAGS_PER_RUN = 5;
export const MAX_TAGS_PER_RUN = 10;

export async function createTag({ tag, projectId }: { tag: string; projectId: string }) {
if (tag.trim().length === 0) return;
Expand Down
102 changes: 44 additions & 58 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ type CommonRelatedRun = Prisma.Result<
export class ApiRetrieveRunPresenter extends BasePresenter {
public async call(
friendlyId: string,
env: AuthenticatedEnvironment,
showSecretDetails: boolean
env: AuthenticatedEnvironment
): Promise<RetrieveRunResponse | undefined> {
return this.traceWithEnv("call", env, async (span) => {
const taskRun = await this._replica.taskRun.findFirst({
Expand All @@ -72,11 +71,7 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
runtimeEnvironmentId: env.id,
},
include: {
attempts: {
orderBy: {
createdAt: "desc",
},
},
attempts: true,
lockedToVersion: true,
schedule: true,
tags: true,
Expand Down Expand Up @@ -111,50 +106,48 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
let $output: any;
let $outputPresignedUrl: string | undefined;

if (showSecretDetails) {
const payloadPacket = await conditionallyImportPacket({
data: taskRun.payload,
dataType: taskRun.payloadType,
});
const payloadPacket = await conditionallyImportPacket({
data: taskRun.payload,
dataType: taskRun.payloadType,
});

if (
payloadPacket.dataType === "application/store" &&
typeof payloadPacket.data === "string"
) {
$payloadPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
payloadPacket.data,
"GET"
);
} else {
$payload = await parsePacket(payloadPacket);
}
if (
payloadPacket.dataType === "application/store" &&
typeof payloadPacket.data === "string"
) {
$payloadPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
payloadPacket.data,
"GET"
);
} else {
$payload = await parsePacket(payloadPacket);
}

if (taskRun.status === "COMPLETED_SUCCESSFULLY") {
const completedAttempt = taskRun.attempts.find(
(a) => a.status === "COMPLETED" && typeof a.output !== null
);
if (taskRun.status === "COMPLETED_SUCCESSFULLY") {
const completedAttempt = taskRun.attempts.find(
(a) => a.status === "COMPLETED" && typeof a.output !== null
);

if (completedAttempt && completedAttempt.output) {
const outputPacket = await conditionallyImportPacket({
data: completedAttempt.output,
dataType: completedAttempt.outputType,
});
if (completedAttempt && completedAttempt.output) {
const outputPacket = await conditionallyImportPacket({
data: completedAttempt.output,
dataType: completedAttempt.outputType,
});

if (
outputPacket.dataType === "application/store" &&
typeof outputPacket.data === "string"
) {
$outputPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
outputPacket.data,
"GET"
);
} else {
$output = await parsePacket(outputPacket);
}
if (
outputPacket.dataType === "application/store" &&
typeof outputPacket.data === "string"
) {
$outputPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
outputPacket.data,
"GET"
);
} else {
$output = await parsePacket(outputPacket);
}
}
}
Expand All @@ -165,6 +158,7 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
payloadPresignedUrl: $payloadPresignedUrl,
output: $output,
outputPresignedUrl: $outputPresignedUrl,
error: ApiRetrieveRunPresenter.apiErrorFromError(taskRun.error),
schedule: taskRun.schedule
? {
id: taskRun.schedule.friendlyId,
Expand All @@ -179,17 +173,9 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
},
}
: undefined,
attempts: !showSecretDetails
? []
: taskRun.attempts.map((a) => ({
id: a.friendlyId,
status: ApiRetrieveRunPresenter.apiStatusFromAttemptStatus(a.status),
createdAt: a.createdAt ?? undefined,
updatedAt: a.updatedAt ?? undefined,
startedAt: a.startedAt ?? undefined,
completedAt: a.completedAt ?? undefined,
error: ApiRetrieveRunPresenter.apiErrorFromError(a.error),
})),
// We're removing attempts from the API
attemptCount: taskRun.attempts.length,
attempts: [],
relatedRuns: {
root: taskRun.rootTaskRun
? await createCommonRunStructure(taskRun.rootTaskRun)
Expand Down
Loading