# Middleware Layer - Development Memory

## FastMCP Middleware Architecture

### Function-based Middleware Pattern (FastMCP 2.12.2+)

```python
from fastmcp import FastMCP
from fastmcp.types import Context
import asyncio
import time
from typing import Callable, Any


# Modern function-based middleware (FastMCP 2.12.2+ preferred pattern)
async def logging_middleware(
    ctx: Context,
    next_handler: Callable[[Context], Any]
) -> Any:
    """Structured logging middleware with modern async patterns."""
    start_time = time.perf_counter()  # High precision timing

    # Pre-processing with proper error handling
    try:
        await ctx.info(
            "MCP request started",
            method=getattr(ctx, "method", "unknown"),
            params=_sanitize_params(getattr(ctx, "params", {})),
            request_id=getattr(ctx, "request_id", None)
        )

        # Execute with timeout protection
        async with asyncio.timeout(60.0):  # Prevent hanging requests
            response = await next_handler(ctx)

        # Post-processing: Log successful completion
        duration = time.perf_counter() - start_time
        await ctx.info(
            "MCP request completed successfully",
            duration_ms=round(duration * 1000, 2),
            response_type=type(response).__name__
        )
        return response

    except asyncio.TimeoutError:
        duration = time.perf_counter() - start_time
        await ctx.error(
            "MCP request timed out",
            duration_ms=round(duration * 1000, 2),
            timeout_seconds=60.0
        )
        raise  # Preserve timeout context

    except ExceptionGroup as eg:
        # Exception groups (Python 3.11+). `except*` cannot be combined with
        # plain `except` clauses in the same try, so catch the group type here.
        duration = time.perf_counter() - start_time
        errors = [str(e) for e in eg.exceptions]
        await ctx.error(
            "MCP request failed with errors",
            errors=errors,
            duration_ms=round(duration * 1000, 2),
            error_count=len(errors)
        )
        raise  # Preserve original exception context

    except Exception as e:  # Fallback for single exceptions
        duration = time.perf_counter() - start_time
        await ctx.error(
            "MCP request failed",
            error=str(e),
            error_type=type(e).__name__,
            duration_ms=round(duration * 1000, 2)
        )
        raise  # Always preserve context for FastMCP error handling
```

### Middleware Dependencies

- **FastMCP Context**: For request context and structured logging
- **Structlog**: For consistent structured logging across middleware
- **Time Module**: For performance measurement and timing

## Middleware Chain Pattern

### Request Flow

```
Client Request → Middleware 1 → Middleware 2 → ... → Handler → Response
                      ↖              ↖           ↖       ↗
                  Post-process ← Post-process ← Post-process
```

### Chain Execution

```python
async def middleware_template(ctx: Context, next_handler):
    # 1. Pre-processing (before request)
    setup_logging()
    start_timer()

    try:
        # 2. Continue chain
        response = await next_handler(ctx)

        # 3. Post-processing (after successful request)
        log_success()
        record_metrics()
        return response

    except Exception as e:
        # 4. Error processing (after failed request)
        log_error(e)
        record_failure()
        raise  # Always re-raise for proper error handling
```
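The nesting above is easiest to read as plain function wrapping: each middleware receives the layer below it as `next_handler`. The sketch below is illustrative only; the server composes the real chain when middleware is registered (see Middleware Registration), and `build_chain`, `Handler`, and `Middleware` are assumed names rather than FastMCP API.

```python
import functools
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Handler], Awaitable[Any]]


def build_chain(stack: list[Middleware], final_handler: Handler) -> Handler:
    """Compose a middleware stack so stack[0] runs outermost (first)."""
    def wrap(inner: Handler, mw: Middleware) -> Handler:
        async def layer(ctx: Any) -> Any:
            # Each middleware sees the next layer as its next_handler
            return await mw(ctx, inner)
        return layer

    # Fold from the innermost layer outwards so the first entry wraps everything
    return functools.reduce(wrap, reversed(stack), final_handler)


# Usage sketch:
#   handler = build_chain([logging_middleware, timing_middleware], tool_handler)
#   response = await handler(ctx)
```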
## Context Usage Patterns

### FastMCP Context Access

```python
async def context_aware_middleware(ctx: Context, next_handler):
    # Access request metadata
    method = getattr(ctx, "method", "unknown")
    params = getattr(ctx, "params", {})

    # Use context for structured logging
    await ctx.info("Processing request", method=method, param_count=len(params))

    # Continue processing
    return await next_handler(ctx)
```

### Context Logging Methods

```python
# Information logging
await ctx.info("Operation started", operation="deploy", host_id="prod-1")

# Warning logging
await ctx.warning("Slow operation detected", duration_ms=5000)

# Error logging
await ctx.error("Operation failed", error=str(e), error_type=type(e).__name__)

# Debug logging
await ctx.debug("Internal state", cache_size=len(cache))
```

## Timing and Performance Middleware

### Performance Measurement

```python
async def timing_middleware(ctx: Context, next_handler):
    """Middleware for timing MCP operations."""
    start_time = time.perf_counter()  # High precision timing

    try:
        response = await next_handler(ctx)

        # Record successful timing
        duration = time.perf_counter() - start_time
        _record_timing(duration, success=True)
        return response

    except Exception:
        # Record failed timing too
        duration = time.perf_counter() - start_time
        _record_timing(duration, success=False)
        raise
```

### Timing Best Practices

```python
# Use perf_counter for precise measurements
start_time = time.perf_counter()  # Not time.time()

# Round to appropriate precision
duration_ms = round((time.perf_counter() - start_time) * 1000, 2)
duration_seconds = round(time.perf_counter() - start_time, 4)

# Always record timing for both success and failure
def _record_timing(duration: float, success: bool) -> None:
    """Record timing metrics for monitoring."""
    logger.debug("Request timing", duration_seconds=duration, success=success)
    # Could extend to send metrics to monitoring system
```

## Error Handling Middleware

### Comprehensive Error Handling

```python
async def error_handling_middleware(ctx: Context, next_handler):
    """Middleware for handling and logging errors."""
    try:
        return await next_handler(ctx)
    except Exception as e:
        # Log with full context and stack trace
        logger.exception(
            "Unhandled error in MCP request",
            error=str(e),
            error_type=type(e).__name__,
            method=getattr(ctx, "method", "unknown")
        )
        # Always re-raise - let FastMCP format the error response
        raise
```
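The examples in the next subsection catch project-specific exception types (`DockerMCPError`, `SSHError`) and wrap failures in a `MiddlewareError`. Those classes live elsewhere in the codebase; a minimal sketch of what such a hierarchy might look like is shown here so the snippets read as self-contained, but the real definitions may differ.

```python
# Hypothetical exception hierarchy assumed by the snippets below;
# the actual project classes may carry extra fields or behaviour.
class DockerMCPError(Exception):
    """Base class for Docker MCP server errors."""


class SSHError(DockerMCPError):
    """Raised when an SSH operation against a Docker host fails."""


class MiddlewareError(DockerMCPError):
    """Raised when a middleware layer has to wrap another failure."""
```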
### Enhanced Error Context Preservation (Python 3.11+)

```python
import asyncio
from contextlib import AsyncExitStack
from typing import Any


# CORRECT: Modern error context preservation with full traceability
async def error_context_middleware(ctx: Context, next_handler) -> Any:
    """Enhanced error handling with full context preservation."""
    operation_context = {
        "request_id": getattr(ctx, "request_id", None),
        "method": getattr(ctx, "method", "unknown"),
        "start_time": time.perf_counter(),
    }

    try:
        async with AsyncExitStack() as stack:
            # Add cleanup handlers if needed
            response = await next_handler(ctx)
            return response

    except ExceptionGroup as eg:
        # Exception groups (Python 3.11+). `except*` cannot be combined with
        # plain `except` clauses in one try, so catch the group and filter.
        # Preserve all error contexts with structured logging
        for error in eg.exceptions:
            if isinstance(error, (DockerMCPError, SSHError)):
                await ctx.error(
                    "Specific error in middleware chain",
                    error=str(error),
                    error_type=type(error).__name__,
                    **operation_context
                )
        raise  # Preserve exception group context

    except asyncio.TimeoutError:
        await ctx.error(
            "Request timeout in middleware",
            timeout_exceeded=True,
            **operation_context
        )
        raise  # Preserve timeout context

    except Exception as e:
        # Preserve original exception with enhanced context
        enhanced_context = {
            **operation_context,
            "error_module": getattr(e, "__module__", "unknown"),
            "error_class": type(e).__qualname__,
            "has_cause": e.__cause__ is not None,
            "has_context": e.__context__ is not None,
        }
        await ctx.error(
            "Unhandled error in middleware chain",
            error=str(e),
            **enhanced_context
        )
        # CRITICAL: Always re-raise to preserve full traceback
        raise  # Maintains original exception chain


# ERROR ANTI-PATTERNS - DO NOT DO THESE:

# ❌ WRONG: Swallowing exceptions loses context
async def bad_middleware_swallow(ctx: Context, next_handler):
    try:
        return await next_handler(ctx)
    except Exception as e:
        await ctx.error("Error occurred", error=str(e))
        return {"error": "Generic error"}  # LOSES ORIGINAL CONTEXT!


# ❌ WRONG: Creating new exceptions loses traceback
async def bad_middleware_replace(ctx: Context, next_handler):
    try:
        return await next_handler(ctx)
    except Exception as e:
        await ctx.error("Error occurred", error=str(e))
        raise RuntimeError("Middleware error")  # LOSES ORIGINAL TRACEBACK!


# ❌ WRONG: Not preserving exception chain
async def bad_middleware_chain(ctx: Context, next_handler):
    try:
        return await next_handler(ctx)
    except Exception as e:
        await ctx.error("Error occurred", error=str(e))
        raise RuntimeError("New error") from None  # BREAKS EXCEPTION CHAIN!


# ✅ CORRECT: Exception chaining when you must wrap
async def correct_middleware_wrap(ctx: Context, next_handler):
    try:
        return await next_handler(ctx)
    except Exception as e:
        await ctx.error("Middleware layer error", original_error=str(e))
        # If you must wrap, preserve the chain
        raise MiddlewareError("Error in middleware") from e  # PRESERVES CHAIN
```

### Context Enrichment Patterns

```python
# Automatic context enrichment for all middleware
async def context_enrichment_middleware(ctx: Context, next_handler) -> Any:
    """Automatically enrich context for downstream middleware."""
    import contextvars

    # Create context variables that persist through async calls
    request_id = contextvars.ContextVar('request_id', default=None)
    operation_id = contextvars.ContextVar('operation_id', default=None)

    # Set context for this request chain
    request_id.set(getattr(ctx, "request_id", f"req_{int(time.time())}"))
    operation_id.set(getattr(ctx, "method", "unknown_operation"))

    try:
        # All downstream operations inherit this context
        return await next_handler(ctx)
    except Exception as e:
        # Context variables automatically available in exception handlers
        await ctx.error(
            "Error with full context",
            error=str(e),
            request_id=request_id.get(),
            operation_id=operation_id.get(),
            context_preserved=True
        )
        raise  # Context preserved through exception chain
```

## Data Sanitization Patterns

### Parameter Sanitization

```python
def _sanitize_params(params: dict[str, Any]) -> dict[str, Any]:
    """Sanitize request parameters for logging."""
    if not isinstance(params, dict):
        return {}

    sanitized = {}
    for key, value in params.items():
        # Redact sensitive information
        if any(sensitive in key.lower() for sensitive in ["password", "token", "key", "secret"]):
            sanitized[key] = "[REDACTED]"
        elif isinstance(value, str) and len(value) > 1000:
            # Truncate very long strings (like compose files)
            sanitized[key] = value[:1000] + "... [TRUNCATED]"
        else:
            sanitized[key] = value
    return sanitized
```
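A quick illustration of the intended behaviour of `_sanitize_params` (the parameter names and values here are made up for the example):

```python
params = {
    "host_id": "prod-1",
    "password": "hunter2",
    "compose_content": "services:\n  web:\n    image: nginx\n" * 100,  # well over 1000 chars
}

clean = _sanitize_params(params)
# clean["host_id"]         -> "prod-1" (left untouched)
# clean["password"]        -> "[REDACTED]" (key matches a sensitive keyword)
# clean["compose_content"] -> first 1000 characters + "... [TRUNCATED]"
```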
### Sensitive Data Keywords

```python
SENSITIVE_KEYWORDS = [
    "password", "passwd", "pwd",
    "token", "access_token", "refresh_token", "api_token",
    "key", "api_key", "private_key", "secret_key", "ssh_key",
    "secret", "client_secret", "auth_secret",
    "credential", "auth", "authorization"
]

def is_sensitive_field(field_name: str) -> bool:
    """Check if field contains sensitive data."""
    field_lower = field_name.lower()
    return any(sensitive in field_lower for sensitive in SENSITIVE_KEYWORDS)
```

## Legacy Compatibility Pattern

### Class-based Middleware (Legacy)

```python
class LoggingMiddleware:
    """Legacy class-based middleware for backwards compatibility."""

    @staticmethod
    async def __call__(ctx: Context, next_handler):
        return await logging_middleware(ctx, next_handler)


class TimingMiddleware:
    """Legacy timing middleware."""

    @staticmethod
    async def __call__(ctx: Context, next_handler):
        return await timing_middleware(ctx, next_handler)
```

### Migration Strategy

```python
# OLD: Class-based middleware
middleware_stack = [LoggingMiddleware(), TimingMiddleware()]

# NEW: Function-based middleware (preferred)
middleware_stack = [logging_middleware, timing_middleware]

# TRANSITION: Both work, but function-based is preferred for new code
```

## Middleware Registration

### Server Integration

```python
from fastmcp import FastMCPServer
from .middleware.logging import logging_middleware, timing_middleware, error_handling_middleware

# Register middleware in order (executed in sequence)
server = FastMCPServer(
    middleware=[
        logging_middleware,         # First: Log requests
        timing_middleware,          # Second: Time operations
        error_handling_middleware,  # Last: Handle errors
        # ... additional middleware
    ]
)
```

### Middleware Order Considerations

```python
# Typical middleware order:
[
    logging_middleware,         # Always first - logs everything
    authentication_middleware,  # Security before business logic
    rate_limiting_middleware,   # Rate limits after auth
    timing_middleware,          # Performance measurement
    caching_middleware,         # Caching before processing
    error_handling_middleware,  # Error handling (often last)
]
```

## Structured Logging Integration

### Consistent Log Format

```python
import structlog
from datetime import datetime

# Use structured logging for machine readability
logger = structlog.get_logger()

async def structured_logging_middleware(ctx: Context, next_handler):
    # Structured log entries
    await ctx.info(
        "MCP request processing",
        method=getattr(ctx, "method", "unknown"),
        timestamp=datetime.now().isoformat(),
        request_id=getattr(ctx, "request_id", None),
        user_agent=getattr(ctx, "user_agent", None)
    )
    # Continue the chain after logging
    return await next_handler(ctx)
```

### Log Context Enrichment

```python
# Add context to all logs in this request
async def context_enrichment_middleware(ctx: Context, next_handler):
    # Add request-specific context (bound_contextvars is the context-manager form)
    with structlog.contextvars.bound_contextvars(
        request_id=generate_request_id(),
        method=getattr(ctx, "method", "unknown"),
        timestamp=datetime.now().isoformat()
    ):
        return await next_handler(ctx)
```

## Metrics and Monitoring

### Metrics Collection

```python
def _record_timing(duration: float, success: bool) -> None:
    """Record timing metrics for monitoring systems."""
    # Debug logging (always available)
    logger.debug("Request timing", duration_seconds=duration, success=success)

    # Metrics system integration (if available)
    if hasattr(metrics, 'record_request_duration'):
        metrics.record_request_duration(duration, success)

    # Prometheus metrics (if configured)
    if prometheus_metrics:
        prometheus_metrics.request_duration.observe(duration)
        prometheus_metrics.request_count.inc(labels={'success': str(success).lower()})
```
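The `metrics` and `prometheus_metrics` objects above are assumed to be provided by the surrounding application. If Prometheus is the target, a variant of `_record_timing` built directly on `prometheus_client` could look like the sketch below; the metric names and labels are illustrative, not the project's actual metrics.

```python
from prometheus_client import Counter, Histogram

# Hypothetical collectors; define once at module import time
REQUEST_DURATION = Histogram(
    "mcp_request_duration_seconds",
    "Duration of MCP requests in seconds",
)
REQUEST_COUNT = Counter(
    "mcp_requests_total",
    "Total MCP requests, labelled by outcome",
    ["success"],
)


def _record_timing(duration: float, success: bool) -> None:
    """Record request timing straight into Prometheus collectors."""
    REQUEST_DURATION.observe(duration)
    REQUEST_COUNT.labels(success=str(success).lower()).inc()
```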
### Health Check Integration

```python
async def health_check_middleware(ctx: Context, next_handler):
    """Middleware that contributes to health check status."""
    try:
        response = await next_handler(ctx)
        # Record healthy operation
        health_tracker.record_success()
        return response
    except Exception as e:
        # Record unhealthy operation
        health_tracker.record_failure(str(e))
        raise
```

Middleware provides cross-cutting concerns like logging, timing, error handling, and security that apply to all MCP requests, ensuring consistent behavior and observability across the entire application.
