runners.py•1.16 kB
import uvicorn
from mcp.server.lowlevel import Server
from mcp.server.sse import SseServerTransport
from mcp.server.stdio import stdio_server
from starlette.applications import Starlette
from starlette.routing import Mount, Route
async def run_stdio_async(server: Server):
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
async def run_sse_async(server: Server, host: str, port: int):
sse = SseServerTransport("/messages/")
async def handle_sse(request):
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
await server.run(streams[0], streams[1], server.create_initialization_options())
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
config = uvicorn.Config(
starlette_app,
host=host,
port=port,
log_level="debug",
)
uvicorn_server = uvicorn.Server(config)
await uvicorn_server.serve()