Skip to main content
Glama

FastAPI SSE MCP Random

by hk4crprasad
sse.py1.22 kB
from mcp.server.fastmcp import FastMCP from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.routing import Mount, Route def create_sse_server(mcp: FastMCP) -> Starlette: """ Build a Starlette app to handle SSE MCP streams. """ # Initialize the SSE transport, mounting POSTs under /messages/ transport = SseServerTransport("/messages/") # Handle incoming SSE connections async def handle_sse(request): # Establish an SSE connection, returning a pair of (read_stream, write_stream) async with transport.connect_sse( request.scope, request.receive, request._send ) as (read_stream, write_stream): # Delegate to the MCP server’s run loop await mcp._mcp_server.run( read_stream, write_stream, mcp._mcp_server.create_initialization_options(), ) # Configure routes: one for the EventSource, one for client POSTs routes = [ Route("/sse/", endpoint=handle_sse), Mount("/messages/", app=transport.handle_post_message), ] # Return a standalone Starlette app return Starlette(routes=routes)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hk4crprasad/fastapi_sse_mcp_random'

If you have feedback or need assistance with the MCP directory API, please join our Discord server