MCP Web Tools Server

  • docs
# Advanced MCP Features This document explores advanced features and configurations for Model Context Protocol (MCP) servers. These techniques can help you build more powerful, secure, and maintainable MCP implementations. ## Advanced Configuration ### Server Lifecycle Management The MCP server lifecycle can be managed with the `lifespan` parameter to set up resources on startup and clean them up on shutdown: ```python from contextlib import asynccontextmanager from typing import AsyncIterator, Dict, Any from mcp.server.fastmcp import FastMCP @asynccontextmanager async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]: """Manage server lifecycle.""" print("Server starting up...") # Initialize resources db_connection = await initialize_database() cache = initialize_cache() try: # Yield context to server yield { "db": db_connection, "cache": cache } finally: # Clean up resources print("Server shutting down...") await db_connection.close() cache.clear() # Create server with lifespan mcp = FastMCP("AdvancedServer", lifespan=server_lifespan) # Access lifespan context in tools @mcp.tool() async def query_database(sql: str, ctx: Context) -> str: """Run a database query.""" db = ctx.request_context.lifespan_context["db"] results = await db.execute(sql) return results ``` ### Dependency Specification You can specify dependencies for your server to ensure it has everything it needs: ```python # Specify dependencies for the server mcp = FastMCP( "DependentServer", dependencies=[ "pandas>=1.5.0", "numpy>=1.23.0", "scikit-learn>=1.1.0" ] ) ``` This helps with: - Documentation for users - Verification during installation - Clarity about requirements ### Environment Variables Use environment variables for configuration: ```python import os from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Access environment variables API_KEY = os.environ.get("MY_API_KEY") BASE_URL = os.environ.get("MY_BASE_URL", "https://api.default.com") DEBUG = os.environ.get("DEBUG", "false").lower() == "true" # Create server with configuration mcp = FastMCP( "ConfigurableServer", config={ "api_key": API_KEY, "base_url": BASE_URL, "debug": DEBUG } ) # Access configuration in tools @mcp.tool() async def call_api(endpoint: str, ctx: Context) -> str: """Call an API endpoint.""" config = ctx.server.config base_url = config["base_url"] api_key = config["api_key"] # Use configuration async with httpx.AsyncClient() as client: response = await client.get( f"{base_url}/{endpoint}", headers={"Authorization": f"Bearer {api_key}"} ) return response.text ``` ## Advanced Logging ### Structured Logging Implement structured logging for better analysis: ```python import logging import json from datetime import datetime class StructuredFormatter(logging.Formatter): """Format logs as JSON for structured logging.""" def format(self, record): log_data = { "timestamp": datetime.utcnow().isoformat(), "level": record.levelname, "name": record.name, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Add exception info if present if record.exc_info: log_data["exception"] = self.formatException(record.exc_info) # Add custom fields if present if hasattr(record, "data"): log_data.update(record.data) return json.dumps(log_data) # Set up structured logging logger = logging.getLogger("mcp") handler = logging.FileHandler("mcp_server.log") handler.setFormatter(StructuredFormatter()) logger.addHandler(handler) logger.setLevel(logging.DEBUG) # Log with extra data def log_with_data(level, message, **kwargs): record = logging.LogRecord( name="mcp", level=level, pathname="", lineno=0, msg=message, args=(), exc_info=None ) record.data = kwargs logger.handle(record) # Usage log_with_data( logging.INFO, "Tool execution completed", tool="web_scrape", url="example.com", execution_time=1.25, result_size=1024 ) ``` ### Client Notifications Send logging messages to clients: ```python @mcp.tool() async def process_data(data: str, ctx: Context) -> str: """Process data with client notifications.""" try: # Send info message to client ctx.info("Starting data processing") # Process data in steps ctx.info("Step 1: Parsing data") parsed_data = parse_data(data) ctx.info("Step 2: Analyzing data") analysis = analyze_data(parsed_data) ctx.info("Step 3: Generating report") report = generate_report(analysis) ctx.info("Processing complete") return report except Exception as e: # Send error message to client ctx.error(f"Processing failed: {str(e)}") raise ``` ### Progress Reporting Report progress for long-running operations: ```python @mcp.tool() async def process_large_file(file_path: str, ctx: Context) -> str: """Process a large file with progress reporting.""" try: # Get file size file_size = os.path.getsize(file_path) bytes_processed = 0 # Open file async with aiofiles.open(file_path, "rb") as f: # Process in chunks chunk_size = 1024 * 1024 # 1 MB while True: chunk = await f.read(chunk_size) if not chunk: break # Process chunk process_chunk(chunk) # Update progress bytes_processed += len(chunk) progress = min(100, int(bytes_processed * 100 / file_size)) await ctx.report_progress(progress) # Log milestone if progress % 10 == 0: ctx.info(f"Processed {progress}% of file") return f"File processing complete. Processed {file_size} bytes." except Exception as e: ctx.error(f"File processing failed: {str(e)}") return f"Error: {str(e)}" ``` ## Security Features ### Input Validation Implement thorough input validation: ```python from pydantic import BaseModel, Field, validator class SearchParams(BaseModel): """Validated search parameters.""" query: str = Field(..., min_length=1, max_length=100) days: int = Field(7, ge=1, le=30) limit: int = Field(5, ge=1, le=100) @validator('query') def query_must_be_valid(cls, v): import re if not re.match(r'^[a-zA-Z0-9\s\-.,?!]+$', v): raise ValueError('Query contains invalid characters') return v @mcp.tool() async def search_with_validation(params: dict) -> str: """Search with validated parameters.""" try: # Validate parameters validated = SearchParams(**params) # Proceed with validated parameters results = await perform_search( validated.query, validated.days, validated.limit ) return format_results(results) except Exception as e: return f"Validation error: {str(e)}" ``` ### Rate Limiting Implement rate limiting to prevent abuse: ```python import time from functools import wraps # Simple rate limiter class RateLimiter: def __init__(self, calls_per_minute=60): self.calls_per_minute = calls_per_minute self.interval = 60 / calls_per_minute # seconds per call self.last_call_times = {} async def limit(self, key): """Limit calls for a specific key.""" now = time.time() # Initialize if first call if key not in self.last_call_times: self.last_call_times[key] = [now] return # Get calls within the last minute minute_ago = now - 60 recent_calls = [t for t in self.last_call_times[key] if t > minute_ago] # Check if rate limit exceeded if len(recent_calls) >= self.calls_per_minute: oldest_call = min(recent_calls) wait_time = 60 - (now - oldest_call) raise ValueError(f"Rate limit exceeded. Try again in {wait_time:.1f} seconds.") # Update call times self.last_call_times[key] = recent_calls + [now] # Create rate limiter rate_limiter = RateLimiter(calls_per_minute=10) # Apply rate limiting to a tool @mcp.tool() async def rate_limited_api_call(endpoint: str) -> str: """Call API with rate limiting.""" try: # Apply rate limit await rate_limiter.limit("api_call") # Proceed with API call async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/{endpoint}") return response.text except ValueError as e: return f"Error: {str(e)}" ``` ### Access Control Implement access controls for sensitive operations: ```python # Define access levels class AccessLevel: READ = 1 WRITE = 2 ADMIN = 3 # Access control decorator def require_access(level): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): # Get context from args ctx = None for arg in args: if isinstance(arg, Context): ctx = arg break if ctx is None: for arg_name, arg_value in kwargs.items(): if isinstance(arg_value, Context): ctx = arg_value break if ctx is None: return "Error: Context not provided" # Check access level user_level = get_user_access_level(ctx) if user_level < level: return "Error: Insufficient permissions" # Proceed with function return await func(*args, **kwargs) return wrapper return decorator # Get user access level from context def get_user_access_level(ctx): # In practice, this would use authentication information # For demonstration, return READ