Skip to content

Initial implementation of SSE Support #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions setup_sse.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

# Add required dependencies
echo "Installing dependencies..."
npm install --save uuid express cors

# Copy updated request handler
echo "Updating requestHandler.ts..."
mv src/requestHandler.updated.ts src/requestHandler.ts

# Install TypeScript type definitions
echo "Installing TypeScript type definitions..."
npm install --save-dev @types/uuid @types/express

echo "Setup completed successfully!"
echo "To run the server with SSE support, execute: npm start"
37 changes: 37 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,32 @@

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import express from "express";
import cors from "cors";
import { createToolDefinitions } from "./tools.js";
import { setupRequestHandlers } from "./requestHandler.js";
import { setupSSEEndpoints } from "./sseHandler.js";

async function runServer() {
// Create Express app for HTTP server
const app = express();
const PORT = process.env.PORT || 3000;

// Enable CORS for all routes
app.use(cors());

// Parse JSON bodies
app.use(express.json());

// Setup SSE endpoints
setupSSEEndpoints(app);

// Start HTTP server
const httpServer = app.listen(PORT, () => {
console.log(`HTTP server running on port ${PORT}`);
});

// Initialize MCP server
const server = new Server(
{
name: "executeautomation/playwright-mcp-server",
Expand All @@ -28,6 +50,21 @@ async function runServer() {
// Create transport and connect
const transport = new StdioServerTransport();
await server.connect(transport);

// Handle graceful shutdown
const handleShutdown = async () => {
console.log("Shutting down servers...");

// Close HTTP server
httpServer.close();

// Exit process
process.exit(0);
};

// Register shutdown handlers
process.on("SIGINT", handleShutdown);
process.on("SIGTERM", handleShutdown);
}

runServer().catch((error) => {
Expand Down
70 changes: 70 additions & 0 deletions src/requestHandler.updated.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
ListResourcesRequestSchema,
ReadResourceRequestSchema,
ListToolsRequestSchema,
CallToolRequestSchema,
Tool
} from "@modelcontextprotocol/sdk/types.js";
import { handleToolCall, getConsoleLogs, getScreenshots } from "./toolHandler.js";
import { withSSENotifications } from "./sseIntegration.js";

export function setupRequestHandlers(server: Server, tools: Tool[]) {
// List resources handler
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
resources: [
{
uri: "console://logs",
mimeType: "text/plain",
name: "Browser console logs",
},
...Array.from(getScreenshots().keys()).map(name => ({
uri: `screenshot://${name}`,
mimeType: "image/png",
name: `Screenshot: ${name}`,
})),
],
}));

// Read resource handler
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri.toString();

if (uri === "console://logs") {
return {
contents: [{
uri,
mimeType: "text/plain",
text: getConsoleLogs().join("\n"),
}],
};
}

if (uri.startsWith("screenshot://")) {
const name = uri.split("://")[1];
const screenshot = getScreenshots().get(name);
if (screenshot) {
return {
contents: [{
uri,
mimeType: "image/png",
blob: screenshot,
}],
};
}
}

throw new Error(`Resource not found: ${uri}`);
});

// List tools handler
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: tools,
}));

// Call tool handler with SSE notifications
const enhancedToolHandler = withSSENotifications(handleToolCall);
server.setRequestHandler(CallToolRequestSchema, async (request) =>
enhancedToolHandler(request.params.name, request.params.arguments ?? {}, server)
);
}
131 changes: 131 additions & 0 deletions src/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { v4 as uuidv4 } from 'uuid';

interface SSEClient {
id: string;
res: any;
}

interface SSEEvent {
id?: string;
event?: string;
data: string;
retry?: number;
}

class SSEManager {
private static instance: SSEManager;
private clients: Map<string, SSEClient>;

private constructor() {
this.clients = new Map();
}

public static getInstance(): SSEManager {
if (!SSEManager.instance) {
SSEManager.instance = new SSEManager();
}
return SSEManager.instance;
}

/**
* Register a new SSE client connection
* @param res - Express response object
* @returns Client ID
*/
public registerClient(res: any): string {
// Set headers for SSE
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});

// Send initial connection established message
res.write('retry: 10000\n\n');

// Generate unique ID for this client
const clientId = uuidv4();

// Store client connection
this.clients.set(clientId, { id: clientId, res });

// Send initial connection successful event
this.sendEventToClient(clientId, {
event: 'connected',
data: JSON.stringify({ clientId })
});

console.log(`SSE client connected: ${clientId}`);
return clientId;
}

/**
* Remove a client connection
* @param clientId - ID of client to remove
*/
public removeClient(clientId: string): void {
if (this.clients.has(clientId)) {
this.clients.delete(clientId);
console.log(`SSE client disconnected: ${clientId}`);
}
}

/**
* Send an event to a specific client
* @param clientId - ID of client to send to
* @param event - Event to send
*/
public sendEventToClient(clientId: string, event: SSEEvent): boolean {
const client = this.clients.get(clientId);
if (!client) {
return false;
}

try {
// Format SSE message
let message = '';
if (event.id) message += `id: ${event.id}\n`;
if (event.event) message += `event: ${event.event}\n`;
message += `data: ${event.data}\n`;
if (event.retry) message += `retry: ${event.retry}\n`;
message += '\n';

// Send to client
client.res.write(message);
return true;
} catch (error) {
console.error(`Error sending event to client ${clientId}:`, error);
this.removeClient(clientId);
return false;
}
}

/**
* Send an event to all connected clients
* @param event - Event to send
*/
public broadcast(event: SSEEvent): void {
this.clients.forEach((client) => {
this.sendEventToClient(client.id, event);
});
}

/**
* Get the number of connected clients
* @returns Number of connected clients
*/
public getClientCount(): number {
return this.clients.size;
}

/**
* Get all connected client IDs
* @returns Array of client IDs
*/
public getClientIds(): string[] {
return Array.from(this.clients.keys());
}
}

export default SSEManager;
66 changes: 66 additions & 0 deletions src/sseHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Express, Request, Response } from "express";
import SSEManager from "./sse.js";

/**
* Setup SSE-related endpoints in Express
* @param app - Express application
*/
export function setupSSEEndpoints(app: Express): void {
const sseManager = SSEManager.getInstance();

// SSE connection endpoint
app.get('/sse', (req: Request, res: Response) => {
// Set headers for SSE connection
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();

// Register client
const clientId = sseManager.registerClient(res);

// Handle client disconnect
req.on('close', () => {
sseManager.removeClient(clientId);
});
});

// Endpoint to send an event to a specific client
app.post('/sse/event/:clientId', (req: Request, res: Response) => {
const { clientId } = req.params;
const event = req.body;

if (!event.data) {
return res.status(400).json({ error: 'Event data is required' });
}

const success = sseManager.sendEventToClient(clientId, event);

if (success) {
res.status(200).json({ success: true });
} else {
res.status(404).json({ error: 'Client not found' });
}
});

// Endpoint to broadcast an event to all clients
app.post('/sse/broadcast', (req: Request, res: Response) => {
const event = req.body;

if (!event.data) {
return res.status(400).json({ error: 'Event data is required' });
}

sseManager.broadcast(event);
res.status(200).json({ success: true, clientCount: sseManager.getClientCount() });
});

// Endpoint to get connected client information
app.get('/sse/clients', (req: Request, res: Response) => {
res.status(200).json({
count: sseManager.getClientCount(),
clients: sseManager.getClientIds()
});
});
}
Loading
Loading