Skip to main content
Glama

Flight Simulator MCP Server

by jabir366
proxy.py•21.9 kB
from __future__ import annotations import warnings from collections.abc import Callable from pathlib import Path from typing import TYPE_CHECKING, Any, cast from urllib.parse import quote import mcp.types from mcp.client.session import ClientSession from mcp.shared.context import LifespanContextT, RequestContext from mcp.shared.exceptions import McpError from mcp.types import ( METHOD_NOT_FOUND, BlobResourceContents, GetPromptResult, TextResourceContents, ) from pydantic.networks import AnyUrl import fastmcp from fastmcp.client.client import Client, FastMCP1Server from fastmcp.client.elicitation import ElicitResult from fastmcp.client.logging import LogMessage from fastmcp.client.roots import RootsList from fastmcp.client.transports import ClientTransportT from fastmcp.exceptions import NotFoundError, ResourceError, ToolError from fastmcp.mcp_config import MCPConfig from fastmcp.prompts import Prompt, PromptMessage from fastmcp.prompts.prompt import PromptArgument from fastmcp.prompts.prompt_manager import PromptManager from fastmcp.resources import Resource, ResourceTemplate from fastmcp.resources.resource_manager import ResourceManager from fastmcp.server.context import Context from fastmcp.server.dependencies import get_context from fastmcp.server.server import FastMCP from fastmcp.tools.tool import Tool, ToolResult from fastmcp.tools.tool_manager import ToolManager from fastmcp.utilities.components import MirroredComponent from fastmcp.utilities.logging import get_logger if TYPE_CHECKING: from fastmcp.server import Context logger = get_logger(__name__) class ProxyToolManager(ToolManager): """A ToolManager that sources its tools from a remote client in addition to local and mounted tools.""" def __init__(self, client_factory: Callable[[], Client], **kwargs): super().__init__(**kwargs) self.client_factory = client_factory async def get_tools(self) -> dict[str, Tool]: """Gets the unfiltered tool inventory including local, mounted, and proxy tools.""" # First get local and mounted tools from parent all_tools = await super().get_tools() # Then add proxy tools, but don't overwrite existing ones try: client = self.client_factory() async with client: client_tools = await client.list_tools() for tool in client_tools: if tool.name not in all_tools: all_tools[tool.name] = ProxyTool.from_mcp_tool(client, tool) except McpError as e: if e.error.code == METHOD_NOT_FOUND: pass # No tools available from proxy else: raise e return all_tools async def list_tools(self) -> list[Tool]: """Gets the filtered list of tools including local, mounted, and proxy tools.""" tools_dict = await self.get_tools() return list(tools_dict.values()) async def call_tool(self, key: str, arguments: dict[str, Any]) -> ToolResult: """Calls a tool, trying local/mounted first, then proxy if not found.""" try: # First try local and mounted tools return await super().call_tool(key, arguments) except NotFoundError: # If not found locally, try proxy client = self.client_factory() async with client: result = await client.call_tool(key, arguments) return ToolResult( content=result.content, structured_content=result.structured_content, ) class ProxyResourceManager(ResourceManager): """A ResourceManager that sources its resources from a remote client in addition to local and mounted resources.""" def __init__(self, client_factory: Callable[[], Client], **kwargs): super().__init__(**kwargs) self.client_factory = client_factory async def get_resources(self) -> dict[str, Resource]: """Gets the unfiltered resource inventory including local, mounted, and proxy resources.""" # First get local and mounted resources from parent all_resources = await super().get_resources() # Then add proxy resources, but don't overwrite existing ones try: client = self.client_factory() async with client: client_resources = await client.list_resources() for resource in client_resources: if str(resource.uri) not in all_resources: all_resources[str(resource.uri)] = ( ProxyResource.from_mcp_resource(client, resource) ) except McpError as e: if e.error.code == METHOD_NOT_FOUND: pass # No resources available from proxy else: raise e return all_resources async def get_resource_templates(self) -> dict[str, ResourceTemplate]: """Gets the unfiltered template inventory including local, mounted, and proxy templates.""" # First get local and mounted templates from parent all_templates = await super().get_resource_templates() # Then add proxy templates, but don't overwrite existing ones try: client = self.client_factory() async with client: client_templates = await client.list_resource_templates() for template in client_templates: if template.uriTemplate not in all_templates: all_templates[template.uriTemplate] = ( ProxyTemplate.from_mcp_template(client, template) ) except McpError as e: if e.error.code == METHOD_NOT_FOUND: pass # No templates available from proxy else: raise e return all_templates async def list_resources(self) -> list[Resource]: """Gets the filtered list of resources including local, mounted, and proxy resources.""" resources_dict = await self.get_resources() return list(resources_dict.values()) async def list_resource_templates(self) -> list[ResourceTemplate]: """Gets the filtered list of templates including local, mounted, and proxy templates.""" templates_dict = await self.get_resource_templates() return list(templates_dict.values()) async def read_resource(self, uri: AnyUrl | str) -> str | bytes: """Reads a resource, trying local/mounted first, then proxy if not found.""" try: # First try local and mounted resources return await super().read_resource(uri) except NotFoundError: # If not found locally, try proxy client = self.client_factory() async with client: result = await client.read_resource(uri) if isinstance(result[0], TextResourceContents): return result[0].text elif isinstance(result[0], BlobResourceContents): return result[0].blob else: raise ResourceError(f"Unsupported content type: {type(result[0])}") class ProxyPromptManager(PromptManager): """A PromptManager that sources its prompts from a remote client in addition to local and mounted prompts.""" def __init__(self, client_factory: Callable[[], Client], **kwargs): super().__init__(**kwargs) self.client_factory = client_factory async def get_prompts(self) -> dict[str, Prompt]: """Gets the unfiltered prompt inventory including local, mounted, and proxy prompts.""" # First get local and mounted prompts from parent all_prompts = await super().get_prompts() # Then add proxy prompts, but don't overwrite existing ones try: client = self.client_factory() async with client: client_prompts = await client.list_prompts() for prompt in client_prompts: if prompt.name not in all_prompts: all_prompts[prompt.name] = ProxyPrompt.from_mcp_prompt( client, prompt ) except McpError as e: if e.error.code == METHOD_NOT_FOUND: pass # No prompts available from proxy else: raise e return all_prompts async def list_prompts(self) -> list[Prompt]: """Gets the filtered list of prompts including local, mounted, and proxy prompts.""" prompts_dict = await self.get_prompts() return list(prompts_dict.values()) async def render_prompt( self, name: str, arguments: dict[str, Any] | None = None, ) -> GetPromptResult: """Renders a prompt, trying local/mounted first, then proxy if not found.""" try: # First try local and mounted prompts return await super().render_prompt(name, arguments) except NotFoundError: # If not found locally, try proxy client = self.client_factory() async with client: result = await client.get_prompt(name, arguments) return result class ProxyTool(Tool, MirroredComponent): """ A Tool that represents and executes a tool on a remote server. """ def __init__(self, client: Client, **kwargs): super().__init__(**kwargs) self._client = client @classmethod def from_mcp_tool(cls, client: Client, mcp_tool: mcp.types.Tool) -> ProxyTool: """Factory method to create a ProxyTool from a raw MCP tool schema.""" return cls( client=client, name=mcp_tool.name, description=mcp_tool.description, parameters=mcp_tool.inputSchema, annotations=mcp_tool.annotations, output_schema=mcp_tool.outputSchema, _mirrored=True, ) async def run( self, arguments: dict[str, Any], context: Context | None = None, ) -> ToolResult: """Executes the tool by making a call through the client.""" async with self._client: result = await self._client.call_tool_mcp( name=self.name, arguments=arguments, ) if result.isError: raise ToolError(cast(mcp.types.TextContent, result.content[0]).text) return ToolResult( content=result.content, structured_content=result.structuredContent, ) class ProxyResource(Resource, MirroredComponent): """ A Resource that represents and reads a resource from a remote server. """ _client: Client _value: str | bytes | None = None def __init__( self, client: Client, *, _value: str | bytes | None = None, **kwargs, ): super().__init__(**kwargs) self._client = client self._value = _value @classmethod def from_mcp_resource( cls, client: Client, mcp_resource: mcp.types.Resource, ) -> ProxyResource: """Factory method to create a ProxyResource from a raw MCP resource schema.""" return cls( client=client, uri=mcp_resource.uri, name=mcp_resource.name, description=mcp_resource.description, mime_type=mcp_resource.mimeType or "text/plain", _mirrored=True, ) async def read(self) -> str | bytes: """Read the resource content from the remote server.""" if self._value is not None: return self._value async with self._client: result = await self._client.read_resource(self.uri) if isinstance(result[0], TextResourceContents): return result[0].text elif isinstance(result[0], BlobResourceContents): return result[0].blob else: raise ResourceError(f"Unsupported content type: {type(result[0])}") class ProxyTemplate(ResourceTemplate, MirroredComponent): """ A ResourceTemplate that represents and creates resources from a remote server template. """ def __init__(self, client: Client, **kwargs): super().__init__(**kwargs) self._client = client @classmethod def from_mcp_template( cls, client: Client, mcp_template: mcp.types.ResourceTemplate ) -> ProxyTemplate: """Factory method to create a ProxyTemplate from a raw MCP template schema.""" return cls( client=client, uri_template=mcp_template.uriTemplate, name=mcp_template.name, description=mcp_template.description, mime_type=mcp_template.mimeType or "text/plain", parameters={}, # Remote templates don't have local parameters _mirrored=True, ) async def create_resource( self, uri: str, params: dict[str, Any], context: Context | None = None, ) -> ProxyResource: """Create a resource from the template by calling the remote server.""" # don't use the provided uri, because it may not be the same as the # uri_template on the remote server. # quote params to ensure they are valid for the uri_template parameterized_uri = self.uri_template.format( **{k: quote(v, safe="") for k, v in params.items()} ) async with self._client: result = await self._client.read_resource(parameterized_uri) if isinstance(result[0], TextResourceContents): value = result[0].text elif isinstance(result[0], BlobResourceContents): value = result[0].blob else: raise ResourceError(f"Unsupported content type: {type(result[0])}") return ProxyResource( client=self._client, uri=parameterized_uri, name=self.name, description=self.description, mime_type=result[0].mimeType, _value=value, ) class ProxyPrompt(Prompt, MirroredComponent): """ A Prompt that represents and renders a prompt from a remote server. """ _client: Client def __init__(self, client: Client, **kwargs): super().__init__(**kwargs) self._client = client @classmethod def from_mcp_prompt( cls, client: Client, mcp_prompt: mcp.types.Prompt ) -> ProxyPrompt: """Factory method to create a ProxyPrompt from a raw MCP prompt schema.""" arguments = [ PromptArgument( name=arg.name, description=arg.description, required=arg.required or False, ) for arg in mcp_prompt.arguments or [] ] return cls( client=client, name=mcp_prompt.name, description=mcp_prompt.description, arguments=arguments, _mirrored=True, ) async def render(self, arguments: dict[str, Any]) -> list[PromptMessage]: """Render the prompt by making a call through the client.""" async with self._client: result = await self._client.get_prompt(self.name, arguments) return result.messages class FastMCPProxy(FastMCP): """ A FastMCP server that acts as a proxy to a remote MCP-compliant server. It uses specialized managers that fulfill requests via a client factory. """ def __init__( self, client: Client | None = None, *, client_factory: Callable[[], Client] | None = None, **kwargs, ): """ Initializes the proxy server. FastMCPProxy requires explicit session management via client_factory. Use FastMCP.as_proxy() for convenience with automatic session strategy. Args: client: [DEPRECATED] A Client instance. Use client_factory instead for explicit session management. When provided, a client_factory will be automatically created that provides session isolation for backwards compatibility. client_factory: A callable that returns a Client instance when called. This gives you full control over session creation and reuse. **kwargs: Additional settings for the FastMCP server. """ super().__init__(**kwargs) # Handle client and client_factory parameters if client is not None and client_factory is not None: raise ValueError("Cannot specify both 'client' and 'client_factory'") if client is not None: # Deprecated in 2.10.3 if fastmcp.settings.deprecation_warnings: warnings.warn( "Passing 'client' to FastMCPProxy is deprecated. Use 'client_factory' instead for explicit session management. " "For automatic session strategy, use FastMCP.as_proxy().", DeprecationWarning, stacklevel=2, ) # Create a factory that provides session isolation for backwards compatibility def deprecated_client_factory(): return client.new() self.client_factory = deprecated_client_factory elif client_factory is not None: self.client_factory = client_factory else: raise ValueError("Must specify 'client_factory'") # Replace the default managers with our specialized proxy managers. self._tool_manager = ProxyToolManager(client_factory=self.client_factory) self._resource_manager = ProxyResourceManager( client_factory=self.client_factory ) self._prompt_manager = ProxyPromptManager(client_factory=self.client_factory) async def default_proxy_roots_handler( context: RequestContext[ClientSession, LifespanContextT], ) -> RootsList: """ A handler that forwards the list roots request from the remote server to the proxy's connected clients and relays the response back to the remote server. """ ctx = get_context() return await ctx.list_roots() class ProxyClient(Client[ClientTransportT]): """ A proxy client that forwards advanced interactions between a remote MCP server and the proxy's connected clients. Supports forwarding roots, sampling, elicitation, logging, and progress. """ def __init__( self, transport: ClientTransportT | FastMCP | FastMCP1Server | AnyUrl | Path | MCPConfig | dict[str, Any] | str, **kwargs, ): if "roots" not in kwargs: kwargs["roots"] = default_proxy_roots_handler if "sampling_handler" not in kwargs: kwargs["sampling_handler"] = ProxyClient.default_sampling_handler if "elicitation_handler" not in kwargs: kwargs["elicitation_handler"] = ProxyClient.default_elicitation_handler if "log_handler" not in kwargs: kwargs["log_handler"] = ProxyClient.default_log_handler if "progress_handler" not in kwargs: kwargs["progress_handler"] = ProxyClient.default_progress_handler super().__init__(**kwargs | dict(transport=transport)) @classmethod async def default_sampling_handler( cls, messages: list[mcp.types.SamplingMessage], params: mcp.types.CreateMessageRequestParams, context: RequestContext[ClientSession, LifespanContextT], ) -> mcp.types.CreateMessageResult: """ A handler that forwards the sampling request from the remote server to the proxy's connected clients and relays the response back to the remote server. """ ctx = get_context() content = await ctx.sample( [msg for msg in messages], system_prompt=params.systemPrompt, temperature=params.temperature, max_tokens=params.maxTokens, model_preferences=params.modelPreferences, ) if isinstance(content, mcp.types.ResourceLink | mcp.types.EmbeddedResource): raise RuntimeError("Content is not supported") return mcp.types.CreateMessageResult( role="assistant", model="fastmcp-client", content=content, ) @classmethod async def default_elicitation_handler( cls, message: str, response_type: type, params: mcp.types.ElicitRequestParams, context: RequestContext[ClientSession, LifespanContextT], ) -> ElicitResult: """ A handler that forwards the elicitation request from the remote server to the proxy's connected clients and relays the response back to the remote server. """ ctx = get_context() result = await ctx.elicit(message, response_type) if result.action == "accept": return result.data else: return ElicitResult(action=result.action) @classmethod async def default_log_handler(cls, message: LogMessage) -> None: """ A handler that forwards the log notification from the remote server to the proxy's connected clients. """ ctx = get_context() await ctx.log(message.data, level=message.level, logger_name=message.logger) @classmethod async def default_progress_handler( cls, progress: float, total: float | None, message: str | None, ) -> None: """ A handler that forwards the progress notification from the remote server to the proxy's connected clients. """ ctx = get_context() await ctx.report_progress(progress, total, message)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jabir366/MCPFligh'

If you have feedback or need assistance with the MCP directory API, please join our Discord server