Skip to main content
Glama

Flight Simulator MCP Server

by jabir366
http.py•11.7 kB
from __future__ import annotations from collections.abc import AsyncGenerator, Callable, Generator from contextlib import asynccontextmanager, contextmanager from contextvars import ContextVar from typing import TYPE_CHECKING from mcp.server.auth.middleware.auth_context import AuthContextMiddleware from mcp.server.auth.middleware.bearer_auth import ( BearerAuthBackend, RequireAuthMiddleware, ) from mcp.server.auth.routes import create_auth_routes from mcp.server.lowlevel.server import LifespanResultT from mcp.server.sse import SseServerTransport from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.requests import Request from starlette.responses import Response from starlette.routing import BaseRoute, Mount, Route from starlette.types import Lifespan, Receive, Scope, Send from fastmcp.server.auth.auth import OAuthProvider from fastmcp.utilities.logging import get_logger if TYPE_CHECKING: from fastmcp.server.server import FastMCP logger = get_logger(__name__) _current_http_request: ContextVar[Request | None] = ContextVar( "http_request", default=None, ) class StarletteWithLifespan(Starlette): @property def lifespan(self) -> Lifespan: return self.router.lifespan_context @contextmanager def set_http_request(request: Request) -> Generator[Request, None, None]: token = _current_http_request.set(request) try: yield request finally: _current_http_request.reset(token) class RequestContextMiddleware: """ Middleware that stores each request in a ContextVar """ def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] == "http": with set_http_request(Request(scope)): await self.app(scope, receive, send) else: await self.app(scope, receive, send) def setup_auth_middleware_and_routes( 
auth: OAuthProvider, ) -> tuple[list[Middleware], list[BaseRoute], list[str]]: """Set up authentication middleware and routes if auth is enabled. Args: auth: The OAuthProvider authorization server provider Returns: Tuple of (middleware, auth_routes, required_scopes) """ middleware: list[Middleware] = [] auth_routes: list[BaseRoute] = [] required_scopes: list[str] = [] middleware = [ Middleware( AuthenticationMiddleware, backend=BearerAuthBackend(auth), ), Middleware(AuthContextMiddleware), ] required_scopes = auth.required_scopes or [] auth_routes.extend( create_auth_routes( provider=auth, issuer_url=auth.issuer_url, service_documentation_url=auth.service_documentation_url, client_registration_options=auth.client_registration_options, revocation_options=auth.revocation_options, ) ) return middleware, auth_routes, required_scopes def create_base_app( routes: list[BaseRoute], middleware: list[Middleware], debug: bool = False, lifespan: Callable | None = None, ) -> StarletteWithLifespan: """Create a base Starlette app with common middleware and routes. Args: routes: List of routes to include in the app middleware: List of middleware to include in the app debug: Whether to enable debug mode lifespan: Optional lifespan manager for the app Returns: A Starlette application """ # Always add RequestContextMiddleware as the outermost middleware middleware.append(Middleware(RequestContextMiddleware)) return StarletteWithLifespan( routes=routes, middleware=middleware, debug=debug, lifespan=lifespan, ) def create_sse_app( server: FastMCP[LifespanResultT], message_path: str, sse_path: str, auth: OAuthProvider | None = None, debug: bool = False, routes: list[BaseRoute] | None = None, middleware: list[Middleware] | None = None, ) -> StarletteWithLifespan: """Return an instance of the SSE server app. 
Args: server: The FastMCP server instance message_path: Path for SSE messages sse_path: Path for SSE connections auth: Optional auth provider debug: Whether to enable debug mode routes: Optional list of custom routes middleware: Optional list of middleware Returns: A Starlette application with RequestContextMiddleware """ server_routes: list[BaseRoute] = [] server_middleware: list[Middleware] = [] # Set up SSE transport sse = SseServerTransport(message_path) # Create handler for SSE connections async def handle_sse(scope: Scope, receive: Receive, send: Send) -> Response: async with sse.connect_sse(scope, receive, send) as streams: await server._mcp_server.run( streams[0], streams[1], server._mcp_server.create_initialization_options(), ) return Response() # Get auth middleware and routes # Add SSE routes with or without auth if auth: auth_middleware, auth_routes, required_scopes = ( setup_auth_middleware_and_routes(auth) ) server_routes.extend(auth_routes) server_middleware.extend(auth_middleware) # Auth is enabled, wrap endpoints with RequireAuthMiddleware server_routes.append( Route( sse_path, endpoint=RequireAuthMiddleware(handle_sse, required_scopes), methods=["GET"], ) ) server_routes.append( Mount( message_path, app=RequireAuthMiddleware(sse.handle_post_message, required_scopes), ) ) else: # No auth required async def sse_endpoint(request: Request) -> Response: return await handle_sse(request.scope, request.receive, request._send) # type: ignore[reportPrivateUsage] server_routes.append( Route( sse_path, endpoint=sse_endpoint, methods=["GET"], ) ) server_routes.append( Mount( message_path, app=sse.handle_post_message, ) ) # Add custom routes with lowest precedence if routes: server_routes.extend(routes) server_routes.extend(server._additional_http_routes) # Add middleware if middleware: server_middleware.extend(middleware) # Create and return the app app = create_base_app( routes=server_routes, middleware=server_middleware, debug=debug, ) # Store the FastMCP 
server instance on the Starlette app state app.state.fastmcp_server = server app.state.path = sse_path return app def create_streamable_http_app( server: FastMCP[LifespanResultT], streamable_http_path: str, event_store: EventStore | None = None, auth: OAuthProvider | None = None, json_response: bool = False, stateless_http: bool = False, debug: bool = False, routes: list[BaseRoute] | None = None, middleware: list[Middleware] | None = None, ) -> StarletteWithLifespan: """Return an instance of the StreamableHTTP server app. Args: server: The FastMCP server instance streamable_http_path: Path for StreamableHTTP connections event_store: Optional event store for session management auth: Optional auth provider json_response: Whether to use JSON response format stateless_http: Whether to use stateless mode (new transport per request) debug: Whether to enable debug mode routes: Optional list of custom routes middleware: Optional list of middleware Returns: A Starlette application with StreamableHTTP support """ server_routes: list[BaseRoute] = [] server_middleware: list[Middleware] = [] # Create session manager using the provided event store session_manager = StreamableHTTPSessionManager( app=server._mcp_server, event_store=event_store, json_response=json_response, stateless=stateless_http, ) # Create the ASGI handler async def handle_streamable_http( scope: Scope, receive: Receive, send: Send ) -> None: try: await session_manager.handle_request(scope, receive, send) except RuntimeError as e: if str(e) == "Task group is not initialized. Make sure to use run().": logger.error( f"Original RuntimeError from mcp library: {e}", exc_info=True ) new_error_message = ( "FastMCP's StreamableHTTPSessionManager task group was not initialized. " "This commonly occurs when the FastMCP application's lifespan is not " "passed to the parent ASGI application (e.g., FastAPI or Starlette). 
" "Please ensure you are setting `lifespan=mcp_app.lifespan` in your " "parent app's constructor, where `mcp_app` is the application instance " "returned by `fastmcp_instance.http_app()`. \\n" "For more details, see the FastMCP ASGI integration documentation: " "https://gofastmcp.com/deployment/asgi" ) # Raise a new RuntimeError that includes the original error's message # for full context, but leads with the more helpful guidance. raise RuntimeError(f"{new_error_message}\\nOriginal error: {e}") from e else: # Re-raise other RuntimeErrors if they don't match the specific message raise # Add StreamableHTTP routes with or without auth if auth: auth_middleware, auth_routes, required_scopes = ( setup_auth_middleware_and_routes(auth) ) server_routes.extend(auth_routes) server_middleware.extend(auth_middleware) # Auth is enabled, wrap endpoint with RequireAuthMiddleware server_routes.append( Mount( streamable_http_path, app=RequireAuthMiddleware(handle_streamable_http, required_scopes), ) ) else: # No auth required server_routes.append( Mount( streamable_http_path, app=handle_streamable_http, ) ) # Add custom routes with lowest precedence if routes: server_routes.extend(routes) server_routes.extend(server._additional_http_routes) # Add middleware if middleware: server_middleware.extend(middleware) # Create a lifespan manager to start and stop the session manager @asynccontextmanager async def lifespan(app: Starlette) -> AsyncGenerator[None, None]: async with session_manager.run(): yield # Create and return the app with lifespan app = create_base_app( routes=server_routes, middleware=server_middleware, debug=debug, lifespan=lifespan, ) # Store the FastMCP server instance on the Starlette app state app.state.fastmcp_server = server app.state.path = streamable_http_path return app

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jabir366/MCPFligh'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.