Damn Vulnerable Model Context Protocol (DVMCP)

sse_server.py
import sys
import os

# Add the project root to the Python path
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from mcp.server.fastmcp import FastMCP, Context
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Mount, Route
from fastapi import FastAPI


# Create a base class for SSE-compatible MCP servers
class SseCompatibleMcpServer:
    def __init__(self, name, port):
        self.mcp = FastMCP(name)
        self.port = port
        self.app = FastAPI()

    def mount_sse_server(self):
        """Mount the SSE server to the FastAPI app"""
        self.app.mount("/", self.create_sse_server())

    def create_sse_server(self):
        """Create a Starlette app that handles SSE connections and message handling"""
        transport = SseServerTransport("/messages/")

        # Define handler functions
        async def handle_sse(request):
            async with transport.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await self.mcp._mcp_server.run(
                    streams[0],
                    streams[1],
                    self.mcp._mcp_server.create_initialization_options()
                )

        # Create Starlette routes for SSE and message handling
        routes = [
            Route("/sse", endpoint=handle_sse),
            Mount("/messages", app=transport.handle_post_message),
        ]

        # Create a Starlette app
        return Starlette(routes=routes)

    def run(self):
        """Run the server with uvicorn"""
        import uvicorn
        print(f"Starting {self.mcp.name} MCP Server")
        print("Connect to this server using an MCP client (e.g., Claude Desktop or Cursor)")
        print(f"Server running at http://localhost:{self.port}")
        print(f"SSE endpoint available at http://localhost:{self.port}/sse")
        uvicorn.run(self.app, host="0.0.0.0", port=self.port)
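The class above only provides plumbing: a concrete challenge server is expected to subclass it, register tools on the FastMCP instance, and call run(). Below is a minimal, hypothetical sketch of such a subclass. The module path, class name, port number, and echo tool are assumptions made for illustration, not taken from the repository.

# Hypothetical usage sketch; names, port, and the tool are illustrative assumptions.
from sse_server import SseCompatibleMcpServer  # assumed import path for the class above


class ExampleChallengeServer(SseCompatibleMcpServer):
    def __init__(self):
        super().__init__(name="Example Challenge", port=9001)

        # Register a tool on the underlying FastMCP instance.
        @self.mcp.tool()
        def echo(message: str) -> str:
            """Echo the provided message back to the caller."""
            return f"Echo: {message}"

        # Attach the Starlette SSE app so /sse and /messages/ are served.
        self.mount_sse_server()


if __name__ == "__main__":
    ExampleChallengeServer().run()

Once such a server is running, it can be exercised with any MCP client over SSE. Assuming the official mcp Python SDK client helpers are available, a connection might look roughly like this (the URL and port are again illustrative):

# Sketch of an MCP client connecting over SSE, assuming the mcp Python SDK.
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main():
    # Open the SSE transport against the server's /sse endpoint.
    async with sse_client("http://localhost:9001/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


asyncio.run(main())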
