Skip to main content
Glama
server.ts6.07 kB
/** * Example: Streaming with Decorators * * This example shows how to use decorators for streaming responses * with Express-MCP, supporting Server-Sent Events (SSE). */ import express, { type Request, type Response } from "express"; import { z } from "zod"; import { ApiOperation, Controller, Get, Post } from "../../src/decorators"; import { ExpressMCP } from "../../src/index"; // Streaming controller using decorators @Controller("/stream") export class StreamingController { @Get("/events") @ApiOperation({ summary: "Stream server-sent events", description: "Streams a series of events to the client using SSE", tags: ["streaming"], operationId: "streamEvents", }) async streamEvents(req: Request, res: Response) { // Set SSE headers res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Streaming", "true"); let counter = 0; const interval = setInterval(() => { counter++; const data = { id: counter, timestamp: new Date().toISOString(), message: `Event ${counter}`, }; res.write(`data: ${JSON.stringify(data)}\n\n`); if (counter >= 5) { clearInterval(interval); res.write("event: done\ndata: Stream complete\n\n"); res.end(); } }, 1000); // Handle client disconnect req.on("close", () => { clearInterval(interval); res.end(); }); } @Get("/numbers") @ApiOperation({ summary: "Stream numbers", description: "Streams a sequence of numbers", tags: ["streaming"], operationId: "streamNumbers", }) async streamNumbers(req: Request, res: Response) { res.setHeader("Content-Type", "text/plain"); res.setHeader("Transfer-Encoding", "chunked"); for (let i = 1; i <= 10; i++) { res.write(`${i}\n`); await new Promise((resolve) => setTimeout(resolve, 500)); } res.end(); } @Post("/chat") @ApiOperation({ summary: "Stream chat response", description: "Simulates a streaming chat response like an AI assistant", tags: ["streaming", "chat"], operationId: "streamChat", }) async streamChat(req: Request, res: Response) { const { message } = req.body; res.setHeader("Content-Type", "text/event-stream"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("X-Streaming", "true"); // Simulate typing response word by word const response = `Hello! You said: "${message}". This is a simulated streaming response that arrives word by word.`; const words = response.split(" "); for (const word of words) { res.write(`data: ${word} `); await new Promise((resolve) => setTimeout(resolve, 200)); } res.write("\n\ndata: [DONE]\n\n"); res.end(); } @Get("/progress/:taskId") @ApiOperation({ summary: "Stream task progress", description: "Streams progress updates for a long-running task", tags: ["streaming", "progress"], operationId: "streamProgress", }) async streamProgress(req: Request, res: Response) { const { taskId } = req.params; res.setHeader("Content-Type", "application/x-ndjson"); res.setHeader("X-Streaming", "true"); // Simulate task progress const stages = [ { stage: "Initializing", progress: 0 }, { stage: "Loading data", progress: 20 }, { stage: "Processing", progress: 40 }, { stage: "Analyzing", progress: 60 }, { stage: "Finalizing", progress: 80 }, { stage: "Complete", progress: 100 }, ]; for (const update of stages) { const data = { taskId, ...update, timestamp: new Date().toISOString(), }; res.write(`${JSON.stringify(data)}\n`); await new Promise((resolve) => setTimeout(resolve, 1000)); } res.end(); } } // Create Express app and register controller const app = express(); app.use(express.json()); // Register streaming controller import { registerController } from "../../src/decorators"; const streamingController = new StreamingController(); registerController(app, streamingController); // Add non-streaming endpoints for comparison app.get("/regular/data", (req: Request, res: Response) => { res.json({ message: "This is a regular non-streaming response", timestamp: new Date().toISOString(), }); }); // Create ExpressMCP with streaming support const mcp = new ExpressMCP(app, { mountPath: "/mcp", logging: { info: (...args) => console.log("[MCP]", ...args), error: (...args) => console.error("[MCP ERROR]", ...args), }, }); async function start() { await mcp.init(); mcp.mount(); const PORT = process.env.PORT || 3010; app.listen(PORT, () => { console.log(`✅ Express server running on http://localhost:${PORT}`); console.log(`📡 MCP tools available at http://localhost:${PORT}/mcp/tools`); console.log("\n🌊 Streaming Endpoints:"); console.log(" GET /stream/events - Server-Sent Events"); console.log(" GET /stream/numbers - Plain text streaming"); console.log(" POST /stream/chat - Chat-like streaming"); console.log(" GET /stream/progress/:id - Progress updates (NDJSON)"); console.log("\n📝 Test Streaming via MCP:"); console.log("\n1. Server-Sent Events:"); console.log(` curl -X POST http://localhost:${PORT}/mcp/invoke \\`); console.log(` -H "Content-Type: application/json" \\`); console.log( ` -d '{"toolName": "GET_/stream/events", "args": {}, "streaming": true}'`, ); console.log("\n2. Chat Streaming:"); console.log(` curl -X POST http://localhost:${PORT}/mcp/invoke \\`); console.log(` -H "Content-Type: application/json" \\`); console.log( ` -d '{"toolName": "POST_/stream/chat", "args": {"message": "Hello!"}, "streaming": true}'`, ); console.log("\n3. Progress Streaming:"); console.log(` curl -X POST http://localhost:${PORT}/mcp/invoke \\`); console.log(` -H "Content-Type: application/json" \\`); console.log( ` -d '{"toolName": "GET_/stream/progress/:taskId", "args": {"taskId": "task-123"}, "streaming": true}'`, ); console.log("\n💡 Note: Add 'streaming: true' to enable streaming mode"); }); } start().catch(console.error);

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bowen31337/expressjs_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server