Skip to main content
Glama

FastMCP Multi-Tenancy

""" FastMCP Multi-Tenant - A serverless, multi-tenant implementation of MCP servers. This module provides a version of FastMCP that's designed to work with Vercel serverless functions with fluid compute mode enabled. """ from __future__ import annotations as _annotations import inspect import json import logging import re from collections.abc import AsyncIterator, Iterable, Sequence from contextlib import ( AbstractAsyncContextManager, asynccontextmanager, ) from itertools import chain from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar from uuid import UUID import anyio import fastapi import pydantic_core from fastapi import APIRouter, Depends, FastAPI, Request from pydantic import BaseModel, Field from pydantic.networks import AnyUrl from pydantic_settings import BaseSettings, SettingsConfigDict from mcp.types import ( AnyFunction, EmbeddedResource, GetPromptResult, ImageContent, TextContent, Prompt as MCPPrompt, PromptArgument as MCPPromptArgument, Resource as MCPResource, ResourceTemplate as MCPResourceTemplate, Tool as MCPTool, ResourceContents, ) from mcp.server.lowlevel.server import LifespanResultT, Server as MCPServer, lifespan as default_lifespan from mcp.shared.context import LifespanContextT, RequestContext from mcp.server.fastmcp.prompts import Prompt, PromptManager from mcp.server.fastmcp.resources import FunctionResource, Resource, ResourceManager from mcp.server.fastmcp.tools import ToolManager from mcp.server.fastmcp.utilities.logging import configure_logging, get_logger from mcp.server.fastmcp.utilities.types import Image from .context import Context from .exceptions import ConfigurationError, ResourceError from .redis_storage import RedisSessionStorage from .session import MultiTenantServerSession from .vercel_sse import VercelSseServerTransport logger = get_logger(__name__) class Settings(BaseSettings, Generic[LifespanResultT]): """ FastMCP multi-tenant server settings. All settings can be configured via environment variables with the prefix FASTMCP_MT_ """ model_config = SettingsConfigDict( env_prefix="FASTMCP_MT_", env_file=".env", extra="ignore", ) # Server settings debug: bool = False log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "ERROR" # Redis settings redis_url: str = "redis://localhost:6379/0" # HTTP settings (mainly for local development) host: str = "0.0.0.0" port: int = 8000 # Resource settings warn_on_duplicate_resources: bool = True # Tool settings warn_on_duplicate_tools: bool = True # Prompt settings warn_on_duplicate_prompts: bool = True # Dependencies dependencies: List[str] = Field( default_factory=list, description="List of dependencies to install in the server environment", ) # Lifespan lifespan: Optional[ Callable[["FastMCPMultiTenant"], AbstractAsyncContextManager[LifespanResultT]] ] = Field(None, description="Lifespan context manager") def lifespan_wrapper( app: FastMCPMultiTenant, lifespan: Callable[["FastMCPMultiTenant"], AbstractAsyncContextManager[LifespanResultT]], ) -> Callable[[MCPServer[LifespanResultT]], AbstractAsyncContextManager[object]]: """Wrap the lifespan context manager for use with the MCP server.""" @asynccontextmanager async def wrap(s: MCPServer[LifespanResultT]) -> AsyncIterator[object]: async with lifespan(app) as context: yield context return wrap class FastMCPMultiTenant: """ FastMCP Multi-Tenant Server This class provides a serverless-compatible implementation of the FastMCP server designed for multi-tenant usage on Vercel with fluid compute mode enabled. """ def __init__( self, name: str | None = None, instructions: str | None = None, **settings: Any ): """ Initialize a new FastMCPMultiTenant server. Args: name: Server name instructions: Server instructions **settings: Additional settings """ self._settings = Settings(**settings) # Store server metadata directly self._name = name or "FastMCP Multi-Tenant" self._instructions = instructions self.version = "0.1.0" # Could be extracted from package self._tool_manager = ToolManager( warn_on_duplicate_tools=self._settings.warn_on_duplicate_tools ) self._resource_manager = ResourceManager( warn_on_duplicate_resources=self._settings.warn_on_duplicate_resources ) self._prompt_manager = PromptManager( warn_on_duplicate_prompts=self._settings.warn_on_duplicate_prompts ) self.dependencies = self._settings.dependencies # Storage for session data self._storage = RedisSessionStorage(self._settings.redis_url) # Create SSE transport self._sse_transport = VercelSseServerTransport( endpoint_base="/messages/", redis_url=self._settings.redis_url, lifespan=lifespan_wrapper(self, self._settings.lifespan or default_lifespan) ) # Configure logging configure_logging(self._settings.log_level) @property def name(self) -> str: """Get the server name.""" return self._name @property def instructions(self) -> str | None: """Get the server instructions.""" return self._instructions async def list_tools(self) -> List[MCPTool]: """ List all available tools. Returns: List of tools """ tools = self._tool_manager.list_tools() return [ MCPTool( name=info.name, description=info.description, inputSchema=info.parameters, ) for info in tools ] async def call_tool( self, name: str, arguments: Dict[str, Any], context: Context ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """ Call a tool by name with arguments. Args: name: Tool name arguments: Tool arguments context: Optional context object (injected by request handler) Returns: Tool result """ logger.debug(f"server.call_tool {name} with arguments {arguments} and context={context}") result = await self._tool_manager.call_tool(name, arguments, context=context) return _convert_to_content(result) async def list_resources(self) -> List[MCPResource]: """ List all available resources. Returns: List of resources """ resources = self._resource_manager.list_resources() return [ MCPResource( uri=resource.uri, name=resource.name or "", description=resource.description, mimeType=resource.mime_type, ) for resource in resources ] async def list_resource_templates(self) -> List[MCPResourceTemplate]: """ List all available resource templates. Returns: List of resource templates """ templates = self._resource_manager.list_templates() return [ MCPResourceTemplate( uriTemplate=template.uri_template, name=template.name, description=template.description, ) for template in templates ] async def read_resource(self, uri: AnyUrl | str) -> Iterable[ResourceContents]: """ Read a resource by URI. Args: uri: Resource URI Returns: Resource contents """ resource = await self._resource_manager.get_resource(uri) if not resource: raise ResourceError(f"Unknown resource: {uri}") try: content = await resource.read() return [ResourceContents(content=content, mime_type=resource.mime_type)] except Exception as e: logger.error(f"Error reading resource {uri}: {e}") raise ResourceError(str(e)) def add_tool( self, fn: AnyFunction, name: str | None = None, description: str | None = None, ) -> None: """ Add a tool to the server. Args: fn: Tool function name: Tool name (defaults to function name) description: Tool description (defaults to function docstring) """ self._tool_manager.add_tool(fn, name, description) def tool( self, name: str | None = None, description: str | None = None ) -> Callable[[AnyFunction], AnyFunction]: """ Decorator to add a tool to the server. Example: ```python @server.tool() def my_tool(x: int, ctx: Context) -> str: # Use context for logging, etc. ctx.info(f"Processing {x}") # Access entity ID entity_id = ctx.entity_id return str(x) ``` Args: name: Tool name (defaults to function name) description: Tool description (defaults to function docstring) Returns: Decorator function """ def decorator(fn: AnyFunction) -> AnyFunction: self.add_tool(fn, name, description) return fn return decorator def add_resource(self, resource: Resource) -> None: """ Add a resource to the server. Args: resource: Resource object """ self._resource_manager.add_resource(resource) def resource( self, uri: str, name: str | None = None, description: str | None = None, mime_type: str | None = None, ) -> Callable[[AnyFunction], AnyFunction]: """ Decorator to add a resource to the server. Example: ```python @server.resource("resource://data", mime_type="application/json") def get_data(ctx: Context) -> str: # Access entity ID to customize response entity_id = ctx.entity_id return json.dumps({"entity": entity_id, "data": "value"}) ``` Args: uri: Resource URI name: Resource name description: Resource description mime_type: Resource MIME type Returns: Decorator function """ def decorator(fn: AnyFunction) -> AnyFunction: resource = FunctionResource( uri=uri, fn=fn, name=name, description=description, mime_type=mime_type, ) self.add_resource(resource) return fn return decorator def add_prompt(self, prompt: Prompt) -> None: """ Add a prompt to the server. Args: prompt: Prompt object """ self._prompt_manager.add_prompt(prompt) def prompt( self, name: str | None = None, description: str | None = None ) -> Callable[[AnyFunction], AnyFunction]: """ Decorator to add a prompt to the server. Example: ```python @server.prompt("greeting") def greeting_prompt(name: str, ctx: Context) -> str: # Customize greeting based on entity entity_id = ctx.entity_id return f"Hello, {name}! Welcome to {entity_id}!" ``` Args: name: Prompt name description: Prompt description Returns: Decorator function """ def decorator(fn: AnyFunction) -> AnyFunction: prompt = Prompt.from_function( fn, name=name, description=description, ) self.add_prompt(prompt) return fn return decorator async def list_prompts(self) -> list[MCPPrompt]: """List all available prompts.""" prompts = self._prompt_manager.list_prompts() return [ MCPPrompt( name=prompt.name, description=prompt.description, arguments=[ MCPPromptArgument( name=arg.name, description=arg.description, required=arg.required, ) for arg in (prompt.arguments or []) ], ) for prompt in prompts ] async def get_prompt( self, name: str, arguments: Dict[str, Any] | None = None, context: Optional[Context] = None ) -> GetPromptResult: """Get a prompt by name with arguments.""" try: messages = await self._prompt_manager.render_prompt(name, arguments) return GetPromptResult(messages=pydantic_core.to_jsonable_python(messages)) except Exception as e: logger.error(f"Error getting prompt {name}: {e}") raise ValueError(str(e)) def get_context(self, request_context: Optional[RequestContext] = None, entity_id: Optional[str] = None) -> Context: """ Get a context object for the current request. Args: request_context: Optional request context entity_id: Optional entity ID for multi-tenancy Returns: A Context object """ context = Context(request_context=request_context, fastmcp=self, entity_id=entity_id) return context def create_fastapi_app(self) -> FastAPI: """Create a FastAPI app for this server.""" app = FastAPI( title=self.name, description=self.instructions or "", debug=self._settings.debug, ) # Create a router for our endpoints router = APIRouter() # Add the router to the app with a prefix for entity IDs app.include_router(router, prefix="/{entity_id}") # Set up SSE transport @app.on_event("startup") async def setup_sse(): await self._sse_transport.setup_fastapi(app) # Add health check endpoint @router.get("/health") async def health_check(entity_id: str): return {"status": "ok", "server": self.name, "entity_id": entity_id} # Add startup and shutdown handlers @app.on_event("startup") async def startup(): """Startup event handler.""" logger.info(f"Starting {self.name} server") @app.on_event("shutdown") async def shutdown(): """Shutdown event handler.""" logger.info(f"Shutting down {self.name} server") if hasattr(self, "_sse_transport"): await self._sse_transport.close() if hasattr(self, "_storage"): await self._storage.close() return app def get_handler_map(self) -> Dict[str, Callable]: """ Get a map of handler methods that can be registered with an MCP server. Returns: Dictionary mapping handler names to handler methods """ return { "list_tools": self.list_tools, "call_tool": self.call_tool, "list_resources": self.list_resources, "read_resource": self.read_resource, "list_prompts": self.list_prompts, "get_prompt": self.get_prompt, "list_resource_templates": self.list_resource_templates, } def _convert_to_content( result: Any, ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """Convert a result to a sequence of content objects.""" if result is None: return [] if isinstance(result, (TextContent, ImageContent, EmbeddedResource)): return [result] if isinstance(result, Image): return [result.to_image_content()] if isinstance(result, (list, tuple)): return list(chain.from_iterable(_convert_to_content(item) for item in result)) if not isinstance(result, str): try: result = json.dumps(pydantic_core.to_jsonable_python(result)) except Exception: result = str(result) return [TextContent(type="text", text=result)]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timothywangdev/McpToolKit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server