Skip to main content
Glama

Security MCP Server

by nordeim
MCP_Server_Design_Documents_draft.md78.5 kB
# MCP Server Architecture Design Document **Version:** 2.0 **Last Updated:** 2024 **Status:** Production-Ready ## Table of Contents 1. [Executive Summary](#executive-summary) 2. [Architecture Overview](#architecture-overview) 3. [Core Components](#core-components) 4. [Design Patterns & Principles](#design-patterns--principles) 5. [Security Architecture](#security-architecture) 6. [Reliability & Resilience](#reliability--resilience) 7. [Transport Layer](#transport-layer) 8. [Data Flow](#data-flow) 9. [Configuration System](#configuration-system) 10. [Monitoring & Observability](#monitoring--observability) 11. [Extension Points](#extension-points) 12. [Deployment Considerations](#deployment-considerations) --- ## Executive Summary The MCP (Model Context Protocol) Server is a production-ready, extensible framework for building secure, monitored network diagnostic tools accessible via dual transport mechanisms (stdio and HTTP). The architecture emphasizes: - **Security-first design** with multi-layered validation and sandboxing - **Resilience** through circuit breakers, rate limiting, and graceful degradation - **Observability** via comprehensive metrics and health monitoring - **Extensibility** through abstract base classes and plugin-style tool discovery - **Production readiness** with resource limits, timeout controls, and error recovery The system is designed to be integrated with AI assistants (like Claude Desktop via stdio) or accessed programmatically via HTTP APIs. --- ## Architecture Overview ### High-Level Architecture ┌─────────────────────────────────────────────────────────────┐ │ Client Layer │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ Claude Desktop│ │ HTTP Clients │ │ │ │ (stdio) │ │ (REST API) │ │ │ └──────┬───────┘ └───────┬──────┘ │ └─────────┼──────────────────────────────┼───────────────────┘ │ │ │ │ ┌─────────▼───────────────────────────────▼───────────────────┐ │ Transport Layer │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ stdio_server │ │ FastAPI │ │ │ │ (MCP SDK) │ │ (HTTP/SSE) │ │ │ └──────┬───────┘ └───────┬──────┘ │ └─────────┼──────────────────────────────┼───────────────────┘ │ │ │ ┌─────────────────────┘ │ │ ┌─────────▼─────────▼─────────────────────────────────────────┐ │ EnhancedMCPServer (Orchestrator) │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ • Tool Registry (enable/disable/discovery) │ │ │ │ • Health Manager (monitoring) │ │ │ │ • Metrics Manager (Prometheus/JSON) │ │ │ │ • Rate Limiter (token bucket) │ │ │ │ • Shutdown Coordinator (graceful cleanup) │ │ │ └──────────────────────────────────────────────────────┘ │ └──────────────────────────┬───────────────────────────────────┘ │ ┌──────────────────────────▼───────────────────────────────────┐ │ Tool Layer │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ NmapTool │ │ TracertTool│ │ CustomTool │ ... │ │ │ │ │ │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ │ │ │ │ └───────────────┴───────────────┘ │ │ │ │ │ ┌───────────▼──────────────┐ │ │ │ MCPBaseTool (Abstract) │ │ │ │ • Input Validation │ │ │ │ • Circuit Breaker │ │ │ │ • Metrics Collection │ │ │ │ • Resource Limiting │ │ │ │ • Error Handling │ │ │ │ • Concurrency Control │ │ │ └──────────────────────────┘ │ └───────────────────────────────────────────────────────────────┘ text ### Layer Responsibilities #### 1. **Client Layer** - Claude Desktop: Uses stdio transport for seamless AI integration - HTTP Clients: RESTful API for programmatic access, web UIs, monitoring #### 2. **Transport Layer** - **stdio_server**: MCP SDK-based bidirectional JSON-RPC over stdin/stdout - **FastAPI**: HTTP endpoints with SSE (Server-Sent Events) for real-time updates #### 3. **Orchestrator Layer (EnhancedMCPServer)** - Tool lifecycle management (discovery, registration, enable/disable) - Cross-cutting concerns (health, metrics, rate limiting) - Transport abstraction and routing - Graceful shutdown coordination #### 4. **Tool Layer** - Concrete implementations (NmapTool, TracertTool, etc.) - Inherit security, resilience, and observability from base class - Focus on tool-specific logic and validation --- ## Core Components ### 1. MCPBaseTool (Abstract Base Class) **Location:** `mcp_server/base_tool.py` **Purpose:** Foundation for all tools, providing production-ready infrastructure. **Key Responsibilities:** - **Input/Output Validation** via Pydantic models (v1/v2 compatible) - **Security Enforcement**: Command sanitization, argument whitelisting, metacharacter blocking - **Resource Management**: CPU, memory, file descriptor limits (Unix/Linux) - **Concurrency Control**: Per-tool semaphores with automatic cleanup - **Error Handling**: Typed errors with recovery suggestions and context - **Execution Pipeline**: Async subprocess spawning with timeout and truncation - **Metrics Integration**: Optional Prometheus metrics collection - **Circuit Breaker**: Optional failure threshold protection **State Machine:** Input Received → Validation → Semaphore Acquire → Circuit Breaker Check → Command Resolution → Argument Sanitization → Resource Limit Setup → Subprocess Spawn → Timeout Monitor → Output Capture → Metrics Recording → Error Handling → Cleanup → Output Return text **Critical Features:** 1. **Pydantic Compatibility Layer** ```python # Supports both Pydantic v1 and v2 if _PD_V2: @field_validator("target", mode='after') else: @field_validator("target") Thread-Safe Semaphore Registry Python # Per-event-loop semaphore with weak references for cleanup _semaphore_registry: Dict[str, asyncio.Semaphore] _loop_refs: weakref.WeakValueDictionary Security Validation Python # Target must be RFC1918 or .lab.internal _is_private_or_lab(value: str) -> bool # Block shell metacharacters _DENY_CHARS = re.compile(r"[;&|`$><\n\r]") Resource Limits (Unix/Linux only) Python resource.setrlimit(resource.RLIMIT_CPU, ...) resource.setrlimit(resource.RLIMIT_AS, ...) # Memory resource.setrlimit(resource.RLIMIT_NOFILE, ...) # FDs resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) # No core dumps Extension Points: Subclass must define: command_name, optionally allowed_flags Override _execute_tool() for custom validation/optimization Override get_tool_info() for tool-specific metadata 2. EnhancedMCPServer (Orchestrator) Location: mcp_server/server.py Purpose: Central coordinator managing tools, transports, and cross-cutting concerns. Key Responsibilities: Tool discovery via package scanning (pkgutil) Tool registry with enable/disable functionality Transport abstraction (stdio vs HTTP) Health monitoring via HealthCheckManager Metrics aggregation via MetricsManager Rate limiting (token bucket algorithm) Graceful shutdown with background task cleanup Signal handling (SIGINT, SIGTERM) Architecture Patterns: Tool Discovery Pattern Python # Scan package for MCPBaseTool subclasses _load_tools_from_package(package_path, include, exclude) # Pattern-based exclusion EXCLUDED_PREFIXES = {'Test', 'Mock', 'Abstract', '_', 'Example'} EXCLUDED_SUFFIXES = {'Base', 'Mixin', 'Interface'} Registry Pattern Python class ToolRegistry: tools: Dict[str, MCPBaseTool] # All registered tools enabled_tools: Set[str] # Currently enabled subset Health Check Integration Python # Per-tool health checks HealthCheckManager.register_check( name=f"tool_{tool_name}", check_func=self._create_tool_health_check(tool), priority=HealthCheckPriority.INFORMATIONAL ) Background Task Management Python _background_tasks: Set[asyncio.Task] # Auto-cleanup on task completion task.add_done_callback(self._background_tasks.discard) Dual Transport Support: stdio Transport: For Claude Desktop integration Uses MCP SDK's stdio_server() context manager JSON-RPC over stdin/stdout Graceful shutdown via shutdown_event HTTP Transport: For programmatic/web access FastAPI with CORS middleware RESTful endpoints: /tools, /health, /metrics SSE endpoint /events for real-time updates Rate limiting per client IP + tool combination 3. ToolRegistry Purpose: Centralized tool management with lifecycle control. Features: Tool registration from discovery process Enable/disable without restart Filter-based inclusion/exclusion (env vars) Tool information aggregation Metrics/circuit breaker initialization per tool API: Python registry.get_tool(tool_name) -> Optional[MCPBaseTool] registry.get_enabled_tools() -> Dict[str, MCPBaseTool] registry.enable_tool(tool_name) registry.disable_tool(tool_name) registry.get_tool_info() -> List[Dict[str, Any]] 4. RateLimiter Algorithm: Token bucket with per-client tracking Features: Configurable rate (requests per time window) Automatic cleanup of stale clients Thread-safe operation (asyncio.Lock) Client limit to prevent memory exhaustion Implementation: Python # Token bucket: clients start with full allowance allowance: Dict[str, float] = defaultdict(lambda: rate) # Tokens regenerate over time allowance[key] += time_passed * (rate / per) # Request consumes a token if allowance[key] < 1.0: return False # Rate limited allowance[key] -= 1.0 Configuration: Python RateLimiter(rate=10, per=60.0, max_clients=1000) # 10 requests per 60 seconds, track up to 1000 clients Design Patterns & Principles 1. Template Method Pattern MCPBaseTool.run() orchestrates execution while allowing subclass customization: Python async def run(self, inp: ToolInput) -> ToolOutput: # Template method with hooks: # 1. Circuit breaker check # 2. Semaphore acquire # 3. _execute_tool() [customizable] # 4. Metrics recording # 5. Cleanup 2. Strategy Pattern Transport strategies (stdio vs HTTP) are swapped at runtime: Python if transport == "stdio": await self.run_stdio_original() elif transport == "http": await self.run_http_enhanced() 3. Registry Pattern ToolRegistry centralizes tool management with runtime enable/disable. 4. Circuit Breaker Pattern Optional per-tool circuit breakers prevent cascading failures: Python States: CLOSED → OPEN (failures exceed threshold) → HALF_OPEN → CLOSED 5. Fallback Pattern Graceful degradation when optional dependencies missing: Python if not FASTAPI_AVAILABLE: # Fallback to stdio if MCP available # Or raise clear error with installation instructions 6. Observer Pattern Health checks and metrics are observers of tool execution events. 7. Immutability for Safety Python BASE_ALLOWED_FLAGS: Tuple[str, ...] # Immutable allowed_flags property returns new list each time Security Architecture Multi-Layer Defense Layer 1: Input Validation (Pydantic Models) Python class ToolInput(BaseModel): target: str # Validated by _is_private_or_lab() extra_args: str # Length and character class validation Layer 2: Target Restriction Python _is_private_or_lab(value: str): # RFC1918 private IPs: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 # CIDR networks with same restrictions # .lab.internal hostnames with RFC-compliant format Layer 3: Argument Sanitization Python # Block shell metacharacters _DENY_CHARS = re.compile(r"[;&|`$><\n\r]") # Whitelist allowed tokens _TOKEN_ALLOWED = re.compile(r"^[A-Za-z0-9.:/=+,\-@%_]+$") # Flag whitelisting per tool allowed_flags: Optional[Sequence[str]] Layer 4: Command Resolution Python # Only use shutil.which() - no shell execution resolved_cmd = shutil.which(self.command_name) Layer 5: Resource Sandboxing Python # Unix resource limits RLIMIT_CPU, RLIMIT_AS (memory), RLIMIT_NOFILE, RLIMIT_CORE # Process isolation start_new_session=True # Separate process group Layer 6: Policy Enforcement (Tool-Specific) Python # NmapTool example allow_intrusive: bool # Gates -A flag and vuln scripts _validate_and_filter_scripts() # Category-based filtering Security Principles Least Privilege: Tools run with minimal permissions, resource limits Defense in Depth: Multiple validation layers Fail Secure: Errors block execution, don't bypass checks Whitelist > Blacklist: Explicitly allowed flags/targets only Immutable Defaults: Base configurations are constants Audit Trail: Comprehensive logging of security events Reliability & Resilience Circuit Breaker Integration Purpose: Prevent cascading failures from repeated tool failures Configuration: Python circuit_breaker_failure_threshold: int = 5 circuit_breaker_recovery_timeout: float = 120.0 circuit_breaker_expected_exception: tuple = (Exception,) State Machine: text CLOSED (normal) → [5 failures] → OPEN (fail fast) → [120s timeout] → HALF_OPEN (test) → [success] → CLOSED → [failure] → OPEN Error Handling: Python if circuit_breaker.state == OPEN: return ToolOutput(error_type=ToolErrorType.CIRCUIT_BREAKER_OPEN) Timeout Management Multi-Level Timeouts: Tool Default: default_timeout_sec (e.g., 600s for nmap) Input Override: ToolInput.timeout_sec Global Max: Environment variable MCP_DEFAULT_TIMEOUT_SEC Enforcement: Python await asyncio.wait_for(proc.communicate(), timeout=timeout_sec) # On timeout: SIGKILL entire process group os.killpg(os.getpgid(proc.pid), signal.SIGKILL) Concurrency Control Per-Tool Semaphores: Python concurrency: ClassVar[int] = 2 # Max 2 concurrent executions async with self._ensure_semaphore(): # Execute tool Automatic Cleanup: Python # Weak references to event loops _loop_refs: weakref.WeakValueDictionary # Clean dead loop semaphores dead_keys = [k for k in registry if loop_id not in _loop_refs] Graceful Shutdown Shutdown Sequence: Signal Handler sets shutdown_event Health Manager stops monitoring Metrics Manager performs cleanup Background Tasks cancelled with gather(return_exceptions=True) Circuit Breakers cleanup (if implemented) Server waits for grace period before force-stop Python async def cleanup(self): await self.health_manager.stop_monitoring() await self.metrics_manager.cleanup() # Cancel background tasks for task in self._background_tasks: if not task.done(): task.cancel() await asyncio.gather(*tasks, return_exceptions=True) Error Recovery Typed Errors with Context: Python class ToolErrorType(Enum): TIMEOUT, NOT_FOUND, VALIDATION_ERROR, EXECUTION_ERROR, RESOURCE_EXHAUSTED, CIRCUIT_BREAKER_OPEN, UNKNOWN class ErrorContext: error_type: ToolErrorType message: str recovery_suggestion: str # Actionable guidance metadata: Dict[str, Any] Example Error: Python ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message="Network range too large: 4096 addresses (max: 1024)", recovery_suggestion="Use /22 or smaller prefix (max 1024 hosts)", metadata={ "suggested_cidr": "/22", "example": "192.168.0.0/22" } ) Transport Layer stdio Transport (MCP Protocol) Use Case: Claude Desktop integration Protocol: JSON-RPC 2.0 over stdin/stdout Registration: Python server.register_tool( name="NmapTool", description="Network scanner", input_schema={...}, # JSON Schema handler=async_handler_function ) Execution Flow: text Claude → JSON-RPC Request → stdio_server → handler → MCPBaseTool.run() → TextContent Response → Claude Response Format: Python [TextContent(type="text", text=json.dumps(result.dict()))] HTTP Transport (FastAPI) Use Case: API access, monitoring, web UIs Endpoints: Endpoint Method Purpose / GET Server info, available endpoints /health GET Health checks (200/207/503) /tools GET List tools with metadata /tools/{name}/execute POST Execute tool (rate limited) /tools/{name}/enable POST Enable tool /tools/{name}/disable POST Disable tool /metrics GET Prometheus or JSON metrics /events GET SSE for real-time updates /config GET Current config (redacted) Rate Limiting: Python # Per client IP + tool combination rate_limit_key = f"{client_ip}:{tool_name}" if not await rate_limiter.check_rate_limit(rate_limit_key): raise HTTPException(status_code=429, detail={...}) SSE Events: Python # Real-time health and metrics async def event_generator(): while not disconnected: yield {"type": "health", "data": {...}} yield {"type": "metrics", "data": {...}} await asyncio.sleep(5) Data Flow Typical Execution Flow text 1. CLIENT REQUEST ├─ stdio: JSON-RPC over stdin └─ HTTP: POST /tools/NmapTool/execute 2. TRANSPORT LAYER ├─ stdio: MCP handler → ToolInput └─ HTTP: FastAPI endpoint → Rate Limit → ToolInput 3. ORCHESTRATOR (EnhancedMCPServer) ├─ Tool lookup in registry ├─ Check if enabled └─ Route to tool instance 4. TOOL (e.g., NmapTool) ├─ MCPBaseTool.run(inp) │ ├─ Increment active execution counter │ ├─ Check circuit breaker state │ ├─ Acquire semaphore (concurrency control) │ └─ Call _execute_tool(inp) │ ├─ NmapTool._execute_tool(inp) │ ├─ Validate nmap-specific requirements │ ├─ Parse and validate arguments │ ├─ Optimize arguments (add smart defaults) │ └─ Call super()._execute_tool(enhanced_input) │ └─ MCPBaseTool._execute_tool(enhanced_input) ├─ Resolve command (shutil.which) ├─ Parse and sanitize arguments └─ Call _spawn(cmd, timeout) 5. SUBPROCESS EXECUTION ├─ Set resource limits (Unix) ├─ Create subprocess (start_new_session=True) ├─ Monitor with timeout ├─ Capture stdout/stderr └─ Truncate if exceeds limits 6. RESULT PROCESSING ├─ Parse output (tool-specific, e.g., nmap parser) ├─ Record metrics (success, duration, error type) ├─ Update circuit breaker state └─ Create ToolOutput with metadata 7. RESPONSE ├─ stdio: TextContent with JSON └─ HTTP: JSONResponse with ToolOutput.dict() 8. CLEANUP ├─ Decrement active execution counter ├─ Release semaphore └─ Log execution summary Configuration System Configuration Sources (Priority Order) Environment Variables (highest priority) Configuration File (MCP_CONFIG_FILE) Code Defaults (lowest priority) Key Configuration Objects Assumed structure based on usage: Python class SecurityConfig: allow_intrusive: bool = False class ToolConfig: default_timeout: float = 300.0 default_concurrency: int = 2 class CircuitBreakerConfig: failure_threshold: int = 5 recovery_timeout: float = 60.0 class ServerConfig: host: str = "0.0.0.0" port: int = 8080 class Config: security: SecurityConfig tool: ToolConfig circuit_breaker: CircuitBreakerConfig server: ServerConfig def to_dict(self, redact_sensitive=False) -> Dict Environment Variables Variable Default Purpose MCP_SERVER_TRANSPORT stdio stdio or http MCP_SERVER_PORT 8080 HTTP server port MCP_SERVER_HOST 0.0.0.0 HTTP server host MCP_CONFIG_FILE - Path to config file TOOLS_PACKAGE mcp_server.tools Package to scan TOOL_INCLUDE - CSV of tools to include TOOL_EXCLUDE - CSV of tools to exclude LOG_LEVEL INFO Logging level MCP_MAX_ARGS_LEN 2048 Max argument length MCP_MAX_STDOUT_BYTES 1048576 Max stdout (1MB) MCP_MAX_STDERR_BYTES 262144 Max stderr (256KB) MCP_DEFAULT_TIMEOUT_SEC 300 Default timeout MCP_DEFAULT_CONCURRENCY 2 Default concurrency MCP_MAX_MEMORY_MB 512 Memory limit MCP_MAX_FILE_DESCRIPTORS 256 FD limit Configuration Application Safe Clamping Pattern: Python def _apply_config(self): # Clamp values to safe ranges self.timeout = max(60.0, min(3600.0, config.timeout)) self.concurrency = max(1, min(5, config.concurrency)) # Log when clamped if clamped: log.info("config_clamped param=%s original=%s new=%s") Monitoring & Observability Health Monitoring Architecture: Python HealthCheckManager ├─ system_health_check (CPU, memory, disk) ├─ tool_availability_check (commands in PATH) └─ per_tool_health_checks (circuit breaker state) Health States: Python class HealthStatus(Enum): HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" Priority Levels: Python class HealthCheckPriority(Enum): CRITICAL = 1 # System-level HIGH = 2 # Important tools MEDIUM = 3 # Optional tools LOW = 4 INFORMATIONAL = 5 # Metrics, stats HTTP Response Codes: 200: All HEALTHY 207: Some DEGRADED (multi-status) 503: Any UNHEALTHY Metrics Collection Prometheus Metrics: Python mcp_tool_execution_total{tool="NmapTool",status="success"} mcp_tool_execution_duration_seconds{tool="NmapTool"} mcp_tool_active_executions{tool="NmapTool"} mcp_tool_timeouts_total{tool="NmapTool"} mcp_circuit_breaker_state{tool="NmapTool",state="open"} Fallback JSON Metrics: Python { "system": {"cpu": ..., "memory": ...}, "tools": { "NmapTool": { "executions": 150, "successes": 145, "failures": 5, "avg_duration": 12.3, "active": 2 } } } Logging Strategy Structured Logging: Python log.info("tool.start command=%s timeout=%.1f", cmd, timeout) log.error("tool.error tool=%s error_type=%s", name, error_type) Log Levels: DEBUG: Configuration, cache operations, internal state INFO: Execution lifecycle, optimizations, state changes WARNING: Non-fatal issues, fallbacks, deprecated usage ERROR: Failures, exceptions, security violations CRITICAL: System-level failures Extension Points Creating a New Tool Minimum Implementation: Python from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput class MyTool(MCPBaseTool): command_name = "mytool" # Required allowed_flags = ["-flag1", "-flag2"] # Optional whitelist # Optional overrides default_timeout_sec = 300.0 concurrency = 2 circuit_breaker_failure_threshold = 5 Custom Validation: Python async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # 1. Custom validation if not self._my_validation(inp.target): return self._create_error_output(error_context, ...) # 2. Parse/optimize args parsed = self._parse_my_args(inp.extra_args) # 3. Call base implementation enhanced = ToolInput(target=inp.target, extra_args=parsed, ...) return await super()._execute_tool(enhanced, timeout_sec) Tool-Specific Metadata: Python def get_tool_info(self) -> Dict[str, Any]: base_info = super().get_tool_info() base_info.update({ "my_feature": "enabled", "supported_modes": ["fast", "thorough"] }) return base_info Adding Custom Health Checks Python from mcp_server.health import HealthCheck, HealthStatus, HealthCheckPriority class MyHealthCheck(HealthCheck): async def check(self) -> HealthStatus: # Custom logic if condition: return HealthStatus.HEALTHY return HealthStatus.DEGRADED # Register in server server.health_manager.add_health_check( MyHealthCheck(), priority=HealthCheckPriority.HIGH ) Adding Custom Metrics Python # Assumed MetricsManager API metrics_manager.register_counter("my_metric_total") metrics_manager.increment("my_metric_total", labels={"status": "success"}) Deployment Considerations Running the Server stdio Mode (Claude Desktop): Bash # Minimal python -m mcp_server.server # With config MCP_CONFIG_FILE=config.yaml python -m mcp_server.server # With tool filtering TOOL_INCLUDE=NmapTool,TracertTool python -m mcp_server.server HTTP Mode: Bash MCP_SERVER_TRANSPORT=http \ MCP_SERVER_PORT=8080 \ python -m mcp_server.server Docker Deployment Dockerfile considerations: Dockerfile # Install system tools (nmap, traceroute, etc.) RUN apt-get update && apt-get install -y nmap traceroute # Non-root user for security RUN useradd -m -u 1000 mcpuser USER mcpuser # Resource limits via docker-compose services: mcp-server: mem_limit: 512m cpus: 1.0 pids_limit: 100 Kubernetes Deployment Resource Limits: YAML resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" Health Probes: YAML livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 5 periodSeconds: 5 Security Considerations Network Policies: Restrict egress to private networks only RBAC: Limit tool execution permissions Secrets Management: Use env vars or secret managers for sensitive config Image Scanning: Regularly scan for vulnerabilities Log Aggregation: Ship logs to SIEM for security monitoring Rate Limiting: Configure per deployment size TLS: Use reverse proxy (nginx, traefik) for HTTPS in HTTP mode Scaling Considerations Horizontal Scaling (HTTP mode): Stateless design allows multiple replicas Use load balancer with session affinity for SSE Share metrics backend (Prometheus pushgateway) Vertical Scaling: Increase concurrency per tool Adjust resource limits Monitor with /metrics endpoint Performance Tuning: Python # Use uvloop for better async performance import uvloop uvloop.install() # Increase concurrency for fast tools class FastTool(MCPBaseTool): concurrency = 10 # Decrease timeout for quick scans default_timeout_sec = 60.0 Appendix: Component Interaction Matrix Component Depends On Depended By Purpose MCPBaseTool Pydantic, asyncio, resource All tools Base functionality EnhancedMCPServer FastAPI, MCP SDK, ToolRegistry Main entry Orchestration ToolRegistry MCPBaseTool EnhancedMCPServer Tool management RateLimiter asyncio HTTP endpoints Request throttling HealthCheckManager asyncio EnhancedMCPServer Health monitoring MetricsManager prometheus_client Tools, Server Observability NmapTool MCPBaseTool, ipaddress ToolRegistry Network scanning Document End text --- # Document 2: Tool Development Programming Guide ```markdown # MCP Server Tool Development Programming Guide **Version:** 2.0 **Audience:** Developers and AI Coding Agents **Purpose:** Authoritative guide for creating new tools ## Table of Contents 1. [Quick Start](#quick-start) 2. [Tool Anatomy](#tool-anatomy) 3. [Development Workflow](#development-workflow) 4. [Implementation Patterns](#implementation-patterns) 5. [Security Requirements](#security-requirements) 6. [Validation & Sanitization](#validation--sanitization) 7. [Error Handling](#error-handling) 8. [Testing Your Tool](#testing-your-tool) 9. [Configuration Integration](#configuration-integration) 10. [Best Practices](#best-practices) 11. [Common Pitfalls](#common-pitfalls) 12. [Reference Examples](#reference-examples) --- ## Quick Start ### Minimal Tool Implementation **File:** `mcp_server/tools/hello_tool.py` ```python """ HelloTool - Minimal example tool. Production-ready with all security and reliability features inherited from base. """ from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput class HelloTool(MCPBaseTool): """ Echo tool that validates targets and returns greeting. Usage: from mcp_server.tools.hello_tool import HelloTool tool = HelloTool() result = await tool.run(ToolInput(target="192.168.1.1")) """ # REQUIRED: Command name (must be in PATH or /usr/bin, etc.) command_name = "echo" # OPTIONAL: Whitelist allowed flags (security best practice) allowed_flags = ["-n", "-e"] # OPTIONAL: Override defaults default_timeout_sec = 10.0 # Short timeout for fast command concurrency = 5 # Allow more concurrent executions # OPTIONAL: Circuit breaker tuning circuit_breaker_failure_threshold = 3 circuit_breaker_recovery_timeout = 30.0 That's it! This tool is production-ready with: ✅ Target validation (RFC1918 or .lab.internal) ✅ Argument sanitization (shell metacharacter blocking) ✅ Resource limits (CPU, memory, FDs) ✅ Timeout enforcement ✅ Concurrency control ✅ Circuit breaker protection ✅ Metrics collection ✅ Comprehensive error handling Discovery & Registration Place your tool in mcp_server/tools/ directory. The server will: Auto-discover via package scanning Instantiate your class Register with MCP server (stdio) or FastAPI (HTTP) Enable unless filtered by TOOL_INCLUDE/TOOL_EXCLUDE Exclusion Patterns (automatic): Python # Tools with these names are automatically excluded EXCLUDED_PREFIXES = {'Test', 'Mock', 'Abstract', '_', 'Example'} EXCLUDED_SUFFIXES = {'Base', 'Mixin', 'Interface'} EXCLUDED_EXACT = {'MCPBaseTool'} # Examples: TestNmapTool # ❌ Excluded (prefix) NmapToolBase # ❌ Excluded (suffix) _InternalTool # ❌ Excluded (prefix) NmapTool # ✅ Included Testing Your Tool Python import asyncio from mcp_server.tools.hello_tool import HelloTool from mcp_server.base_tool import ToolInput async def test_hello(): tool = HelloTool() # Basic execution result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-n" )) assert result.returncode == 0 assert "192.168.1.1" in result.stdout print(f"✅ Success: {result.stdout}") # Run test asyncio.run(test_hello()) Tool Anatomy Required Attributes Python class MyTool(MCPBaseTool): # ⚠️ REQUIRED: Command to execute command_name: ClassVar[str] = "mytool" # Must be available via shutil.which() # Examples: "nmap", "ping", "traceroute", "dig" Optional Attributes Python class MyTool(MCPBaseTool): # Security: Whitelist allowed flags (HIGHLY RECOMMENDED) allowed_flags: ClassVar[Optional[Sequence[str]]] = [ "-v", "-vv", # Verbosity "-o", "--output", # Output control "-t", "--timeout" # Timeout ] # Performance: Concurrency limit concurrency: ClassVar[int] = 2 # Max parallel executions # Performance: Default timeout default_timeout_sec: ClassVar[float] = 300.0 # Reliability: Circuit breaker settings circuit_breaker_failure_threshold: ClassVar[int] = 5 circuit_breaker_recovery_timeout: ClassVar[float] = 60.0 circuit_breaker_expected_exception: ClassVar[tuple] = (Exception,) Optional Methods Python class MyTool(MCPBaseTool): async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """ Override for custom validation, parsing, or optimization. MUST call super()._execute_tool() or implement full execution. """ # Custom logic here return await super()._execute_tool(inp, timeout_sec) def get_tool_info(self) -> Dict[str, Any]: """ Override to add tool-specific metadata. """ info = super().get_tool_info() info.update({"custom_field": "value"}) return info def _parse_args(self, extra_args: str) -> Sequence[str]: """ Override for custom argument parsing. MUST still call _sanitize_tokens() for security. """ tokens = shlex.split(extra_args) return self._sanitize_tokens(tokens) Development Workflow Step 1: Define Tool Requirements Planning Checklist: What command does this tool execute? Is the command available on target systems? What flags should be allowed? (security) What's the expected execution time? (timeout) How many concurrent executions are safe? (concurrency) Are there intrusive operations? (circuit breaker) What validation is needed beyond base class? Step 2: Create Tool File Naming Convention: text mcp_server/tools/{command}_tool.py Examples: mcp_server/tools/nmap_tool.py mcp_server/tools/traceroute_tool.py mcp_server/tools/dig_tool.py File Template: Python """ {ToolName} - {Brief description} {Detailed description of what this tool does} Features: - Feature 1 - Feature 2 Safety Controls: - Control 1 - Control 2 Usage: from mcp_server.tools.{module} import {ToolName} tool = {ToolName}() result = await tool.run(ToolInput(target="192.168.1.1")) Configuration: # config.yaml security: allow_intrusive: false # If applicable """ import logging from typing import Optional from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput log = logging.getLogger(__name__) class {ToolName}(MCPBaseTool): """ {One-line description} {Multi-line detailed description if needed} """ command_name = "{command}" allowed_flags = [...] # Define whitelist # Override defaults if needed default_timeout_sec = 300.0 concurrency = 2 Step 3: Implement Custom Logic (if needed) Decision Tree: text Do you need custom validation beyond base class? ├─ YES → Override _execute_tool() └─ NO → Use base class as-is Do you need custom argument parsing? ├─ YES → Override _parse_args() └─ NO → Use base class as-is Do you need tool-specific metadata? ├─ YES → Override get_tool_info() └─ NO → Base metadata sufficient Do you have configuration settings? ├─ YES → Implement _apply_config() └─ NO → Configuration not needed Step 4: Test Locally Python # tests/test_{tool_name}.py import pytest from mcp_server.tools.{module} import {ToolName} from mcp_server.base_tool import ToolInput @pytest.mark.asyncio async def test_{tool_name}_basic(): tool = {ToolName}() result = await tool.run(ToolInput(target="192.168.1.1")) assert result.returncode == 0 assert result.stdout @pytest.mark.asyncio async def test_{tool_name}_validation(): tool = {ToolName}() # Test target validation result = await tool.run(ToolInput(target="8.8.8.8")) # Public IP assert result.error_type == "validation_error" @pytest.mark.asyncio async def test_{tool_name}_timeout(): tool = {ToolName}() result = await tool.run(ToolInput( target="192.168.1.1", timeout_sec=0.001 # Force timeout )) assert result.timed_out == True Step 5: Integration Testing Bash # Start server with your tool TOOL_INCLUDE={YourTool} python -m mcp_server.server # Test via HTTP (if using HTTP transport) curl -X POST http://localhost:8080/tools/{YourTool}/execute \ -H "Content-Type: application/json" \ -d '{"target": "192.168.1.1", "extra_args": "-v"}' Step 6: Documentation Update your tool's docstring with: Python """ {ToolName} - {Brief description} {Detailed description} Features: - List key features Safety Controls: - List security measures - List validation rules Usage: tool = {ToolName}() result = await tool.run(ToolInput( target="192.168.1.0/24", extra_args="--flag value" )) Allowed Flags: -flag1: Description -flag2: Description Configuration: # Environment variables MY_TOOL_SETTING=value # config.yaml tool: my_setting: value Examples: # Example 1 result = await tool.run(ToolInput(target="192.168.1.1")) # Example 2 result = await tool.run(ToolInput( target="192.168.1.0/24", extra_args="--verbose", timeout_sec=600 )) """ Implementation Patterns Pattern 1: Simple Passthrough (No Custom Logic) When to use: Command is safe, base validation sufficient Python class SimpleTool(MCPBaseTool): command_name = "whoami" allowed_flags = [] # No flags allowed default_timeout_sec = 5.0 concurrency = 10 # Fast, can run many concurrently Pattern 2: Custom Validation When to use: Tool-specific target or argument validation needed Python class CustomValidationTool(MCPBaseTool): command_name = "mytool" allowed_flags = ["-v", "-o"] async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Step 1: Custom validation validation_error = self._validate_custom_requirements(inp) if validation_error: return validation_error # Step 2: Call base implementation return await super()._execute_tool(inp, timeout_sec) def _validate_custom_requirements(self, inp: ToolInput) -> Optional[ToolOutput]: """Perform tool-specific validation.""" # Example: Check target format if not inp.target.startswith("192.168."): from mcp_server.base_tool import ErrorContext, ToolErrorType error_ctx = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"Target must be in 192.168.0.0/16 range: {inp.target}", recovery_suggestion="Use 192.168.x.x addresses only", timestamp=datetime.now(timezone.utc), tool_name=self.tool_name, target=inp.target, metadata={"allowed_range": "192.168.0.0/16"} ) return self._create_error_output(error_ctx, inp.correlation_id or "") return None # Validation passed Pattern 3: Argument Optimization When to use: Adding smart defaults or optimizing for performance Python class OptimizedTool(MCPBaseTool): command_name = "mytool" allowed_flags = ["-v", "-t", "-p", "--fast", "--slow"] async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Parse and optimize arguments optimized_args = self._optimize_args(inp.extra_args or "") # Create enhanced input enhanced_input = ToolInput( target=inp.target, extra_args=optimized_args, timeout_sec=timeout_sec or inp.timeout_sec, correlation_id=inp.correlation_id ) return await super()._execute_tool(enhanced_input, timeout_sec) def _optimize_args(self, extra_args: str) -> str: """Add smart defaults.""" tokens = shlex.split(extra_args) if extra_args else [] optimized = [] # Check what's already specified has_verbosity = any(t == "-v" for t in tokens) has_timeout = any(t == "-t" or t.startswith("-t=") for t in tokens) # Add defaults if not has_verbosity: optimized.append("-v") if not has_timeout: optimized.extend(["-t", "30"]) # Append original args optimized.extend(tokens) return " ".join(optimized) Pattern 4: Configuration-Driven Behavior When to use: Tool behavior changes based on configuration Python class ConfigurableTool(MCPBaseTool): command_name = "mytool" # Dynamic allowed_flags based on config BASE_FLAGS = ["-v", "-o"] def __init__(self): super().__init__() self.allow_advanced = False self._apply_config() def _apply_config(self): """Apply configuration with safe defaults.""" from mcp_server.config import get_config config = get_config() # Check security settings if hasattr(config, 'security') and config.security: self.allow_advanced = getattr( config.security, 'allow_advanced_features', False ) # Apply tool-specific config if hasattr(config, 'tool') and config.tool: self.default_timeout_sec = max( 60.0, min(3600.0, getattr(config.tool, 'default_timeout', 300.0)) ) @property def allowed_flags(self) -> List[str]: """Dynamic flags based on configuration.""" flags = list(self.BASE_FLAGS) if self.allow_advanced: flags.extend(["--advanced-flag1", "--advanced-flag2"]) return flags Pattern 5: Complex Argument Parsing When to use: Custom argument syntax or complex validation Python class ComplexArgsTool(MCPBaseTool): command_name = "mytool" allowed_flags = ["-p", "--ports", "-s", "--scripts"] # Compiled regex patterns for performance _PORT_PATTERN = re.compile(r'^[\d,\-]+$') _SCRIPT_PATTERN = re.compile(r'^[a-zA-Z0-9_,\-]+$') async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Parse and validate complex arguments try: validated_args = self._parse_and_validate_args(inp.extra_args or "") except ValueError as e: from mcp_server.base_tool import ErrorContext, ToolErrorType error_ctx = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"Argument validation failed: {str(e)}", recovery_suggestion="Check argument syntax and allowed values", timestamp=datetime.now(timezone.utc), tool_name=self.tool_name, target=inp.target, metadata={"error": str(e)} ) return self._create_error_output(error_ctx, inp.correlation_id or "") enhanced_input = ToolInput( target=inp.target, extra_args=validated_args, timeout_sec=timeout_sec or inp.timeout_sec, correlation_id=inp.correlation_id ) return await super()._execute_tool(enhanced_input, timeout_sec) def _parse_and_validate_args(self, extra_args: str) -> str: """Parse and validate arguments with custom logic.""" if not extra_args: return "" tokens = shlex.split(extra_args) validated = [] i = 0 while i < len(tokens): token = tokens[i] # Security: Only allow flags if not token.startswith("-"): raise ValueError(f"Non-flag token not allowed: {token}") # Handle port specification if token in ("-p", "--ports"): if i + 1 >= len(tokens): raise ValueError(f"{token} requires a value") port_spec = tokens[i + 1] if not self._validate_ports(port_spec): raise ValueError(f"Invalid port specification: {port_spec}") validated.extend([token, port_spec]) i += 2 continue # Handle script specification if token in ("-s", "--scripts"): if i + 1 >= len(tokens): raise ValueError(f"{token} requires a value") script_spec = tokens[i + 1] if not self._validate_scripts(script_spec): raise ValueError(f"Invalid script specification: {script_spec}") validated.extend([token, script_spec]) i += 2 continue # Handle other flags if token not in self.allowed_flags: raise ValueError(f"Flag not allowed: {token}") validated.append(token) i += 1 return " ".join(validated) def _validate_ports(self, port_spec: str) -> bool: """Validate port specification.""" if not self._PORT_PATTERN.match(port_spec): return False # Parse and validate ranges for part in port_spec.split(','): if '-' in part: start, end = part.split('-') if not (1 <= int(start) <= 65535 and 1 <= int(end) <= 65535): return False else: if not (1 <= int(part) <= 65535): return False return True def _validate_scripts(self, script_spec: str) -> bool: """Validate script specification.""" return bool(self._SCRIPT_PATTERN.match(script_spec)) Pattern 6: Result Parsing When to use: Need structured data from command output Python from dataclasses import dataclass, field from typing import List, Dict, Any @dataclass class ParsedResult: """Structured result.""" raw_output: str items: List[Dict[str, Any]] = field(default_factory=list) summary: Dict[str, Any] = field(default_factory=dict) class ParsingTool(MCPBaseTool): command_name = "mytool" async def run(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """Execute and enhance output with parsing.""" result = await super().run(inp, timeout_sec) # Parse output if successful if result.is_success(): parsed = self.parse_output(result.stdout) result.ensure_metadata() result.metadata['parsed'] = { 'items': parsed.items, 'summary': parsed.summary } return result def parse_output(self, output: str) -> ParsedResult: """Parse command output into structured data.""" result = ParsedResult(raw_output=output) # Example: Parse line-by-line for line in output.split('\n'): if line.startswith('ITEM:'): # Extract item data item = self._parse_item_line(line) result.items.append(item) # Generate summary result.summary = { 'total_items': len(result.items), 'timestamp': datetime.now(timezone.utc).isoformat() } return result def _parse_item_line(self, line: str) -> Dict[str, Any]: """Parse a single item line.""" # Custom parsing logic parts = line.split(':', 1) return {'type': parts[0], 'data': parts[1] if len(parts) > 1 else ''} Security Requirements Rule 1: Always Use Whitelisting ❌ WRONG (Blacklisting): Python # Don't do this! blocked_flags = ["-x", "--dangerous"] if flag in blocked_flags: raise ValueError(f"Flag not allowed: {flag}") ✅ CORRECT (Whitelisting): Python allowed_flags = ["-v", "-o", "--safe-flag"] if flag not in allowed_flags: raise ValueError(f"Flag not allowed: {flag}") Rule 2: Block Non-Flag Tokens ❌ WRONG: Python # Allowing arbitrary tokens is dangerous if token.isalnum(): validated.append(token) ✅ CORRECT: Python # Only allow tokens starting with '-' if not token.startswith("-"): raise ValueError(f"Non-flag token not allowed: {token}") Rule 3: Never Use Shell Execution ❌ WRONG: Python # NEVER use shell=True subprocess.run(f"command {args}", shell=True) ✅ CORRECT: Python # Always use argument list with shell=False (default) await asyncio.create_subprocess_exec( command, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) Rule 4: Validate All Input Values ❌ WRONG: Python # Trusting user input port = tokens[i + 1] validated.extend(["-p", port]) ✅ CORRECT: Python # Validate before accepting port = tokens[i + 1] if not self._validate_port(port): raise ValueError(f"Invalid port: {port}") validated.extend(["-p", port]) Rule 5: Use Compiled Patterns for Performance ❌ SLOW: Python # Recompiling regex every time if re.match(r'^\d+$', value): ... ✅ FAST: Python # Compile once, use many times class MyTool(MCPBaseTool): _NUMERIC_PATTERN = re.compile(r'^\d+$') def _validate(self, value): return bool(self._NUMERIC_PATTERN.match(value)) Rule 6: Gate Intrusive Operations When allowing dangerous operations: Python class IntrusiveTool(MCPBaseTool): command_name = "mytool" BASE_FLAGS = ["-v", "--safe-scan"] def __init__(self): super().__init__() self.allow_intrusive = False self._apply_config() def _apply_config(self): from mcp_server.config import get_config config = get_config() if hasattr(config, 'security') and config.security: self.allow_intrusive = getattr( config.security, 'allow_intrusive', False ) @property def allowed_flags(self) -> List[str]: flags = list(self.BASE_FLAGS) if self.allow_intrusive: flags.append("--intrusive-flag") log.warning("intrusive_operations_enabled tool=%s", self.tool_name) return flags Validation & Sanitization Target Validation Already handled by base class: Python # Base class validates: # - RFC1918 private IPs: 10.x.x.x, 172.16-31.x.x, 192.168.x.x # - Private networks (CIDR): 192.168.1.0/24 # - .lab.internal hostnames with RFC-compliant format Additional validation: Python def _validate_custom_target(self, target: str) -> bool: """Add tool-specific target validation.""" # Example: Only allow specific subnet if not target.startswith("192.168.1."): return False # Example: Validate hostname format if target.endswith(".lab.internal"): hostname = target[:-len(".lab.internal")] if len(hostname) > 63: # DNS label limit return False return True Argument Sanitization Base class provides: Python _DENY_CHARS = re.compile(r"[;&|`$><\n\r]") # Shell metacharacters _TOKEN_ALLOWED = re.compile(r"^[A-Za-z0-9.:/=+,\-@%_]+$") Custom sanitization: Python def _sanitize_custom_value(self, value: str) -> str: """Sanitize tool-specific values.""" # Remove/escape dangerous characters value = value.strip() # Validate length if len(value) > 255: raise ValueError("Value too long") # Validate character set if not re.match(r'^[a-zA-Z0-9_\-]+$', value): raise ValueError("Invalid characters in value") return value Size Limits Enforce limits on collections: Python class NetworkTool(MCPBaseTool): MAX_TARGETS = 100 MAX_PORT_RANGES = 50 MAX_NETWORK_SIZE = 1024 # Max hosts in CIDR def _validate_network_size(self, target: str) -> Optional[ToolOutput]: """Validate network isn't too large.""" if "/" in target: import ipaddress network = ipaddress.ip_network(target, strict=False) if network.num_addresses > self.MAX_NETWORK_SIZE: max_cidr = 32 - math.ceil(math.log2(self.MAX_NETWORK_SIZE)) from mcp_server.base_tool import ErrorContext, ToolErrorType error_ctx = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"Network too large: {network.num_addresses} hosts (max: {self.MAX_NETWORK_SIZE})", recovery_suggestion=f"Use /{max_cidr} or smaller prefix", timestamp=datetime.now(timezone.utc), tool_name=self.tool_name, target=target, metadata={ "network_size": network.num_addresses, "max_allowed": self.MAX_NETWORK_SIZE, "suggested_cidr": f"/{max_cidr}" } ) return self._create_error_output(error_ctx, "") return None # Validation passed Error Handling Creating Informative Errors Use ErrorContext for rich error information: Python from mcp_server.base_tool import ErrorContext, ToolErrorType from datetime import datetime, timezone def _create_validation_error(self, message: str, inp: ToolInput, **metadata) -> ToolOutput: """Helper to create validation error.""" error_ctx = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=message, recovery_suggestion=self._get_recovery_suggestion(message), timestamp=datetime.now(timezone.utc), tool_name=self.tool_name, target=inp.target, metadata=metadata ) return self._create_error_output(error_ctx, inp.correlation_id or "") def _get_recovery_suggestion(self, message: str) -> str: """Generate helpful recovery suggestion.""" if "network too large" in message.lower(): return "Use smaller CIDR prefix or split into multiple scans" elif "invalid port" in message.lower(): return "Use port numbers 1-65535, ranges (80-443), or lists (80,443,8080)" else: return "Check input parameters and try again" Error Types Use appropriate error types: Python from mcp_server.base_tool import ToolErrorType # Validation errors (input problems) ToolErrorType.VALIDATION_ERROR # Command not found ToolErrorType.NOT_FOUND # Execution failures ToolErrorType.EXECUTION_ERROR # Timeout ToolErrorType.TIMEOUT # Resource limits exceeded ToolErrorType.RESOURCE_EXHAUSTED # Circuit breaker open ToolErrorType.CIRCUIT_BREAKER_OPEN # Unknown/unexpected ToolErrorType.UNKNOWN Logging Errors Structured logging with context: Python import logging log = logging.getLogger(__name__) # Info level for normal execution log.info("tool.executing target=%s args=%s", target, args) # Warning for recoverable issues log.warning("tool.validation_failed target=%s error=%s", target, error) # Error for failures log.error("tool.execution_failed target=%s error=%s returncode=%d", target, error, returncode) # Include extra context log.error("tool.error", extra={ "tool": self.tool_name, "target": target, "error_type": error_type.value, "correlation_id": correlation_id }) Testing Your Tool Unit Tests File: tests/test_{tool_name}.py Python import pytest from mcp_server.tools.mytool import MyTool from mcp_server.base_tool import ToolInput, ToolErrorType class TestMyTool: """Unit tests for MyTool.""" @pytest.fixture def tool(self): """Create tool instance.""" return MyTool() @pytest.mark.asyncio async def test_basic_execution(self, tool): """Test basic successful execution.""" result = await tool.run(ToolInput(target="192.168.1.1")) assert result.returncode == 0 assert result.stdout assert not result.timed_out assert not result.error @pytest.mark.asyncio async def test_with_flags(self, tool): """Test execution with allowed flags.""" result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-v --flag value" )) assert result.returncode == 0 @pytest.mark.asyncio async def test_invalid_target_public_ip(self, tool): """Test rejection of public IP.""" result = await tool.run(ToolInput(target="8.8.8.8")) assert result.error_type == ToolErrorType.VALIDATION_ERROR.value assert "private" in result.stderr.lower() or "rfc1918" in result.stderr.lower() @pytest.mark.asyncio async def test_invalid_target_hostname(self, tool): """Test rejection of non-.lab.internal hostname.""" result = await tool.run(ToolInput(target="google.com")) assert result.error_type == ToolErrorType.VALIDATION_ERROR.value assert "lab.internal" in result.stderr.lower() @pytest.mark.asyncio async def test_invalid_flag(self, tool): """Test rejection of disallowed flag.""" result = await tool.run(ToolInput( target="192.168.1.1", extra_args="--forbidden-flag" )) assert result.error_type == ToolErrorType.VALIDATION_ERROR.value assert "not allowed" in result.stderr.lower() @pytest.mark.asyncio async def test_timeout(self, tool): """Test timeout handling.""" result = await tool.run(ToolInput( target="192.168.1.1", timeout_sec=0.001 # Force timeout )) assert result.timed_out == True assert result.error_type == ToolErrorType.TIMEOUT.value @pytest.mark.asyncio async def test_shell_injection_blocked(self, tool): """Test shell injection prevention.""" result = await tool.run(ToolInput( target="192.168.1.1", extra_args="; rm -rf /" # Attempt injection )) assert result.error_type == ToolErrorType.VALIDATION_ERROR.value assert "metacharacter" in result.stderr.lower() or "forbidden" in result.stderr.lower() @pytest.mark.asyncio async def test_correlation_id_preserved(self, tool): """Test correlation ID is preserved.""" correlation_id = "test-12345" result = await tool.run(ToolInput( target="192.168.1.1", correlation_id=correlation_id )) assert result.correlation_id == correlation_id @pytest.mark.asyncio async def test_metadata_present(self, tool): """Test metadata is populated.""" result = await tool.run(ToolInput(target="192.168.1.1")) assert result.metadata is not None assert isinstance(result.metadata, dict) def test_tool_info(self, tool): """Test get_tool_info returns complete info.""" info = tool.get_tool_info() assert info['name'] == tool.tool_name assert info['command'] == tool.command_name assert 'concurrency' in info assert 'timeout' in info assert 'circuit_breaker' in info def test_command_available(self, tool): """Test command is available in PATH.""" cmd_path = tool._resolve_command() assert cmd_path is not None, f"Command '{tool.command_name}' not found in PATH" Integration Tests Python import pytest from mcp_server.server import EnhancedMCPServer from mcp_server.tools.mytool import MyTool @pytest.mark.integration class TestMyToolIntegration: """Integration tests with full server.""" @pytest.fixture async def server(self): """Create server with tool.""" tools = [MyTool()] server = EnhancedMCPServer(tools=tools, transport="http") yield server await server.cleanup() @pytest.mark.asyncio async def test_tool_registered(self, server): """Test tool is properly registered.""" tool = server.tool_registry.get_tool("MyTool") assert tool is not None assert isinstance(tool, MyTool) @pytest.mark.asyncio async def test_tool_execution_via_registry(self, server): """Test execution through registry.""" tool = server.tool_registry.get_tool("MyTool") from mcp_server.base_tool import ToolInput result = await tool.run(ToolInput(target="192.168.1.1")) assert result.returncode == 0 Performance Tests Python import pytest import asyncio import time @pytest.mark.performance class TestMyToolPerformance: """Performance and load tests.""" @pytest.mark.asyncio async def test_concurrent_execution(self): """Test concurrent execution respects limits.""" tool = MyTool() # Create multiple concurrent requests tasks = [ tool.run(ToolInput(target=f"192.168.1.{i}")) for i in range(10) ] start = time.time() results = await asyncio.gather(*tasks) duration = time.time() - start # All should succeed assert all(r.returncode == 0 for r in results) # Should respect concurrency limit (some serialization expected) # With concurrency=2, 10 tasks should take roughly 5x single execution time print(f"10 concurrent tasks completed in {duration:.2f}s") @pytest.mark.asyncio async def test_memory_usage(self): """Test memory usage stays reasonable.""" import psutil import os process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB tool = MyTool() # Execute many times for i in range(100): await tool.run(ToolInput(target="192.168.1.1")) final_memory = process.memory_info().rss / 1024 / 1024 # MB memory_increase = final_memory - initial_memory # Memory shouldn't grow excessively assert memory_increase < 100, f"Memory increased by {memory_increase:.2f}MB" Configuration Integration Reading Configuration Python from mcp_server.config import get_config class ConfigurableTool(MCPBaseTool): def __init__(self): super().__init__() self._apply_config() def _apply_config(self): """Apply configuration safely.""" config = get_config() # Access security settings if hasattr(config, 'security'): self.allow_intrusive = getattr( config.security, 'allow_intrusive', False ) # Access tool settings if hasattr(config, 'tool'): # Clamp to safe ranges timeout = getattr(config.tool, 'default_timeout', 300.0) self.default_timeout_sec = max(60.0, min(3600.0, float(timeout))) concurrency = getattr(config.tool, 'default_concurrency', 2) self.concurrency = max(1, min(10, int(concurrency))) # Access circuit breaker settings if hasattr(config, 'circuit_breaker'): threshold = getattr(config.circuit_breaker, 'failure_threshold', 5) self.circuit_breaker_failure_threshold = max(1, min(10, int(threshold))) Environment Variables Python import os class EnvConfigTool(MCPBaseTool): def __init__(self): super().__init__() self._load_env_config() def _load_env_config(self): """Load configuration from environment.""" # Tool-specific settings self.enable_feature_x = os.getenv('MYTOOL_FEATURE_X', 'false').lower() == 'true' # Timeout override timeout_env = os.getenv('MYTOOL_TIMEOUT') if timeout_env: try: self.default_timeout_sec = max(60.0, min(3600.0, float(timeout_env))) except ValueError: log.warning("Invalid MYTOOL_TIMEOUT value: %s", timeout_env) # Concurrency override concurrency_env = os.getenv('MYTOOL_CONCURRENCY') if concurrency_env: try: self.concurrency = max(1, min(10, int(concurrency_env))) except ValueError: log.warning("Invalid MYTOOL_CONCURRENCY value: %s", concurrency_env) Configuration Validation Python def validate_configuration(self) -> Dict[str, Any]: """ Validate current configuration. Returns dict with: - valid: bool - issues: List[str] - warnings: List[str] """ issues = [] warnings = [] # Check command availability if not self._resolve_command(): issues.append(f"Command '{self.command_name}' not found in PATH") # Check timeout if self.default_timeout_sec < 60: warnings.append(f"Timeout very low: {self.default_timeout_sec}s") # Check concurrency if self.concurrency > 10: warnings.append(f"High concurrency may impact performance: {self.concurrency}") # Check circuit breaker if hasattr(self, '_circuit_breaker') and self._circuit_breaker: from mcp_server.circuit_breaker import CircuitBreakerState if self._circuit_breaker.state == CircuitBreakerState.OPEN: warnings.append("Circuit breaker is currently OPEN") return { "valid": len(issues) == 0, "issues": issues, "warnings": warnings, "configuration": { "command": self.command_name, "command_available": self._resolve_command() is not None, "timeout": self.default_timeout_sec, "concurrency": self.concurrency, "circuit_breaker_enabled": hasattr(self, '_circuit_breaker') and self._circuit_breaker is not None } } Best Practices 1. Use Type Hints Python from typing import Optional, Dict, Any, List, Sequence class MyTool(MCPBaseTool): async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: ... def _validate_custom(self, value: str) -> bool: ... def get_tool_info(self) -> Dict[str, Any]: ... 2. Comprehensive Documentation Python class MyTool(MCPBaseTool): """ One-line tool description. Detailed multi-line description explaining: - What the tool does - When to use it - Security considerations - Configuration options Usage: tool = MyTool() result = await tool.run(ToolInput(target="192.168.1.1")) Configuration: MYTOOL_SETTING: Description Examples: # Example 1: Basic usage result = await tool.run(ToolInput(target="192.168.1.1")) # Example 2: With options result = await tool.run(ToolInput( target="192.168.1.0/24", extra_args="-v --fast" )) """ def _my_method(self, param: str) -> bool: """ Brief method description. Args: param: Parameter description Returns: Return value description Raises: ValueError: When validation fails """ ... 3. Defensive Programming Python class DefensiveTool(MCPBaseTool): def _safe_parse(self, value: Optional[str]) -> str: """Safely parse value with defaults.""" # Handle None if value is None: return "" # Handle empty value = value.strip() if not value: return "" # Validate length if len(value) > 1000: log.warning("value_truncated original_length=%d", len(value)) value = value[:1000] return value def _safe_get_config(self, config, attr: str, default: Any) -> Any: """Safely get config attribute.""" try: return getattr(config, attr, default) except Exception as e: log.warning("config_access_failed attr=%s error=%s", attr, str(e)) return default 4. Logging Best Practices Python import logging log = logging.getLogger(__name__) class LoggingBestPractices(MCPBaseTool): async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # Log start of operation log.info("tool.start target=%s args=%s timeout=%.1f correlation_id=%s", inp.target, inp.extra_args, timeout_sec, inp.correlation_id) try: result = await super()._execute_tool(inp, timeout_sec) # Log success log.info("tool.success target=%s returncode=%d duration=%.2f", inp.target, result.returncode, result.execution_time or 0) return result except Exception as e: # Log failure with context log.error("tool.error target=%s error=%s error_type=%s", inp.target, str(e), type(e).__name__, exc_info=True, # Include stack trace extra={ "tool": self.tool_name, "correlation_id": inp.correlation_id, "target": inp.target }) raise 5. Performance Optimization Python class OptimizedTool(MCPBaseTool): # Compile regex patterns once _PATTERN1 = re.compile(r'...') _PATTERN2 = re.compile(r'...') def __init__(self): super().__init__() # Cache expensive computations self._validation_cache: Dict[str, bool] = {} self._max_cache_size = 1000 def _cached_validation(self, value: str) -> bool: """Validate with caching.""" if value in self._validation_cache: return self._validation_cache[value] # Perform validation result = self._expensive_validation(value) # Cache with size limit if len(self._validation_cache) < self._max_cache_size: self._validation_cache[value] = result return result def clear_cache(self): """Clear validation cache (useful for testing).""" self._validation_cache.clear() 6. Graceful Degradation Python class GracefulTool(MCPBaseTool): def __init__(self): super().__init__() # Feature flags for optional functionality self.advanced_parsing_available = self._check_advanced_parsing() def _check_advanced_parsing(self) -> bool: """Check if advanced parsing is available.""" try: import optional_library return True except ImportError: log.info("advanced_parsing_unavailable hint='pip install optional_library'") return False def parse_output(self, output: str) -> Dict[str, Any]: """Parse with fallback.""" if self.advanced_parsing_available: try: return self._advanced_parse(output) except Exception as e: log.warning("advanced_parse_failed fallback=basic error=%s", str(e)) return self._basic_parse(output) else: return self._basic_parse(output) Common Pitfalls Pitfall 1: Not Calling Super Methods ❌ WRONG: Python async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # Custom logic only - loses base class functionality! return ToolOutput(stdout="result", stderr="", returncode=0) ✅ CORRECT: Python async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # Custom validation if error := self._validate_custom(inp): return error # Call base implementation for actual execution return await super()._execute_tool(inp, timeout_sec) Pitfall 2: Mutable Default Arguments ❌ WRONG: Python class BadTool(MCPBaseTool): allowed_flags = [] # Mutable! def __init__(self): super().__init__() # This modifies the class attribute! self.allowed_flags.append("-v") ✅ CORRECT: Python class GoodTool(MCPBaseTool): BASE_FLAGS = ["-v", "-o"] # Immutable tuple or class constant @property def allowed_flags(self) -> List[str]: # Return new list each time flags = list(self.BASE_FLAGS) if self.allow_advanced: flags.append("--advanced") return flags Pitfall 3: Not Handling Missing Dependencies ❌ WRONG: Python from optional_library import feature # Crashes if not installed class BadTool(MCPBaseTool): ... ✅ CORRECT: Python try: from optional_library import feature FEATURE_AVAILABLE = True except ImportError: FEATURE_AVAILABLE = False feature = None class GoodTool(MCPBaseTool): def use_feature(self): if not FEATURE_AVAILABLE: log.warning("feature_unavailable hint='pip install optional_library'") return self.fallback_behavior() return feature() Pitfall 4: Blocking the Event Loop ❌ WRONG: Python async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # CPU-intensive work blocks event loop! result = self._expensive_computation() return result ✅ CORRECT: Python async def _execute_tool(self, inp: ToolInput, timeout_sec) -> ToolOutput: # Run in thread pool to avoid blocking loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, # Use default executor self._expensive_computation ) return result Pitfall 5: Ignoring Cleanup ❌ WRONG: Python class ResourceLeakTool(MCPBaseTool): def __init__(self): super().__init__() self.temp_files = [] async def _execute_tool(self, inp, timeout_sec): # Create temp files but never clean up temp = self._create_temp_file() self.temp_files.append(temp) ... ✅ CORRECT: Python class CleanupTool(MCPBaseTool): def __init__(self): super().__init__() self.temp_files = [] async def _execute_tool(self, inp, timeout_sec): temp = self._create_temp_file() try: # Use temp file result = await super()._execute_tool(inp, timeout_sec) return result finally: # Always cleanup self._cleanup_temp_file(temp) async def cleanup(self): """Called during shutdown.""" for temp in self.temp_files: self._cleanup_temp_file(temp) self.temp_files.clear() Pitfall 6: Not Validating Configuration Values ❌ WRONG: Python def _apply_config(self): config = get_config() # Trusting config values - could be negative, huge, etc. self.timeout = config.tool.timeout self.concurrency = config.tool.concurrency ✅ CORRECT: Python def _apply_config(self): config = get_config() # Clamp to safe ranges timeout = getattr(config.tool, 'timeout', 300.0) self.timeout = max(60.0, min(3600.0, float(timeout))) concurrency = getattr(config.tool, 'concurrency', 2) self.concurrency = max(1, min(10, int(concurrency))) # Log if clamped if self.timeout != timeout: log.warning("config_clamped param=timeout original=%.1f clamped=%.1f", timeout, self.timeout) Reference Examples Example 1: Simple Tool (Ping) Python """PingTool - ICMP echo request tool.""" from mcp_server.base_tool import MCPBaseTool class PingTool(MCPBaseTool): """ Ping tool for basic connectivity checks. Usage: tool = PingTool() result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-c 4" # 4 packets )) """ command_name = "ping" allowed_flags = ["-c", "-W", "-i", "-s"] # count, timeout, interval, packet size default_timeout_sec = 30.0 concurrency = 5 Example 2: Tool with Custom Validation (Traceroute) Python """TracerouteTool - Network path tracing.""" import re from typing import Optional from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput, ErrorContext, ToolErrorType from datetime import datetime, timezone class TracerouteTool(MCPBaseTool): """Trace network path to target.""" command_name = "traceroute" allowed_flags = ["-m", "-q", "-w", "-n"] # max-hops, queries, wait, numeric default_timeout_sec = 120.0 concurrency = 2 MAX_HOPS = 30 _HOP_PATTERN = re.compile(r'^\d+$') async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Validate hop count if specified if inp.extra_args and "-m" in inp.extra_args: try: tokens = inp.extra_args.split() if "-m" in tokens: idx = tokens.index("-m") if idx + 1 < len(tokens): hops = tokens[idx + 1] if not self._validate_hops(hops): error_ctx = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"Invalid hop count: {hops} (max: {self.MAX_HOPS})", recovery_suggestion=f"Use hop count 1-{self.MAX_HOPS}", timestamp=datetime.now(timezone.utc), tool_name=self.tool_name, target=inp.target, metadata={"max_hops": self.MAX_HOPS} ) return self._create_error_output(error_ctx, inp.correlation_id or "") except Exception as e: pass # Let base class handle parsing return await super()._execute_tool(inp, timeout_sec) def _validate_hops(self, hops: str) -> bool: """Validate hop count.""" if not self._HOP_PATTERN.match(hops): return False try: hop_count = int(hops) return 1 <= hop_count <= self.MAX_HOPS except ValueError: return False Example 3: Tool with Configuration (Nmap) - See Provided Code The NmapTool in your codebase is the definitive reference for: Complex argument parsing Configuration integration Policy-based operation modes Script filtering Result parsing Scan templates

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server