Skip to main content
Glama

FastMCP Todo Server

http_server.py (12.4 kB)
#!/usr/bin/env python3.11
"""
FastMCP HTTP Server for Omnispindle with user-scoped databases.

This server uses the recommended FastMCP HTTP transport for remote deployments.
Run with: fastmcp run src/Omnispindle/http_server.py

Every tool call is authenticated by extracting a Bearer token from the
incoming request, verifying it with ``verify_auth0_token`` and wrapping the
resulting payload in a ``Context`` that is handed to the tool implementation.
"""

import asyncio
import logging
import os
from contextvars import ContextVar
from typing import Dict, Any, Optional, Union, List
from urllib.parse import urlencode

from fastmcp import FastMCP, Context as MCPContext
from fastmcp.server.dependencies import get_http_headers
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from dotenv import load_dotenv

from src.Omnispindle.context import Context
from src.Omnispindle.patches import apply_patches
from src.Omnispindle.auth_utils import verify_auth0_token, AUTH_CONFIG
from src.Omnispindle.auth_flow import ensure_authenticated, run_async_in_thread
from src.Omnispindle import tools

# Initialize
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
apply_patches()
load_dotenv()

# Tool loadout configurations
TOOL_LOADOUTS = {
    "full": [
        "add_todo", "query_todos", "update_todo", "delete_todo", "get_todo",
        "mark_todo_complete", "list_todos_by_status", "search_todos", "list_project_todos",
        "add_lesson", "get_lesson", "update_lesson", "delete_lesson", "search_lessons",
        "grep_lessons", "list_lessons", "query_todo_logs", "list_projects",
        "explain", "add_explanation", "point_out_obvious"
    ],
    "basic": [
        "add_todo", "query_todos", "update_todo", "get_todo", "mark_todo_complete",
        "list_todos_by_status", "list_project_todos"
    ],
    "minimal": [
        "add_todo", "query_todos", "get_todo", "mark_todo_complete"
    ],
    "lessons": [
        "add_lesson", "get_lesson", "update_lesson", "delete_lesson",
        "search_lessons", "grep_lessons", "list_lessons"
    ],
    "admin": [
        "query_todos", "update_todo", "delete_todo", "query_todo_logs",
        "list_projects", "explain", "add_explanation"
    ]
}

# Names of the tools this module knows how to expose over HTTP (see the
# registration section at the bottom of the file). Keep in sync with it.
_HTTP_TOOL_NAMES = (
    "add_todo",
    "query_todos",
    "get_todo",
    "mark_todo_complete",
    "update_todo",
    "list_todos_by_status",
    "list_project_todos",
)

# Headers of the request currently being served, as captured by
# HeaderCaptureMiddleware. A ContextVar is used instead of a module-level dict
# so that concurrent requests can never observe (and authenticate with) each
# other's Authorization header.
_request_headers_var: ContextVar[Optional[Dict[str, str]]] = ContextVar(
    "omnispindle_request_headers", default=None
)


class HeaderCaptureMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that records the current request's headers.

    The headers are stored in a request-scoped context variable and are used
    as the last-resort source of the Authorization header during
    authentication.
    """

    async def dispatch(self, request: Request, call_next):
        """Capture the request headers, then delegate to the next handler."""
        headers = dict(request.headers)
        reset_token = _request_headers_var.set(headers)
        # Only header *names* are logged; values may contain credentials.
        logger.info("Middleware captured headers: %s", list(headers.keys()))
        try:
            return await call_next(request)
        finally:
            # Never let captured headers outlive the request they belong to.
            _request_headers_var.reset(reset_token)


# Create the FastMCP instance that fastmcp run will use
mcp = FastMCP("Omnispindle 🌪️")

# Add middleware to capture headers (if FastMCP supports it)
if hasattr(mcp, 'app') and hasattr(mcp.app, 'add_middleware'):
    mcp.app.add_middleware(HeaderCaptureMiddleware)
    logger.info("Added HeaderCaptureMiddleware to FastMCP app")
else:
    logger.warning("FastMCP doesn't support app.add_middleware - using fallback header capture")


def _header_names(headers) -> Any:
    """Return the header names for logging, or the string 'None' when empty."""
    return list(headers.keys()) if headers else 'None'


def _get_auth_header(headers) -> Optional[str]:
    """Return the raw Authorization header value from a headers mapping, if any."""
    if not headers:
        return None
    return headers.get("authorization") or headers.get("Authorization")


def _extract_bearer_token(headers) -> Optional[str]:
    """Return the Bearer token carried by ``headers``, or None.

    The auth scheme is compared case-insensitively. An Authorization header
    with a different scheme, or with an empty credential, yields None.
    """
    auth_header = _get_auth_header(headers)
    if not isinstance(auth_header, str):
        return None
    scheme, _, credentials = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None


def _build_auth_url() -> str:
    """Build the Auth0 authorize URL shown to unauthenticated callers."""
    # urlencode is required because audience and redirect_uri are themselves
    # URLs and would otherwise corrupt the query string.
    query = urlencode({
        "client_id": AUTH_CONFIG.client_id,
        "audience": AUTH_CONFIG.audience,
        "response_type": "token",
        "redirect_uri": "http://localhost:8765/callback",
    })
    return f"https://{AUTH_CONFIG.domain}/authorize?{query}"


def _authentication_required_error() -> ValueError:
    """Create the error raised when no usable Bearer token was supplied."""
    return ValueError(
        f"Authentication required. No Authorization header found in request.\n"
        f"Please obtain a token by visiting: {_build_auth_url()}\n"
        f"Then include it in the Authorization header: 'Bearer <your-token>'"
    )


async def _context_from_token(token: str) -> Context:
    """Verify ``token`` with Auth0 and wrap the resulting payload in a Context.

    Raises:
        ValueError: if ``verify_auth0_token`` returns a falsy payload.
    """
    user_payload = await verify_auth0_token(token)
    if not user_payload:
        raise ValueError("Invalid or expired Auth0 token. Please re-authenticate.")

    user_payload["auth_method"] = "auth0"
    logger.info("HTTP request authenticated via Auth0: %s", user_payload.get('sub'))
    return Context(user=user_payload)


async def get_authenticated_context_from_mcp(mcp_ctx: MCPContext) -> Context:
    """
    Extract and verify Auth0 token from MCP context.
    Returns authenticated user context or raises an error.

    The token is searched for, in order, in:
      1. ``mcp_ctx.headers``;
      2. the ``headers`` of ``mcp_ctx.request`` / ``http_headers`` /
         ``context`` / ``session`` (whichever exist);
      3. ``get_http_headers()``;
      4. the headers captured for this request by HeaderCaptureMiddleware.

    Raises:
        ValueError: if no Bearer token is found or the token fails verification.
    """
    token = None

    # Introspection dumps are useful when debugging transports, but far too
    # noisy to emit at INFO for every tool call.
    logger.debug("MCP Context received: %s", mcp_ctx)
    if mcp_ctx:
        logger.debug("MCP Context type: %s", type(mcp_ctx))
        logger.debug("MCP Context attributes: %s", dir(mcp_ctx))

    # 1. Headers directly on the MCP context
    if mcp_ctx and hasattr(mcp_ctx, 'headers'):
        headers = mcp_ctx.headers
        logger.info("MCP Context headers: %s", _header_names(headers))
        if headers:
            logger.info(
                "Authorization header found in MCP context: %s",
                'Yes' if _get_auth_header(headers) else 'No',
            )
            token = _extract_bearer_token(headers)
            if token:
                logger.info("Token extracted from MCP context (length: %d)", len(token))

    # 2. Headers on other objects the MCP context may expose
    if not token and mcp_ctx:
        for attr_name in ('request', 'http_headers', 'context', 'session'):
            if not hasattr(mcp_ctx, attr_name):
                continue
            attr = getattr(mcp_ctx, attr_name)
            logger.debug("MCP Context has %s: %s", attr_name, attr)
            if not hasattr(attr, 'headers'):
                continue
            headers = attr.headers
            logger.info("Found headers in %s: %s", attr_name, _header_names(headers))
            token = _extract_bearer_token(headers)
            if token:
                logger.info("Token extracted from %s (length: %d)", attr_name, len(token))
                break

    # 3. Fallback to get_http_headers
    if not token:
        try:
            request_headers = get_http_headers()
        except Exception as e:
            # Best effort: this source is optional, so keep going.
            logger.warning("Could not get HTTP headers from FastMCP context: %s", e)
        else:
            logger.info("Fallback headers from get_http_headers: %s", _header_names(request_headers))
            token = _extract_bearer_token(request_headers)
            if token:
                logger.info("Token extracted from get_http_headers (length: %d)", len(token))

    # 4. Final fallback: headers captured by the middleware for THIS request
    if not token:
        captured_headers = _request_headers_var.get()
        if captured_headers:
            logger.info("Trying middleware-captured headers: %s", list(captured_headers.keys()))
            token = _extract_bearer_token(captured_headers)
            if token:
                logger.info("Token extracted from middleware-captured headers (length: %d)", len(token))

    if not token:
        raise _authentication_required_error()

    return await _context_from_token(token)


async def get_authenticated_context(request_headers: Optional[Dict[str, str]] = None) -> Context:
    """
    Extract and verify Auth0 token from HTTP request context.
    Returns authenticated user context or raises an error.

    Args:
        request_headers: Headers to read the Authorization header from. When
            omitted or empty, ``get_http_headers()`` is consulted instead.

    Raises:
        ValueError: if no Bearer token is found or the token fails verification.
    """
    # First try to get token from FastMCP request headers
    if not request_headers:
        try:
            request_headers = get_http_headers()
        except Exception as e:
            logger.warning("Could not get HTTP headers from FastMCP context: %s", e)
            request_headers = {}
        else:
            logger.info("Retrieved headers from FastMCP context: %s", _header_names(request_headers))

    token = None
    if request_headers:
        auth_header = _get_auth_header(request_headers)
        logger.info("Authorization header found: %s", 'Yes' if auth_header else 'No')
        token = _extract_bearer_token(request_headers)
        if token:
            logger.info("Token extracted successfully (length: %d)", len(token))
        elif auth_header:
            # Deliberately do not log any part of the header value: for
            # non-Bearer schemes (or a bare token) it is credential material.
            logger.warning("Authorization header present but is not a usable 'Bearer <token>' value")

    if not token:
        raise _authentication_required_error()

    return await _context_from_token(token)


# Get tool loadout from environment
loadout_name = os.getenv("OMNISPINDLE_TOOL_LOADOUT", "full")
if loadout_name not in TOOL_LOADOUTS:
    logger.warning(f"Unknown loadout '{loadout_name}', using 'full'")
    loadout_name = "full"

selected_tools = TOOL_LOADOUTS[loadout_name]
logger.info(f"Loading '{loadout_name}' loadout: {selected_tools}")

# Register specific tools manually for HTTP transport compatibility.
# NOTE: the wrapper docstrings below are left untouched on purpose; they are
# the tool descriptions presented to MCP clients.
if "add_todo" in selected_tools:
    @mcp.tool()
    async def add_todo(description: str, project: str, priority: str = "Medium", target_agent: str = "user", metadata: Optional[Dict[str, Any]] = None, ctx: MCPContext = None):
        """Creates a task in the specified project with the given priority and target agent."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.add_todo(description, project, priority, target_agent, metadata, auth_ctx)

if "query_todos" in selected_tools:
    @mcp.tool()
    async def query_todos(filter: Optional[Dict[str, Any]] = None, projection: Optional[Dict[str, Any]] = None, limit: int = 100, ctx: MCPContext = None):
        """Query todos with flexible filtering options from user's database."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.query_todos(filter, projection, limit, auth_ctx)

if "get_todo" in selected_tools:
    @mcp.tool()
    async def get_todo(todo_id: str, ctx: MCPContext = None):
        """Get a specific todo item by its ID."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.get_todo(todo_id, auth_ctx)

if "mark_todo_complete" in selected_tools:
    @mcp.tool()
    async def mark_todo_complete(todo_id: str, comment: Optional[str] = None, ctx: MCPContext = None):
        """Mark a todo as completed."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.mark_todo_complete(todo_id, comment, auth_ctx)

if "update_todo" in selected_tools:
    @mcp.tool()
    async def update_todo(todo_id: str, updates: Dict[str, Any], ctx: MCPContext = None):
        """Update an existing todo with the provided changes."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.update_todo(todo_id, updates, auth_ctx)

if "list_todos_by_status" in selected_tools:
    @mcp.tool()
    async def list_todos_by_status(status: str, limit: int = 100, ctx: MCPContext = None):
        """List todos filtered by status."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.list_todos_by_status(status, limit, auth_ctx)

if "list_project_todos" in selected_tools:
    @mcp.tool()
    async def list_project_todos(project: str, limit: int = 5, ctx: MCPContext = None):
        """List recent todos for a specific project."""
        auth_ctx = await get_authenticated_context_from_mcp(ctx)
        return await tools.list_project_todos(project, limit, auth_ctx)

# Count all registered tools
registered_tools = [t for t in selected_tools if t in _HTTP_TOOL_NAMES]
logger.info(f"Registered {len(registered_tools)} tools for HTTP transport: {registered_tools}")

# Surface loadout entries that have no HTTP wrapper instead of dropping them
# silently, so a misconfigured loadout is visible at startup.
unsupported_tools = [t for t in selected_tools if t not in _HTTP_TOOL_NAMES]
if unsupported_tools:
    logger.warning(
        "Loadout '%s' requests tools with no HTTP transport wrapper; they were not registered: %s",
        loadout_name,
        unsupported_tools,
    )

# The mcp instance is now ready for fastmcp run command

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MadnessEngineering/fastmcp-todo-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.