"""FastAPI MCP Server - Tools for introspecting FastAPI applications."""
import sys
import json
import inspect
from typing import Any, Optional
from pathlib import Path
from mcp.server import Server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
import mcp.server.stdio
from fastapi import FastAPI
from fastapi.routing import APIRoute
from pydantic import BaseModel
# Global FastAPI app instance
_app: Optional[FastAPI] = None
def load_app(app_path: str) -> str:
"""Load a FastAPI application from a module path."""
global _app
if ":" not in app_path:
raise ValueError("app_path must be in format 'module.path:app_variable'")
module_path, app_var = app_path.split(":", 1)
# Add current directory to Python path if needed
if "." not in sys.path:
sys.path.insert(0, ".")
# Import the module
parts = module_path.split(".")
module = __import__(module_path)
for part in parts[1:]:
module = getattr(module, part)
# Get the app instance
app = getattr(module, app_var)
if not isinstance(app, FastAPI):
raise ValueError(f"{app_var} is not a FastAPI instance")
_app = app
return f"Successfully loaded FastAPI app from {app_path}"
def get_app() -> FastAPI:
"""Get the currently loaded FastAPI application."""
if _app is None:
raise RuntimeError("No FastAPI application loaded. Use load_app() first.")
return _app
def format_route_info(route: APIRoute) -> dict[str, Any]:
"""Format route information into a dictionary."""
# Get parameter information
params = []
if hasattr(route, 'dependant') and route.dependant:
for param in route.dependant.path_params:
params.append({
"name": param.name,
"type": "path",
"annotation": str(param.type_),
"required": param.required,
})
for param in route.dependant.query_params:
params.append({
"name": param.name,
"type": "query",
"annotation": str(param.type_),
"required": param.required,
"default": str(param.default) if param.default else None,
})
for param in route.dependant.header_params:
params.append({
"name": param.name,
"type": "header",
"annotation": str(param.type_),
"required": param.required,
})
for param in route.dependant.cookie_params:
params.append({
"name": param.name,
"type": "cookie",
"annotation": str(param.type_),
"required": param.required,
})
if route.dependant.body_params:
for param in route.dependant.body_params:
params.append({
"name": param.name,
"type": "body",
"annotation": str(param.type_),
"required": param.required,
})
# Get response model info
response_model = None
if hasattr(route, 'response_model') and route.response_model:
response_model = {
"type": str(route.response_model),
"schema": None,
}
try:
if hasattr(route.response_model, "model_json_schema"):
response_model["schema"] = route.response_model.model_json_schema()
except Exception:
pass
return {
"path": route.path,
"methods": list(route.methods),
"name": route.name,
"operation_id": route.operation_id,
"summary": route.summary,
"description": route.description,
"tags": route.tags or [],
"deprecated": route.deprecated,
"parameters": params,
"response_model": response_model,
"status_code": route.status_code,
}
def get_model_info(model: type[BaseModel]) -> dict[str, Any]:
"""Get information about a Pydantic model."""
try:
schema = model.model_json_schema()
except Exception:
schema = {}
fields_info = {}
if hasattr(model, 'model_fields'):
for field_name, field_info in model.model_fields.items():
fields_info[field_name] = {
"annotation": str(field_info.annotation),
"required": field_info.is_required(),
"default": str(field_info.default) if field_info.default else None,
"description": field_info.description,
}
return {
"name": model.__name__,
"module": model.__module__,
"docstring": model.__doc__,
"fields": fields_info,
"schema": schema,
}
# Initialize MCP server
server = Server("fastapi-mcp-server")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List all available tools."""
return [
Tool(
name="load_fastapi_app",
description="Load a FastAPI application from a module path (e.g., 'main:app' or 'myapp.api:application')",
inputSchema={
"type": "object",
"properties": {
"app_path": {
"type": "string",
"description": "Module path to the FastAPI app in format 'module.path:variable_name'",
},
},
"required": ["app_path"],
},
),
Tool(
name="list_routes",
description="List all routes in the loaded FastAPI application",
inputSchema={
"type": "object",
"properties": {
"method_filter": {
"type": "string",
"description": "Filter by HTTP method (GET, POST, PUT, DELETE, etc.)",
},
"path_filter": {
"type": "string",
"description": "Filter by path pattern (supports wildcards)",
},
"tag_filter": {
"type": "string",
"description": "Filter by OpenAPI tag",
},
},
},
),
Tool(
name="get_route_details",
description="Get detailed information about a specific route",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The route path (e.g., '/users/{user_id}')",
},
"method": {
"type": "string",
"description": "HTTP method (GET, POST, etc.)",
},
},
"required": ["path", "method"],
},
),
Tool(
name="get_openapi_schema",
description="Get the OpenAPI schema for the FastAPI application",
inputSchema={
"type": "object",
"properties": {
"include_only": {
"type": "array",
"items": {"type": "string"},
"description": "List of paths to include (optional)",
},
},
},
),
Tool(
name="list_models",
description="List all Pydantic models used in the FastAPI application",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="get_model_schema",
description="Get the schema for a specific Pydantic model",
inputSchema={
"type": "object",
"properties": {
"model_name": {
"type": "string",
"description": "Name of the Pydantic model",
},
},
"required": ["model_name"],
},
),
Tool(
name="search_routes",
description="Search for routes by various criteria",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (searches in path, name, summary, description)",
},
"has_auth": {
"type": "boolean",
"description": "Filter by whether route has authentication",
},
},
"required": ["query"],
},
),
Tool(
name="analyze_dependencies",
description="Analyze dependencies used across all routes",
inputSchema={
"type": "object",
"properties": {},
},
),
Tool(
name="get_route_source",
description="Get the source code of a route handler function",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The route path",
},
"method": {
"type": "string",
"description": "HTTP method",
},
},
"required": ["path", "method"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""
if name == "load_fastapi_app":
try:
app_path = arguments["app_path"]
app = load_app(app_path)
routes_count = len([r for r in app.routes if isinstance(r, APIRoute)])
return [
TextContent(
type="text",
text=f"✅ Successfully loaded FastAPI app from '{app_path}'\n"
f"Title: {app.title}\n"
f"Version: {app.version}\n"
f"Routes: {routes_count}\n"
f"Debug mode: {app.debug}"
)
]
except Exception as e:
return [TextContent(type="text", text=f"❌ Error loading app: {str(e)}")]
# For all other tools, we need an app loaded
try:
app = get_app()
except RuntimeError as e:
return [TextContent(type="text", text=f"❌ {str(e)}")]
if name == "list_routes":
method_filter = arguments.get("method_filter", "").upper()
path_filter = arguments.get("path_filter", "")
tag_filter = arguments.get("tag_filter", "")
routes = [r for r in app.routes if isinstance(r, APIRoute)]
# Apply filters
if method_filter:
routes = [r for r in routes if method_filter in r.methods]
if path_filter:
routes = [r for r in routes if path_filter in r.path]
if tag_filter:
routes = [r for r in routes if tag_filter in (r.tags or [])]
result = []
for route in routes:
result.append(f"• {', '.join(route.methods)} {route.path} - {route.name}")
if route.summary:
result.append(f" Summary: {route.summary}")
if route.tags:
result.append(f" Tags: {', '.join(route.tags)}")
result.append("")
return [TextContent(type="text", text="\n".join(result) or "No routes found")]
elif name == "get_route_details":
path = arguments["path"]
method = arguments["method"].upper()
route = None
for r in app.routes:
if isinstance(r, APIRoute) and r.path == path and method in r.methods:
route = r
break
if not route:
return [TextContent(type="text", text=f"❌ Route not found: {method} {path}")]
route_info = format_route_info(route)
return [TextContent(type="text", text=json.dumps(route_info, indent=2))]
elif name == "get_openapi_schema":
include_only = arguments.get("include_only")
schema = app.openapi()
if include_only:
# Filter schema to only include specified paths
filtered_paths = {k: v for k, v in schema.get("paths", {}).items() if k in include_only}
schema["paths"] = filtered_paths
return [TextContent(type="text", text=json.dumps(schema, indent=2))]
elif name == "list_models":
models = set()
for route in app.routes:
if isinstance(route, APIRoute):
# Collect response models
if route.response_model and hasattr(route.response_model, "model_json_schema"):
models.add(route.response_model)
# Collect request body models
if route.dependant and route.dependant.body_params:
for param in route.dependant.body_params:
if hasattr(param.type_, "model_json_schema"):
models.add(param.type_)
result = []
for model in sorted(models, key=lambda m: m.__name__):
result.append(f"• {model.__name__} ({model.__module__})")
return [TextContent(type="text", text="\n".join(result) or "No Pydantic models found")]
elif name == "get_model_schema":
model_name = arguments["model_name"]
# Find the model
target_model = None
for route in app.routes:
if isinstance(route, APIRoute):
if route.response_model and hasattr(route.response_model, "__name__"):
if route.response_model.__name__ == model_name:
target_model = route.response_model
break
if route.dependant and route.dependant.body_params:
for param in route.dependant.body_params:
if hasattr(param.type_, "__name__") and param.type_.__name__ == model_name:
target_model = param.type_
break
if not target_model:
return [TextContent(type="text", text=f"❌ Model not found: {model_name}")]
model_info = get_model_info(target_model)
return [TextContent(type="text", text=json.dumps(model_info, indent=2))]
elif name == "search_routes":
query = arguments["query"].lower()
has_auth = arguments.get("has_auth")
routes = [r for r in app.routes if isinstance(r, APIRoute)]
matching_routes = []
for route in routes:
# Search in various fields
searchable = [
route.path.lower(),
route.name.lower(),
(route.summary or "").lower(),
(route.description or "").lower(),
]
if any(query in s for s in searchable):
# Apply auth filter if specified
if has_auth is not None:
# Check if route has dependencies (simplified auth check)
has_deps = route.dependant and len(route.dependant.dependencies) > 0
if has_auth != has_deps:
continue
matching_routes.append(route)
result = []
for route in matching_routes:
result.append(f"• {', '.join(route.methods)} {route.path}")
if route.summary:
result.append(f" {route.summary}")
result.append("")
return [TextContent(type="text", text="\n".join(result) or "No matching routes found")]
elif name == "analyze_dependencies":
dependencies = {}
for route in app.routes:
if isinstance(route, APIRoute) and route.dependant:
for dep in route.dependant.dependencies:
dep_name = dep.call.__name__ if hasattr(dep.call, "__name__") else str(dep.call)
if dep_name not in dependencies:
dependencies[dep_name] = []
dependencies[dep_name].append(f"{', '.join(route.methods)} {route.path}")
result = ["# Dependency Analysis\n"]
for dep_name, routes in sorted(dependencies.items()):
result.append(f"## {dep_name}")
result.append(f"Used in {len(routes)} route(s):")
for route_desc in routes:
result.append(f" • {route_desc}")
result.append("")
return [TextContent(type="text", text="\n".join(result) or "No dependencies found")]
elif name == "get_route_source":
path = arguments["path"]
method = arguments["method"].upper()
route = None
for r in app.routes:
if isinstance(r, APIRoute) and r.path == path and method in r.methods:
route = r
break
if not route:
return [TextContent(type="text", text=f"❌ Route not found: {method} {path}")]
try:
source = inspect.getsource(route.endpoint)
return [TextContent(type="text", text=f"```python\n{source}\n```")]
except Exception as e:
return [TextContent(type="text", text=f"❌ Could not get source: {str(e)}")]
return [TextContent(type="text", text=f"❌ Unknown tool: {name}")]
async def main():
"""Run the MCP server."""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())