Skip to content

Commit a6f215f

Browse files
committed
feature: add socket support
1 parent b2fca82 commit a6f215f

File tree

7 files changed

+253
-113
lines changed

7 files changed

+253
-113
lines changed

apps/webapp/app/entry.server.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ import {
1515
OperatingSystemPlatform,
1616
} from "./components/primitives/OperatingSystemProvider";
1717

18+
import { initializeWebSocketServer } from "./v3/handleWebsockets.server";
19+
initializeWebSocketServer();
20+
1821
const ABORT_DELAY = 30000;
1922

2023
export default function handleRequest(

apps/webapp/app/express.server.ts

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,9 @@ import { createExpressApp } from "remix-create-express-app";
22
import http from "http";
33
import morgan from "morgan";
44
import compression from "compression";
5-
6-
import type { Server as EngineServer } from "engine.io";
7-
85
import { registryProxy } from "./v3/registryProxy.server";
96
import { apiRateLimiter } from "./services/apiRateLimit.server";
10-
import { socketIo } from "./v3/handleSocketIo.server";
11-
import { wss } from "./v3/handleWebsockets.server";
7+
import { registerSocketIo } from "./socket.server";
128

139
export const express = createExpressApp({
1410
configure(app) {
@@ -62,62 +58,7 @@ export const express = createExpressApp({
6258
createServer(app) {
6359
const server = http.createServer(app);
6460

65-
server.keepAliveTimeout = 65 * 1000;
66-
67-
process.on("SIGTERM", () => {
68-
server.close((err) => {
69-
if (err) {
70-
console.error("Error closing express server:", err);
71-
} else {
72-
console.log("Express server closed gracefully.");
73-
}
74-
});
75-
});
76-
77-
socketIo?.io.attach(server);
78-
// prevent duplicate upgrades from listeners created by io.attach()
79-
server.removeAllListeners("upgrade");
80-
81-
server.on("upgrade", async (req, socket, head) => {
82-
console.log(
83-
`Attemping to upgrade connection at url ${req.url} with headers: ${JSON.stringify(
84-
req.headers
85-
)}`
86-
);
87-
88-
socket.on("error", (err) => {
89-
console.error("Connection upgrade error:", err);
90-
});
91-
92-
const url = new URL(req.url ?? "", "http://localhost");
93-
94-
// Upgrade socket.io connection
95-
if (url.pathname.startsWith("/socket.io/")) {
96-
console.log(`Socket.io client connected, upgrading their connection...`);
97-
98-
// https://github.com/socketio/socket.io/issues/4693
99-
(socketIo?.io.engine as EngineServer).handleUpgrade(req, socket, head);
100-
return;
101-
}
102-
103-
// Only upgrade the connecting if the path is `/ws`
104-
if (url.pathname !== "/ws") {
105-
// Setting the socket.destroy() error param causes an error event to be emitted which needs to be handled with socket.on("error") to prevent uncaught exceptions.
106-
socket.destroy(
107-
new Error(
108-
"Cannot connect because of invalid path: Please include `/ws` in the path of your upgrade request."
109-
)
110-
);
111-
return;
112-
}
113-
114-
console.log(`Client connected, upgrading their connection...`);
115-
116-
// Handle the WebSocket connection
117-
wss?.handleUpgrade(req, socket, head, (ws) => {
118-
wss?.emit("connection", ws, req);
119-
});
120-
});
61+
registerSocketIo(server);
12162

12263
return server;
12364
},

apps/webapp/app/socket.server.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import type http from "http";
2+
import type { Server as EngineServer } from "engine.io";
3+
import { singleton } from "./utils/singleton";
4+
import { env } from "./env.server";
5+
import { Redis } from "ioredis";
6+
import { createAdapter } from "@socket.io/redis-adapter";
7+
import { Server } from "socket.io";
8+
import { WebSocketServer } from "ws";
9+
10+
// wss and io are used directly in express.server.ts without bundling
11+
// so don't import from /app here
12+
// entry.server.ts gets bundled, so see initializeWebSocketServer() there
13+
// to add extra handlers that register on first request
14+
15+
export const wss = singleton("wss", () => {
16+
return new WebSocketServer({ noServer: true });
17+
});
18+
19+
export const io = singleton("socketIo", () => {
20+
if (!env.REDIS_HOST || !env.REDIS_PORT) {
21+
console.warn("No redis config found, skipping socket.io");
22+
return new Server();
23+
}
24+
25+
const pubClient = new Redis({
26+
port: env.REDIS_PORT,
27+
host: env.REDIS_HOST,
28+
username: env.REDIS_USERNAME,
29+
password: env.REDIS_PASSWORD,
30+
enableAutoPipelining: true,
31+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
32+
});
33+
const subClient = pubClient.duplicate();
34+
35+
const io = new Server({
36+
adapter: createAdapter(pubClient, subClient, {
37+
key: "tr:socket.io:",
38+
publishOnSpecificResponseChannel: true,
39+
}),
40+
});
41+
42+
return io;
43+
});
44+
45+
export function registerSocketIo(server: http.Server) {
46+
server.keepAliveTimeout = 65 * 1000;
47+
48+
process.on("SIGTERM", () => {
49+
server.close((err) => {
50+
if (err) {
51+
console.error("Error closing express server:", err);
52+
} else {
53+
console.log("Express server closed gracefully.");
54+
}
55+
});
56+
});
57+
58+
io.on("connection", (socket) => {
59+
console.log(`[socket.io][${socket.id}] connection at url: ${socket.request.url}`);
60+
});
61+
62+
console.log("Attaching socket.io");
63+
64+
io.attach(server);
65+
// prevent duplicate upgrades from listeners created by io.attach()
66+
server.removeAllListeners("upgrade");
67+
68+
server.on("upgrade", async (req, socket, head) => {
69+
console.log(
70+
`Attemping to upgrade connection at url ${req.url} with headers: ${JSON.stringify(
71+
req.headers
72+
)}`
73+
);
74+
75+
socket.on("error", (err) => {
76+
console.error("Connection upgrade error:", err);
77+
});
78+
79+
const url = new URL(req.url ?? "", "http://localhost");
80+
81+
// Upgrade socket.io connection
82+
if (url.pathname.startsWith("/socket.io/")) {
83+
console.log(`Socket.io client connected, upgrading their connection...`);
84+
85+
// https://github.com/socketio/socket.io/issues/4693
86+
(io.engine as EngineServer).handleUpgrade(req, socket, head);
87+
return;
88+
}
89+
90+
// Only upgrade the connecting if the path is `/ws`
91+
if (url.pathname !== "/ws") {
92+
// Setting the socket.destroy() error param causes an error event to be emitted which needs to be handled with socket.on("error") to prevent uncaught exceptions.
93+
socket.destroy(
94+
new Error(
95+
"Cannot connect because of invalid path: Please include `/ws` in the path of your upgrade request."
96+
)
97+
);
98+
return;
99+
}
100+
101+
console.log(`Client connected, upgrading their connection...`);
102+
103+
// Handle the WebSocket connection
104+
wss.handleUpgrade(req, socket, head, (ws) => {
105+
wss.emit("connection", ws, req);
106+
});
107+
});
108+
}

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,12 @@ import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
1919
import { CreateDeployedBackgroundWorkerService } from "./services/createDeployedBackgroundWorker.server";
2020
import { ResumeAttemptService } from "./services/resumeAttempt.server";
2121
import { DeploymentIndexFailed } from "./services/deploymentIndexFailed.server";
22-
import { Redis } from "ioredis";
23-
import { createAdapter } from "@socket.io/redis-adapter";
2422
import { CrashTaskRunService } from "./services/crashTaskRun.server";
23+
import { io } from "~/socket.server";
2524

2625
export const socketIo = singleton("socketIo", initalizeIoServer);
2726

2827
function initalizeIoServer() {
29-
const io = initializeSocketIOServerInstance();
30-
31-
io.on("connection", (socket) => {
32-
logger.log(`[socket.io][${socket.id}] connection at url: ${socket.request.url}`);
33-
});
34-
3528
const coordinatorNamespace = createCoordinatorNamespace(io);
3629
const providerNamespace = createProviderNamespace(io);
3730
const sharedQueueConsumerNamespace = createSharedQueueConsumerNamespace(io);
@@ -44,31 +37,6 @@ function initalizeIoServer() {
4437
};
4538
}
4639

47-
function initializeSocketIOServerInstance() {
48-
if (env.REDIS_HOST && env.REDIS_PORT) {
49-
const pubClient = new Redis({
50-
port: env.REDIS_PORT,
51-
host: env.REDIS_HOST,
52-
username: env.REDIS_USERNAME,
53-
password: env.REDIS_PASSWORD,
54-
enableAutoPipelining: true,
55-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
56-
});
57-
const subClient = pubClient.duplicate();
58-
59-
const io = new Server({
60-
adapter: createAdapter(pubClient, subClient, {
61-
key: "tr:socket.io:",
62-
publishOnSpecificResponseChannel: true,
63-
}),
64-
});
65-
66-
return io;
67-
}
68-
69-
return new Server();
70-
}
71-
7240
function createCoordinatorNamespace(io: Server) {
7341
const coordinator = new ZodNamespace({
7442
// @ts-ignore - for some reason the built ZodNamespace Server type is not compatible with the Server type here, but only when doing typechecking

apps/webapp/app/v3/handleWebsockets.server.ts

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,32 @@
11
import { IncomingMessage } from "node:http";
2-
import { WebSocketServer, type WebSocket } from "ws";
2+
import type { WebSocket } from "ws";
33
import { authenticateApiKey } from "~/services/apiAuth.server";
44
import { logger } from "~/services/logger.server";
55
import { singleton } from "../utils/singleton";
66
import { AuthenticatedSocketConnection } from "./authenticatedSocketConnection.server";
77
import { Gauge } from "prom-client";
88
import { metricsRegister } from "~/metrics.server";
9+
import { wss } from "~/socket.server";
910

1011
let authenticatedConnections: Map<string, AuthenticatedSocketConnection>;
11-
export const wss = singleton("wss", initalizeWebSocketServer);
1212

13-
function initalizeWebSocketServer() {
14-
const server = new WebSocketServer({ noServer: true });
13+
export function initializeWebSocketServer() {
14+
return singleton("wss:register", () => {
15+
wss.on("connection", handleWebSocketConnection);
1516

16-
server.on("connection", handleWebSocketConnection);
17+
authenticatedConnections = new Map();
1718

18-
authenticatedConnections = new Map();
19+
new Gauge({
20+
name: "dev_authenticated_connections",
21+
help: "Number of authenticated dev connections",
22+
collect() {
23+
this.set(authenticatedConnections.size);
24+
},
25+
registers: [metricsRegister],
26+
});
1927

20-
new Gauge({
21-
name: "dev_authenticated_connections",
22-
help: "Number of authenticated dev connections",
23-
collect() {
24-
this.set(authenticatedConnections.size);
25-
},
26-
registers: [metricsRegister],
28+
return wss;
2729
});
28-
29-
return server;
3030
}
3131

3232
async function handleWebSocketConnection(ws: WebSocket, req: IncomingMessage) {

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@
227227
"prettier": "^2.8.8",
228228
"prettier-plugin-tailwindcss": "^0.3.0",
229229
"prop-types": "^15.8.1",
230-
"remix-express-dev-server": "^0.2.4",
230+
"remix-express-dev-server": "^0.2.5",
231231
"rimraf": "^3.0.2",
232232
"style-loader": "^3.3.4",
233233
"tailwind-scrollbar": "^3.0.1",

0 commit comments

Comments
 (0)