Skip to content

Commit 5c9a305

Browse files
committed
Auto-reload the queue page every 10 seconds
1 parent 46eed03 commit 5c9a305

File tree

4 files changed

+89
-2
lines changed

4 files changed

+89
-2
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,6 @@ const EnvironmentSchema = z.object({
568568
/** How long should the presence ttl last */
569569
DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(30_000),
570570
DEV_PRESENCE_POLL_INTERVAL_MS: z.coerce.number().int().default(5_000),
571-
DEV_PRESENCE_RECONNECT_THRESHOLD_MS: z.coerce.number().int().default(2_000),
572571
/** How many ms to wait until dequeuing again, if there was a run last time */
573572
DEV_DEQUEUE_INTERVAL_WITH_RUN: z.coerce.number().int().default(250),
574573
/** How many ms to wait until dequeuing again, if there was no run last time */
@@ -660,6 +659,9 @@ const EnvironmentSchema = z.object({
660659

661660
TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"),
662661
TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute
662+
663+
QUEUE_SSE_AUTORELOAD_INTERVAL_MS: z.coerce.number().int().default(10_000),
664+
QUEUE_SSE_AUTORELOAD_TIMEOUT_MS: z.coerce.number().int().default(60_000),
663665
});
664666

665667
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
PlayIcon,
77
RectangleStackIcon,
88
} from "@heroicons/react/20/solid";
9-
import { Await, Form, useNavigation, type MetaFunction } from "@remix-run/react";
9+
import { Await, Form, useNavigation, useRevalidator, type MetaFunction } from "@remix-run/react";
1010
import { type ActionFunctionArgs, type LoaderFunctionArgs } from "@remix-run/server-runtime";
1111
import { Suspense, useEffect, useState } from "react";
1212
import { TypedAwait, typeddefer, useTypedLoaderData } from "remix-typedjson";
@@ -58,6 +58,8 @@ import { Callout } from "~/components/primitives/Callout";
5858
import upgradeForQueuesPath from "~/assets/images/queues-dashboard.png";
5959
import { PauseQueueService } from "~/v3/services/pauseQueue.server";
6060
import { Badge } from "~/components/primitives/Badge";
61+
import { useEventSource } from "~/hooks/useEventSource";
62+
import { useProject } from "~/hooks/useProject";
6163

6264
const SearchParamsSchema = z.object({
6365
page: z.coerce.number().min(1).default(1),
@@ -198,9 +200,26 @@ export default function Page() {
198200
const { environment, queues, success, pagination, code } = useTypedLoaderData<typeof loader>();
199201

200202
const organization = useOrganization();
203+
const project = useProject();
201204
const env = useEnvironment();
202205
const plan = useCurrentPlan();
203206

207+
const revalidation = useRevalidator();
208+
209+
const streamedEvents = useEventSource(
210+
`/resources/orgs/${organization.slug}/projects/${project.slug}/env/${env.slug}/queues/stream`,
211+
{
212+
event: "update",
213+
}
214+
);
215+
216+
useEffect(() => {
217+
if (streamedEvents) {
218+
console.log("streamedEvents", streamedEvents);
219+
revalidation.revalidate();
220+
}
221+
}, [streamedEvents]);
222+
204223
return (
205224
<PageContainer>
206225
<NavBar>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { $replica } from "~/db.server";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
import { requireUserId } from "~/services/session.server";
5+
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
6+
import { createSSELoader } from "~/utils/sse";
7+
8+
export const loader = createSSELoader({
9+
timeout: env.QUEUE_SSE_AUTORELOAD_TIMEOUT_MS,
10+
interval: env.QUEUE_SSE_AUTORELOAD_INTERVAL_MS,
11+
debug: true,
12+
handler: async ({ request, params }) => {
13+
const userId = await requireUserId(request);
14+
const { projectParam, envParam } = EnvironmentParamSchema.parse(params);
15+
16+
const environment = await $replica.runtimeEnvironment.findFirst({
17+
where: {
18+
slug: envParam,
19+
type: "DEVELOPMENT",
20+
orgMember: {
21+
userId,
22+
},
23+
project: {
24+
slug: projectParam,
25+
},
26+
},
27+
});
28+
29+
if (!environment) {
30+
throw new Response("Not Found", { status: 404 });
31+
}
32+
33+
return {
34+
beforeStream: async () => {
35+
logger.debug("Start queue page SSE session", {
36+
environmentId: environment.id,
37+
});
38+
},
39+
initStream: async ({ send }) => {
40+
send({ event: "time", data: new Date().toISOString() });
41+
},
42+
iterator: async ({ send }) => {
43+
send({
44+
event: "update",
45+
data: new Date().toISOString(),
46+
});
47+
},
48+
cleanup: async () => {
49+
logger.debug("End queue page SSE session", {
50+
environmentId: environment.id,
51+
});
52+
},
53+
};
54+
},
55+
});

references/hello-world/src/trigger/queues.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,14 @@ export const queuesTester = task({
3939
logger.log("Resumed queue", { resumedQueue });
4040
},
4141
});
42+
43+
export const otherQueueTask = task({
44+
id: "other-queue-task",
45+
queue: {
46+
name: "my-custom-queue",
47+
concurrencyLimit: 1,
48+
},
49+
run: async (payload: any, { ctx }) => {
50+
logger.log("Other queue task", { payload });
51+
},
52+
});

0 commit comments

Comments
 (0)