Skip to main content
Glama
chadkunsman

NetBox MCP Server

by chadkunsman
best-practices.md35.3 kB
# Best Practices Guide This guide covers production-ready patterns, performance optimization, security considerations, and code quality standards for FastMCP servers. ## Code Organization ### Project Structure ``` src/ ├── server.py # Main server entry point ├── config/ │ ├── __init__.py │ ├── settings.py # Configuration management │ └── logging.py # Logging configuration ├── tools/ │ ├── __init__.py │ ├── base.py # Base tool classes │ ├── data_tools.py # Data manipulation tools │ ├── api_tools.py # External API tools │ └── admin_tools.py # Administrative tools ├── resources/ │ ├── __init__.py │ ├── static.py # Static resources │ └── dynamic.py # Dynamic resources ├── models/ │ ├── __init__.py │ ├── requests.py # Request models │ └── responses.py # Response models ├── utils/ │ ├── __init__.py │ ├── auth.py # Authentication utilities │ ├── cache.py # Caching utilities │ └── validation.py # Validation helpers └── exceptions/ ├── __init__.py └── custom.py # Custom exceptions ``` ### Configuration Management ```python # src/config/settings.py import os from functools import lru_cache from pydantic import BaseSettings, Field class Settings(BaseSettings): """Application settings.""" # Server Configuration app_name: str = Field(default="FastMCP Server", env="APP_NAME") version: str = Field(default="1.0.0", env="VERSION") debug: bool = Field(default=False, env="DEBUG") log_level: str = Field(default="INFO", env="LOG_LEVEL") # Transport Configuration port: int = Field(default=8000, env="PORT") host: str = Field(default="0.0.0.0", env="HOST") # Database Configuration database_url: str = Field(default="sqlite:///app.db", env="DATABASE_URL") redis_url: str = Field(default="redis://localhost:6379", env="REDIS_URL") # External APIs openai_api_key: str = Field(default="", env="OPENAI_API_KEY") external_api_key: str = Field(default="", env="EXTERNAL_API_KEY") # Authentication oauth_client_id: str = Field(default="", env="OAUTH_CLIENT_ID") oauth_client_secret: str = Field(default="", env="OAUTH_CLIENT_SECRET") jwt_secret_key: str = Field(default="dev-secret", env="JWT_SECRET_KEY") # Performance max_concurrent_requests: int = Field(default=100, env="MAX_CONCURRENT_REQUESTS") request_timeout: int = Field(default=30, env="REQUEST_TIMEOUT") class Config: env_file = ".env" case_sensitive = False @lru_cache() def get_settings() -> Settings: """Get cached settings instance.""" return Settings() # Usage in your server settings = get_settings() ``` ## Error Handling ### Structured Error Handling ```python # src/exceptions/custom.py from fastmcp.exceptions import ToolError class ValidationError(ToolError): """Raised when input validation fails.""" pass class ExternalAPIError(ToolError): """Raised when external API calls fail.""" pass class AuthenticationError(ToolError): """Raised when authentication fails.""" pass class RateLimitError(ToolError): """Raised when rate limits are exceeded.""" pass # src/utils/error_handler.py import logging from typing import Any, Dict from fastmcp.exceptions import ToolError logger = logging.getLogger(__name__) def handle_external_api_error(e: Exception, context: str = "") -> ToolError: """Convert external API errors to ToolError.""" logger.error(f"External API error in {context}: {str(e)}") if "timeout" in str(e).lower(): return ExternalAPIError(f"Request timeout: {context}") elif "401" in str(e) or "unauthorized" in str(e).lower(): return ExternalAPIError(f"Authentication failed: {context}") elif "429" in str(e) or "rate limit" in str(e).lower(): return RateLimitError(f"Rate limit exceeded: {context}") else: return ExternalAPIError(f"External service unavailable: {context}") def safe_tool_execution(func): """Decorator for safe tool execution.""" async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except ToolError: # Re-raise ToolErrors (they're safe to show to clients) raise except Exception as e: # Log the actual error for debugging logger.exception(f"Unexpected error in {func.__name__}") # Return generic error to client raise ToolError(f"Internal error in {func.__name__}") return wrapper ``` ### Tool Error Handling Patterns ```python # src/tools/data_tools.py from typing import List, Dict, Any from pydantic import ValidationError import aiohttp import asyncio from ..exceptions.custom import ValidationError as CustomValidationError, ExternalAPIError from ..utils.error_handler import handle_external_api_error, safe_tool_execution @mcp.tool() @safe_tool_execution async def process_data(data: List[Dict[str, Any]], operation: str) -> Dict[str, Any]: """Process data with comprehensive error handling.""" # Input validation if not data: raise CustomValidationError("Data list cannot be empty") if operation not in ["sum", "average", "count"]: raise CustomValidationError("Operation must be one of: sum, average, count") try: # Process data if operation == "sum": result = sum(item.get("value", 0) for item in data) elif operation == "average" and data: result = sum(item.get("value", 0) for item in data) / len(data) elif operation == "count": result = len(data) else: raise CustomValidationError(f"Cannot perform {operation} on empty data") return { "operation": operation, "result": result, "count": len(data) } except TypeError as e: raise CustomValidationError(f"Invalid data format: {str(e)}") except ZeroDivisionError: raise CustomValidationError("Cannot calculate average of empty dataset") @mcp.tool() @safe_tool_execution async def fetch_external_data(url: str, timeout: int = 30) -> Dict[str, Any]: """Fetch data from external API with proper error handling.""" try: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session: async with session.get(url) as response: response.raise_for_status() return await response.json() except asyncio.TimeoutError: raise ExternalAPIError(f"Request to {url} timed out after {timeout}s") except aiohttp.ClientResponseError as e: raise handle_external_api_error(e, f"fetching {url}") except aiohttp.ClientError as e: raise handle_external_api_error(e, f"connecting to {url}") except Exception as e: raise handle_external_api_error(e, f"processing response from {url}") ``` ## Input Validation and Data Models ### Comprehensive Pydantic Models ```python # src/models/requests.py from pydantic import BaseModel, Field, validator, root_validator from typing import List, Optional, Union, Literal from datetime import datetime from enum import Enum class Priority(str, Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" class TaskRequest(BaseModel): """Request model for task creation.""" title: str = Field(..., min_length=1, max_length=200, description="Task title") description: Optional[str] = Field(None, max_length=1000, description="Task description") priority: Priority = Field(default=Priority.MEDIUM, description="Task priority") due_date: Optional[datetime] = Field(None, description="Task due date") tags: List[str] = Field(default_factory=list, max_items=10, description="Task tags") assignee: Optional[str] = Field(None, regex=r'^[a-zA-Z0-9_]+$', description="Username") @validator('due_date') def due_date_must_be_future(cls, v): if v and v <= datetime.now(): raise ValueError('Due date must be in the future') return v @validator('tags') def validate_tags(cls, v): if v: # Remove duplicates and validate format unique_tags = list(set(tag.strip().lower() for tag in v if tag.strip())) if len(unique_tags) != len(v): raise ValueError('Tags must be unique') return unique_tags return v class SearchRequest(BaseModel): """Request model for search operations.""" query: str = Field(..., min_length=1, max_length=500, description="Search query") filters: Optional[Dict[str, Union[str, int, bool]]] = Field(default_factory=dict) limit: int = Field(default=10, ge=1, le=100, description="Number of results") offset: int = Field(default=0, ge=0, description="Results offset") sort_by: Optional[str] = Field(None, description="Sort field") sort_order: Literal["asc", "desc"] = Field(default="asc", description="Sort order") @root_validator def validate_search_params(cls, values): sort_by = values.get('sort_by') if sort_by and sort_by not in ['created_at', 'updated_at', 'title', 'priority']: raise ValueError('Invalid sort_by field') return values # Usage in tools @mcp.tool() async def create_task(request: TaskRequest) -> Dict[str, Any]: """Create a task with validated input.""" # The request is automatically validated by Pydantic return { "task_id": generate_task_id(), "title": request.title, "description": request.description, "priority": request.priority.value, "status": "created" } ``` ### Advanced Validation Patterns ```python # src/utils/validation.py from typing import Any, Dict, List from pydantic import BaseModel, validator import re class EmailValidator(BaseModel): """Email validation helper.""" @staticmethod def validate_email(email: str) -> bool: pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return bool(re.match(pattern, email)) class URLValidator(BaseModel): """URL validation helper.""" @staticmethod def validate_url(url: str) -> bool: pattern = r'^https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:\w*))?)?$' return bool(re.match(pattern, url)) class BusinessRuleValidator: """Custom business rule validations.""" @staticmethod def validate_user_permissions(user_id: str, required_permissions: List[str]) -> bool: """Validate user has required permissions.""" # Implementation depends on your auth system user_permissions = get_user_permissions(user_id) return all(perm in user_permissions for perm in required_permissions) @staticmethod def validate_data_consistency(data: Dict[str, Any]) -> List[str]: """Validate data consistency rules.""" errors = [] # Example: Check date ranges if data.get('start_date') and data.get('end_date'): if data['start_date'] >= data['end_date']: errors.append("Start date must be before end date") # Example: Check numeric ranges if data.get('min_value') and data.get('max_value'): if data['min_value'] >= data['max_value']: errors.append("Min value must be less than max value") return errors # Usage in tools @mcp.tool() async def validated_tool(email: str, url: str, ctx: Context) -> Dict[str, Any]: """Tool with comprehensive validation.""" # Validate email if not EmailValidator.validate_email(email): raise ValidationError("Invalid email format") # Validate URL if not URLValidator.validate_url(url): raise ValidationError("Invalid URL format") # Validate business rules if not BusinessRuleValidator.validate_user_permissions(ctx.client_id, ["read_data"]): raise AuthenticationError("Insufficient permissions") return {"email": email, "url": url, "status": "validated"} ``` ## Performance Optimization ### Async Patterns and Concurrency ```python # src/utils/concurrency.py import asyncio from typing import List, Dict, Any, Callable, Awaitable import aiohttp from contextlib import asynccontextmanager class ConcurrencyManager: """Manage concurrent operations with limits.""" def __init__(self, max_concurrent: int = 10): self.semaphore = asyncio.Semaphore(max_concurrent) async def run_with_limit(self, coro: Awaitable) -> Any: """Run coroutine with concurrency limit.""" async with self.semaphore: return await coro # Global concurrency manager concurrency = ConcurrencyManager(max_concurrent=50) @mcp.tool() async def batch_fetch_data(urls: List[str]) -> List[Dict[str, Any]]: """Fetch data from multiple URLs concurrently.""" async def fetch_single(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: """Fetch data from a single URL.""" try: async with session.get(url) as response: response.raise_for_status() data = await response.json() return {"url": url, "data": data, "status": "success"} except Exception as e: return {"url": url, "error": str(e), "status": "error"} async with aiohttp.ClientSession() as session: # Limit concurrent requests tasks = [ concurrency.run_with_limit(fetch_single(session, url)) for url in urls ] results = await asyncio.gather(*tasks, return_exceptions=False) return results @mcp.tool() async def parallel_processing(data_items: List[Dict[str, Any]]) -> Dict[str, Any]: """Process multiple data items in parallel.""" async def process_item(item: Dict[str, Any]) -> Dict[str, Any]: """Process a single data item.""" # Simulate processing time await asyncio.sleep(0.1) processed_value = item.get("value", 0) * 2 return { "id": item.get("id"), "original_value": item.get("value"), "processed_value": processed_value } # Process items concurrently with limit tasks = [ concurrency.run_with_limit(process_item(item)) for item in data_items ] processed_items = await asyncio.gather(*tasks) return { "total_items": len(data_items), "processed_items": processed_items, "summary": { "total_original": sum(item.get("value", 0) for item in data_items), "total_processed": sum(item["processed_value"] for item in processed_items) } } ``` ### Caching Strategies ```python # src/utils/cache.py import redis.asyncio as redis import json import hashlib from typing import Any, Optional, Union from datetime import timedelta import functools class CacheManager: """Centralized cache management.""" def __init__(self, redis_url: str): self.redis = redis.from_url(redis_url) def _make_key(self, prefix: str, *args, **kwargs) -> str: """Generate cache key from arguments.""" key_data = json.dumps([args, sorted(kwargs.items())], sort_keys=True) key_hash = hashlib.md5(key_data.encode()).hexdigest() return f"{prefix}:{key_hash}" async def get(self, key: str) -> Optional[Any]: """Get value from cache.""" try: value = await self.redis.get(key) return json.loads(value) if value else None except Exception: return None async def set(self, key: str, value: Any, ttl: int = 3600) -> bool: """Set value in cache.""" try: await self.redis.setex(key, ttl, json.dumps(value)) return True except Exception: return False async def delete(self, key: str) -> bool: """Delete value from cache.""" try: await self.redis.delete(key) return True except Exception: return False def cached(self, prefix: str, ttl: int = 3600): """Decorator for caching function results.""" def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): # Generate cache key cache_key = self._make_key(prefix, *args, **kwargs) # Try to get from cache cached_result = await self.get(cache_key) if cached_result is not None: return cached_result # Execute function and cache result result = await func(*args, **kwargs) await self.set(cache_key, result, ttl) return result return wrapper return decorator # Global cache manager cache = CacheManager(get_settings().redis_url) # Usage in tools @mcp.tool() @cache.cached("weather_data", ttl=1800) # Cache for 30 minutes async def get_weather_data(city: str, country: str = "US") -> Dict[str, Any]: """Get weather data with caching.""" # This function will only be called if data is not in cache async with aiohttp.ClientSession() as session: url = f"https://api.weather.com/v1/current?city={city}&country={country}" async with session.get(url) as response: return await response.json() @mcp.tool() async def expensive_calculation(numbers: List[int]) -> Dict[str, Any]: """Perform expensive calculation with manual caching.""" # Check cache first cache_key = cache._make_key("calculation", numbers) cached_result = await cache.get(cache_key) if cached_result: cached_result["from_cache"] = True return cached_result # Perform calculation result = { "sum": sum(numbers), "average": sum(numbers) / len(numbers) if numbers else 0, "max": max(numbers) if numbers else None, "min": min(numbers) if numbers else None, "from_cache": False } # Cache result for 1 hour await cache.set(cache_key, result, ttl=3600) return result ``` ### Database Connection Pooling ```python # src/utils/database.py import asyncpg import asyncio from contextlib import asynccontextmanager from typing import AsyncContextManager, Dict, Any, List class DatabaseManager: """Database connection manager with pooling.""" def __init__(self, database_url: str, min_connections: int = 5, max_connections: int = 20): self.database_url = database_url self.min_connections = min_connections self.max_connections = max_connections self.pool = None async def connect(self): """Initialize connection pool.""" self.pool = await asyncpg.create_pool( self.database_url, min_size=self.min_connections, max_size=self.max_connections ) async def disconnect(self): """Close connection pool.""" if self.pool: await self.pool.close() @asynccontextmanager async def get_connection(self) -> AsyncContextManager[asyncpg.Connection]: """Get database connection from pool.""" async with self.pool.acquire() as connection: yield connection async def execute_query(self, query: str, *args) -> List[Dict[str, Any]]: """Execute query and return results.""" async with self.get_connection() as conn: results = await conn.fetch(query, *args) return [dict(row) for row in results] async def execute_single(self, query: str, *args) -> Dict[str, Any]: """Execute query and return single result.""" async with self.get_connection() as conn: result = await conn.fetchrow(query, *args) return dict(result) if result else {} # Global database manager db = DatabaseManager(get_settings().database_url) # Usage in tools @mcp.tool() async def get_user_profile(user_id: str) -> Dict[str, Any]: """Get user profile from database.""" query = """ SELECT id, username, email, created_at, last_login FROM users WHERE id = $1 AND active = true """ user = await db.execute_single(query, user_id) if not user: raise ValidationError(f"User {user_id} not found") return { "user_id": user["id"], "username": user["username"], "email": user["email"], "member_since": user["created_at"].isoformat(), "last_active": user["last_login"].isoformat() if user["last_login"] else None } ``` ## Security Best Practices ### Input Sanitization ```python # src/utils/security.py import re import html from typing import Any, Dict import bleach class InputSanitizer: """Centralized input sanitization.""" @staticmethod def sanitize_string(value: str, max_length: int = 1000) -> str: """Sanitize string input.""" if not isinstance(value, str): raise ValueError("Input must be a string") # Truncate length value = value[:max_length] # Remove null bytes value = value.replace('\x00', '') # HTML escape value = html.escape(value) return value.strip() @staticmethod def sanitize_html(value: str, allowed_tags: List[str] = None) -> str: """Sanitize HTML content.""" if allowed_tags is None: allowed_tags = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li'] return bleach.clean(value, tags=allowed_tags, strip=True) @staticmethod def validate_sql_injection(value: str) -> bool: """Check for potential SQL injection patterns.""" dangerous_patterns = [ r"(\bUNION\b.*\bSELECT\b)", r"(\bDROP\b.*\bTABLE\b)", r"(\bINSERT\b.*\bINTO\b)", r"(\bDELETE\b.*\bFROM\b)", r"(\bUPDATE\b.*\bSET\b)", r"('.*OR.*'.*')", r"(\bEXEC\b|\bEXECUTE\b)", ] for pattern in dangerous_patterns: if re.search(pattern, value, re.IGNORECASE): return False return True @staticmethod def sanitize_filename(filename: str) -> str: """Sanitize filename for safe storage.""" # Remove dangerous characters filename = re.sub(r'[<>:"/\\|?*]', '', filename) # Remove leading/trailing dots and spaces filename = filename.strip(' .') # Limit length filename = filename[:255] return filename or "unnamed_file" # Usage in tools @mcp.tool() async def create_document(title: str, content: str, filename: str) -> Dict[str, Any]: """Create document with sanitized input.""" # Sanitize inputs safe_title = InputSanitizer.sanitize_string(title, max_length=200) safe_content = InputSanitizer.sanitize_html(content) safe_filename = InputSanitizer.sanitize_filename(filename) # Additional validation if not InputSanitizer.validate_sql_injection(safe_title): raise ValidationError("Invalid characters in title") # Create document document_id = await create_document_in_db(safe_title, safe_content, safe_filename) return { "document_id": document_id, "title": safe_title, "filename": safe_filename, "status": "created" } ``` ### Rate Limiting ```python # src/utils/rate_limit.py import time from collections import defaultdict, deque from typing import Dict, Optional import asyncio class RateLimiter: """Token bucket rate limiter.""" def __init__(self, max_requests: int = 100, window_seconds: int = 3600): self.max_requests = max_requests self.window_seconds = window_seconds self.buckets: Dict[str, deque] = defaultdict(deque) self.lock = asyncio.Lock() async def is_allowed(self, key: str) -> bool: """Check if request is allowed for the given key.""" async with self.lock: now = time.time() bucket = self.buckets[key] # Remove old requests outside the window while bucket and bucket[0] <= now - self.window_seconds: bucket.popleft() # Check if under limit if len(bucket) >= self.max_requests: return False # Add current request bucket.append(now) return True async def get_remaining(self, key: str) -> int: """Get remaining requests for the key.""" async with self.lock: now = time.time() bucket = self.buckets[key] # Remove old requests while bucket and bucket[0] <= now - self.window_seconds: bucket.popleft() return max(0, self.max_requests - len(bucket)) # Global rate limiters general_limiter = RateLimiter(max_requests=1000, window_seconds=3600) # 1000/hour expensive_limiter = RateLimiter(max_requests=10, window_seconds=60) # 10/minute # Rate limiting decorator def rate_limit(limiter: RateLimiter, key_func: Optional[callable] = None): """Rate limiting decorator.""" def decorator(func): async def wrapper(*args, **kwargs): # Extract context if available ctx = kwargs.get('ctx') or (args[-1] if args and hasattr(args[-1], 'client_id') else None) if ctx and ctx.client_id: rate_key = key_func(ctx) if key_func else ctx.client_id if not await limiter.is_allowed(rate_key): remaining = await limiter.get_remaining(rate_key) raise RateLimitError(f"Rate limit exceeded. {remaining} requests remaining.") return await func(*args, **kwargs) return wrapper return decorator # Usage in tools @mcp.tool() @rate_limit(general_limiter) async def regular_tool(data: str, ctx: Context) -> Dict[str, Any]: """Regular tool with standard rate limiting.""" return {"processed": data, "user": ctx.client_id} @mcp.tool() @rate_limit(expensive_limiter, key_func=lambda ctx: f"expensive:{ctx.client_id}") async def expensive_tool(complex_data: List[Dict], ctx: Context) -> Dict[str, Any]: """Expensive tool with stricter rate limiting.""" # Simulate expensive operation await asyncio.sleep(2) return {"processed_count": len(complex_data)} ``` ## Logging and Monitoring ### Structured Logging ```python # src/config/logging.py import logging import json import sys from datetime import datetime from typing import Any, Dict class StructuredFormatter(logging.Formatter): """JSON formatter for structured logging.""" def format(self, record: logging.LogRecord) -> str: log_obj = { "timestamp": datetime.utcnow().isoformat(), "level": record.levelname, "logger": record.name, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno, } # Add exception info if present if record.exc_info: log_obj["exception"] = self.formatException(record.exc_info) # Add extra fields for key, value in record.__dict__.items(): if key not in ["name", "msg", "args", "levelname", "levelno", "pathname", "filename", "module", "lineno", "funcName", "created", "msecs", "relativeCreated", "thread", "threadName", "processName", "process", "getMessage", "exc_info", "exc_text", "stack_info"]: log_obj[key] = value return json.dumps(log_obj) def setup_logging(level: str = "INFO", structured: bool = True): """Setup application logging.""" # Clear existing handlers root_logger = logging.getLogger() for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) # Create handler handler = logging.StreamHandler(sys.stdout) if structured: handler.setFormatter(StructuredFormatter()) else: handler.setFormatter( logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) ) # Configure root logger root_logger.addHandler(handler) root_logger.setLevel(getattr(logging, level.upper())) # Suppress noisy loggers logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING) # Usage in tools logger = logging.getLogger(__name__) @mcp.tool() async def monitored_tool(operation: str, data: Dict[str, Any], ctx: Context) -> Dict[str, Any]: """Tool with comprehensive monitoring.""" start_time = time.time() logger.info( "Tool execution started", extra={ "tool_name": "monitored_tool", "user_id": ctx.client_id, "operation": operation, "data_size": len(str(data)) } ) try: # Simulate processing result = {"operation": operation, "status": "completed"} logger.info( "Tool execution completed", extra={ "tool_name": "monitored_tool", "user_id": ctx.client_id, "operation": operation, "execution_time": time.time() - start_time, "success": True } ) return result except Exception as e: logger.error( "Tool execution failed", extra={ "tool_name": "monitored_tool", "user_id": ctx.client_id, "operation": operation, "execution_time": time.time() - start_time, "error": str(e), "success": False }, exc_info=True ) raise ``` ## Testing Best Practices ### Test Organization ```python # tests/conftest.py import pytest import asyncio from unittest.mock import AsyncMock, MagicMock from fastmcp import FastMCP, Client @pytest.fixture(scope="session") def event_loop(): """Create event loop for async tests.""" loop = asyncio.new_event_loop() yield loop loop.close() @pytest.fixture async def test_server(): """Create test server with common tools.""" server = FastMCP("Test Server") @server.tool() async def test_tool(message: str) -> str: return f"Test: {message}" return server @pytest.fixture async def mock_external_api(): """Mock external API responses.""" mock = AsyncMock() mock.get.return_value.__aenter__.return_value.json.return_value = { "status": "success", "data": "mocked_response" } return mock # Test categories class TestToolFunctionality: """Tests for basic tool functionality.""" async def test_basic_tool_call(self, test_server): async with Client(test_server) as client: result = await client.call_tool("test_tool", {"message": "hello"}) assert result[0].text == "Test: hello" async def test_invalid_parameters(self, test_server): async with Client(test_server) as client: with pytest.raises(Exception): await client.call_tool("test_tool", {"wrong_param": "value"}) class TestErrorHandling: """Tests for error handling.""" async def test_tool_error_propagation(self): server = FastMCP("Error Test Server") @server.tool() async def failing_tool() -> str: raise ValueError("Test error") async with Client(server) as client: with pytest.raises(Exception): await client.call_tool("failing_tool", {}) class TestPerformance: """Performance and load tests.""" async def test_concurrent_requests(self, test_server): async with Client(test_server) as client: tasks = [ client.call_tool("test_tool", {"message": f"msg_{i}"}) for i in range(10) ] results = await asyncio.gather(*tasks) assert len(results) == 10 ``` ## Documentation Standards ### Code Documentation ```python # src/tools/example_tools.py from typing import List, Dict, Any, Optional from fastmcp import FastMCP, Context @mcp.tool() async def comprehensive_example_tool( required_param: str, optional_param: Optional[int] = None, list_param: List[str] = None, ctx: Context = None ) -> Dict[str, Any]: """ Comprehensive example tool demonstrating best practices. This tool serves as an example of proper documentation, error handling, validation, and response formatting for FastMCP tools. Args: required_param: A required string parameter for the operation optional_param: An optional integer parameter (default: None) list_param: Optional list of strings to process (default: empty list) ctx: MCP context for authentication and user information Returns: Dict containing: - status: Operation status ("success" or "error") - data: Processed data based on input parameters - metadata: Additional information about the operation Raises: ValidationError: When input parameters are invalid AuthenticationError: When user lacks required permissions ExternalAPIError: When external service calls fail Example: >>> await comprehensive_example_tool( ... required_param="test_value", ... optional_param=42, ... list_param=["item1", "item2"] ... ) { "status": "success", "data": { "processed_param": "test_value", "calculated_value": 84, "processed_items": ["ITEM1", "ITEM2"] }, "metadata": { "user_id": "user123", "processing_time": 0.023, "items_processed": 2 } } """ # Implementation here... ``` ## Next Steps - **Authentication**: Review [authentication.md](authentication.md) for security implementation - **Deployment**: See [deployment.md](deployment.md) for production deployment - **Testing**: Check [testing.md](testing.md) for comprehensive testing strategies - **Troubleshooting**: Common issues and solutions in your production environment

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chadkunsman/netbox_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server