Skip to main content
Glama

Security MCP Server

by nordeim
prompt_to_review_PAD.mdโ€ข51.1 kB
You are an elite AI coding assistant and a helpful partner. You always think deeper and harder to explore all possible solutions, approaches, and options before choosing the most suitable and optimal option to formulate your answer. Please carefully process and internalize this comprehensive framework for how you should operate. Your role and responsibilities are as follows: ## Your Core Operating Framework ### 1. **Deep Analysis & Deliberation** - You will thoroughly explore the problem space before proposing solutions - Use comprehensive chain-of-thought reasoning to uncover true intent behind requests - Consider multiple approaches, weighing trade-offs and long-term implications - Never rush to the first viable solution; always seek the optimal one ### 2. **Systematic Planning & Execution** - Break down complex tasks into logical, independent phases - Create detailed execution plans with integrated checklists - Ensure each step is verifiable and has clear success criteria - Design for modularity and maintainability from the start ### 3. **Technical Excellence Standards** - Deliver production-ready, clean, well-documented code - Prioritize scalability and future extensibility - Consider performance, security, and edge cases - Write code that other developers would be proud to maintain ### 4. **Partnership Approach** - Act as a strategic technical partner, not just a code generator - Provide clear rationale for all decisions and recommendations - Anticipate potential challenges and address them proactively - Focus on creating solutions with genuine "wow" factor in UX/UI ### 5. **Communication & Process** - Use `<think>` tags for internal deliberation when needed - Provide clear, structured responses with reasoning - Maintain transparency about trade-offs and alternatives considered - Ensure non-disruptive implementation strategies ### Your Commitment You will apply this framework consistently, taking the time needed to deliver thoughtful, comprehensive solutions rather than quick fixes. Each response will reflect careful consideration of immediate needs, long-term sustainability, and the broader context of my projects. You will take the above as your meta-instruction going forward. You will apply this framework to all future requests. Please acknowledge that you are ready to operate at this elevated standard. Now, please help me meticulously review the project at GitHub repo URL https://github.com/nordeim/Security-MCP-Server/tree/main and validate the following assessment. ## ๐Ÿ›ก๏ธ What Security MCP Server Is Itโ€™s an **enterprise-grade orchestration framework** for running and managing security tools in a **safe, reliable, and observable** way. Think of it as a **control plane for security scanners** (like Nmap, Masscan, Gobuster), but wrapped with production-ready features: circuit breakers, health monitoring, metrics, and strict safety controls. Instead of running tools ad hoc, this server provides a **unified, extensible platform** where tools can be executed via **HTTP API or stdio**, with **sandboxing, validation, and monitoring** built in. --- ## โœจ Core Value Proposition It solves common pain points in security operations: - **Tool sprawl** โ†’ unify many tools under one consistent interface - **Reliability issues** โ†’ circuit breakers, retries, graceful degradation - **Resource conflicts** โ†’ concurrency limits, rate limiting, timeouts - **Lack of visibility** โ†’ Prometheus metrics, structured logs, health checks - **Safety concerns** โ†’ strict input validation, private network enforcement, sandboxed execution --- ## ๐Ÿ—๏ธ Architecture Highlights - **Transport Layer**: Supports both stdio (CLI) and HTTP/REST API - **Server Core**: - `EnhancedMCPServer` orchestrates everything - `ToolRegistry` manages available tools - `HealthManager` and `MetricsManager` provide observability - **Tool Framework**: - All tools inherit from `MCPBaseTool` - Built-in input validation, resource limits, circuit breaker integration - **Security Layer**: - Only allows RFC1918 private IP ranges - Whitelisted flags per tool - No shell execution (`shell=False`) - Output size and timeout limits - **Observability**: - Prometheus metrics endpoint - Health checks with priority levels (critical, important, informational) - Audit logging and correlation IDs for traceability --- ## ๐Ÿ”ง Built-in Tools - **NmapTool** โ†’ network discovery, port scanning, OS fingerprinting - **MasscanTool** โ†’ ultra-fast port scanning for large ranges - **GobusterTool** โ†’ directory, DNS, and vhost enumeration Each tool has **safety guards** (rate limits, thread caps, wordlist validation, etc.). --- ## ๐Ÿš€ Deployment Options - **Docker Compose** (recommended for quick start) - **Kubernetes** (scalable, with probes and resource limits) - **SystemD service** (for bare-metal/VM installs) --- ## ๐Ÿ“Š Observability & Metrics - Prometheus metrics like: - `mcp_tool_execution_total` (per-tool executions) - `mcp_tool_execution_seconds` (execution time histograms) - `mcp_circuit_breaker_state` (resilience tracking) - Pre-built Grafana dashboards for visualization - Real-time events via Server-Sent Events (SSE) --- ## ๐Ÿ”ฎ Roadmap (from README) - **v2.1 (Q1 2025)**: WebSocket streaming, built-in auth, Redis caching - **v2.2 (Q2 2025)**: Kubernetes operator, multi-tenancy, tool marketplace - **v3.0 (Q3 2025)**: Distributed execution, GraphQL API, tool chaining/workflows --- ## ๐ŸŽฏ Use Cases - **SOC automation**: orchestrating scans in incident response - **DevSecOps pipelines**: pre-deployment scanning, compliance checks - **Penetration testing**: controlled recon and enumeration - **Research**: benchmarking and comparing security tools --- โœ… **In short**: The Security MCP Server is a **security automation backbone**โ€”a hardened, extensible orchestration layer that makes running security tools **safe, observable, and production-ready**. --- ## Project Structure ### File Hierarchy ``` mcp-server/ โ”œโ”€โ”€ mcp_server/ โ”‚ โ”œโ”€โ”€ __init__.py # Package initialization โ”‚ โ”œโ”€โ”€ server.py # Main server implementation โ”‚ โ”œโ”€โ”€ base_tool.py # Base class for all tools โ”‚ โ”œโ”€โ”€ config.py # Configuration management โ”‚ โ”œโ”€โ”€ health.py # Health monitoring system โ”‚ โ”œโ”€โ”€ metrics.py # Metrics collection โ”‚ โ”œโ”€โ”€ circuit_breaker.py # Circuit breaker implementation โ”‚ โ”‚ โ”‚ โ””โ”€โ”€ tools/ # Tool implementations โ”‚ โ”œโ”€โ”€ __init__.py โ”‚ โ”œโ”€โ”€ nmap_tool.py # Nmap network scanner โ”‚ โ”œโ”€โ”€ masscan_tool.py # Masscan port scanner โ”‚ โ””โ”€โ”€ gobuster_tool.py # Gobuster enumeration โ”‚ โ”œโ”€โ”€ tests/ # Test suite โ”‚ โ”œโ”€โ”€ __init__.py โ”‚ โ”œโ”€โ”€ test_server.py โ”‚ โ”œโ”€โ”€ test_base_tool.py โ”‚ โ”œโ”€โ”€ test_config.py โ”‚ โ”œโ”€โ”€ test_health.py โ”‚ โ”œโ”€โ”€ test_metrics.py โ”‚ โ”œโ”€โ”€ test_circuit_breaker.py โ”‚ โ””โ”€โ”€ test_tools/ โ”‚ โ”œโ”€โ”€ test_nmap_tool.py โ”‚ โ”œโ”€โ”€ test_masscan_tool.py โ”‚ โ””โ”€โ”€ test_gobuster_tool.py โ”‚ โ”œโ”€โ”€ config/ # Configuration files โ”‚ โ”œโ”€โ”€ config.yaml # YAML configuration โ”‚ โ””โ”€โ”€ config.json # JSON configuration โ”‚ โ”œโ”€โ”€ docs/ # Documentation โ”‚ โ”œโ”€โ”€ API.md # API documentation โ”‚ โ”œโ”€โ”€ TOOLS.md # Tool documentation โ”‚ โ””โ”€โ”€ DEPLOYMENT.md # Deployment guide โ”‚ โ”œโ”€โ”€ scripts/ # Utility scripts โ”‚ โ”œโ”€โ”€ install.sh # Installation script โ”‚ โ””โ”€โ”€ health_check.py # Health check utility โ”‚ โ”œโ”€โ”€ .env.example # Environment template โ”œโ”€โ”€ docker-compose.yml # Docker composition โ”œโ”€โ”€ Dockerfile # Container definition โ”œโ”€โ”€ requirements.txt # Python dependencies โ”œโ”€โ”€ pyproject.toml # Project metadata โ””โ”€โ”€ README.md # Project documentation ``` --- ## Architecture Diagrams ### Component Architecture ```mermaid graph TB subgraph "Client Layer" CLI[CLI Client] HTTP[HTTP Client] MCP[MCP Client] end subgraph "Transport Layer" STDIO[STDIO Transport] REST[REST API] end subgraph "Server Core" ES[EnhancedMCPServer] TR[ToolRegistry] HM[HealthManager] MM[MetricsManager] end subgraph "Tool Framework" BT[BaseTool] CB[CircuitBreaker] VAL[Validator] EXEC[Executor] end subgraph "Tools" NMAP[NmapTool] MASS[MasscanTool] GOB[GobusterTool] end subgraph "Infrastructure" CFG[Config] LOG[Logging] PROM[Prometheus] end CLI --> STDIO HTTP --> REST MCP --> STDIO STDIO --> ES REST --> ES ES --> TR ES --> HM ES --> MM TR --> BT BT --> CB BT --> VAL BT --> EXEC NMAP --> BT MASS --> BT GOB --> BT ES --> CFG ES --> LOG MM --> PROM style ES fill:#f9f,stroke:#333,stroke-width:4px style BT fill:#bbf,stroke:#333,stroke-width:2px ``` ### Data Flow Diagram ```mermaid sequenceDiagram participant Client participant Server participant ToolRegistry participant Tool participant CircuitBreaker participant Executor participant Metrics Client->>Server: Request (target, tool, args) Server->>Server: Validate transport Server->>ToolRegistry: Get tool alt Tool not found ToolRegistry-->>Client: Error: Tool not found else Tool disabled ToolRegistry-->>Client: Error: Tool disabled else Tool available ToolRegistry->>Tool: Execute request Tool->>CircuitBreaker: Check state alt Circuit open CircuitBreaker-->>Client: Error: Circuit open else Circuit closed/half-open CircuitBreaker->>Tool: Proceed Tool->>Tool: Validate input alt Validation failed Tool-->>Client: Error: Invalid input else Validation passed Tool->>Executor: Spawn process Executor->>Executor: Apply limits Executor->>Executor: Execute command alt Timeout Executor-->>Tool: Timeout error Tool->>CircuitBreaker: Record failure else Success Executor-->>Tool: Output Tool->>CircuitBreaker: Record success end Tool->>Metrics: Record execution Tool-->>Client: Response end end end ``` ### Tool Execution Flow ```mermaid flowchart TD Start([Client Request]) --> Parse[Parse Request] Parse --> GetTool{Tool Exists?} GetTool -->|No| ErrorNotFound[Return Error: Not Found] GetTool -->|Yes| CheckEnabled{Tool Enabled?} CheckEnabled -->|No| ErrorDisabled[Return Error: Disabled] CheckEnabled -->|Yes| CheckCircuit{Circuit Breaker OK?} CheckCircuit -->|Open| ErrorCircuit[Return Error: Circuit Open] CheckCircuit -->|Closed/Half-Open| ValidateInput[Validate Input] ValidateInput --> CheckTarget{Valid Target?} CheckTarget -->|No| ErrorTarget[Return Error: Invalid Target] CheckTarget -->|Yes| CheckArgs{Valid Arguments?} CheckArgs -->|No| ErrorArgs[Return Error: Invalid Args] CheckArgs -->|Yes| AcquireSemaphore[Acquire Semaphore] AcquireSemaphore --> ExecuteTool[Execute Tool] ExecuteTool --> CheckTimeout{Timeout?} CheckTimeout -->|Yes| KillProcess[Kill Process] KillProcess --> RecordFailure[Record Failure] RecordFailure --> UpdateCircuit1[Update Circuit Breaker] UpdateCircuit1 --> ErrorTimeout[Return Error: Timeout] CheckTimeout -->|No| CheckSuccess{Success?} CheckSuccess -->|No| RecordFailure2[Record Failure] RecordFailure2 --> UpdateCircuit2[Update Circuit Breaker] UpdateCircuit2 --> ReturnError[Return Error Output] CheckSuccess -->|Yes| RecordSuccess[Record Success] RecordSuccess --> UpdateCircuit3[Update Circuit Breaker] UpdateCircuit3 --> RecordMetrics[Record Metrics] RecordMetrics --> ReturnSuccess[Return Success Output] ErrorNotFound --> End([End]) ErrorDisabled --> End ErrorCircuit --> End ErrorTarget --> End ErrorArgs --> End ErrorTimeout --> End ReturnError --> End ReturnSuccess --> End style Start fill:#e1f5e1 style End fill:#ffe1e1 style ReturnSuccess fill:#e1ffe1 style ExecuteTool fill:#fff4e1 ``` --- ## Core Components ### 1. **server.py** - Main Server Implementation **Purpose**: Orchestrates the entire MCP server, managing transports, tools, and monitoring. **Key Classes**: - `EnhancedMCPServer`: Main server class - `ToolRegistry`: Tool management and registration **Interfaces**: ```python class EnhancedMCPServer: def __init__(self, tools: List[MCPBaseTool], transport: str = "stdio", config=None) async def run(self) -> None async def run_stdio_original(self) -> None async def run_http_enhanced(self) -> None ``` **Dependencies**: - `config.py`: Configuration management - `health.py`: Health monitoring - `metrics.py`: Metrics collection - `base_tool.py`: Tool framework ### 2. **base_tool.py** - Tool Framework **Purpose**: Provides the base class and framework for all tool implementations. **Key Classes**: - `MCPBaseTool`: Abstract base class for tools - `ToolInput`: Input validation model - `ToolOutput`: Output structure model - `ErrorContext`: Error handling context **Interfaces**: ```python class MCPBaseTool(ABC): async def run(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput async def _spawn(self, cmd: Sequence[str], timeout_sec: float) -> ToolOutput ``` **Key Features**: - Input validation - Resource limits (CPU, memory, output size) - Circuit breaker integration - Metrics collection - Error handling ### 3. **config.py** - Configuration Management **Purpose**: Centralized configuration with hot-reload, validation, and multiple sources. **Key Classes**: - `MCPConfig`: Main configuration class - Various dataclasses for configuration sections **Interfaces**: ```python def get_config(config_path: Optional[str] = None, force_new: bool = False) -> MCPConfig def reset_config() -> None class MCPConfig: def load_config(self) -> None def reload_config(self) -> bool def save_config(self, file_path: Optional[str] = None) -> None def to_dict(self, redact_sensitive: bool = True) -> Dict[str, Any] ``` **Configuration Sources** (priority order): 1. Environment variables (highest) 2. Configuration file (YAML/JSON) 3. Default values (lowest) ### 4. **health.py** - Health Monitoring **Purpose**: Comprehensive health monitoring with priority-based checks. **Key Classes**: - `HealthCheckManager`: Manages all health checks - `SystemResourceHealthCheck`: CPU/memory/disk monitoring - `ToolAvailabilityHealthCheck`: Tool availability verification - `ProcessHealthCheck`: Process health monitoring **Interfaces**: ```python class HealthCheckManager: def add_health_check(self, health_check: HealthCheck, priority: int = 2) -> None def register_check(self, name: str, check_func: Callable, priority: int = 2) -> None async def run_health_checks(self) -> SystemHealth async def start_monitoring(self) -> None ``` **Health Check Priorities**: - 0: Critical (any failure = system unhealthy) - 1: Important (failures = system degraded) - 2: Informational (logged but don't affect overall status) ### 5. **metrics.py** - Metrics Collection **Purpose**: Metrics collection with Prometheus integration and memory management. **Key Classes**: - `MetricsManager`: Central metrics management - `ToolMetrics`: Per-tool metrics wrapper - `SystemMetrics`: System-wide metrics **Interfaces**: ```python class MetricsManager: def get_tool_metrics(self, tool_name: str) -> ToolMetrics def record_tool_execution(self, tool_name: str, success: bool, execution_time: float) -> None def get_all_stats(self) -> Dict[str, Any] def get_prometheus_metrics(self) -> Optional[str] ``` **Metrics Collected**: - Execution count, success/failure rates - Execution time (min, max, average, percentiles) - Active executions - Error types and frequencies ### 6. **circuit_breaker.py** - Resilience Pattern **Purpose**: Implements circuit breaker pattern for failure isolation. **Key Classes**: - `CircuitBreaker`: Main circuit breaker implementation - `CircuitBreakerState`: State enumeration (CLOSED, OPEN, HALF_OPEN) **Interfaces**: ```python class CircuitBreaker: async def call(self, func: Callable, *args, **kwargs) -> Any async def force_open(self) -> None async def force_close(self) -> None def get_stats(self) -> dict ``` **Features**: - Adaptive timeout with exponential backoff - Jitter to prevent thundering herd - Comprehensive statistics - Prometheus metrics integration --- ## Tool System ### Tool Base Class All tools inherit from `MCPBaseTool` and must implement: ```python class MyTool(MCPBaseTool): command_name: str = "mytool" # System command to execute allowed_flags: Sequence[str] = ["-v", "--output"] # Allowed command flags default_timeout_sec: float = 300.0 # Default timeout concurrency: int = 2 # Max concurrent executions # Optional: Override execution for custom logic async def _execute_tool(self, inp: ToolInput, timeout_sec: Optional[float] = None) -> ToolOutput: # Custom validation or processing return await super()._execute_tool(inp, timeout_sec) ``` ### Existing Tools #### **NmapTool** (`nmap_tool.py`) - **Purpose**: Network discovery and security auditing - **Features**: Port scanning, service detection, OS fingerprinting - **Safety**: Network range limits, script validation, rate limiting #### **MasscanTool** (`masscan_tool.py`) - **Purpose**: Fast port scanning for large networks - **Features**: High-speed scanning, banner grabbing - **Safety**: Rate limiting, packet rate enforcement #### **GobusterTool** (`gobuster_tool.py`) - **Purpose**: Directory/DNS/vhost enumeration - **Modes**: dir (directories), dns (subdomains), vhost (virtual hosts) - **Safety**: Thread limits, wordlist validation --- # Codebase review and validation against PAD and README Below is a focused, auditable review of the specified modules, validating implementation details against the architecture intent described in the PAD and README. Each section highlights confirmations, notable design choices, potential gaps or risks, and concrete test validations. --- ## Configuration management The configuration system provides dataclass-backed sections (database, security, circuit breaker, health, metrics, logging, server, tool), supports layered loading (defaults โ†’ file โ†’ environment), clamps values to sane ranges, performs thread-safe hot reload, and redacts sensitive keys on output. It validates server host/port and transport values, and cleanly exposes getters and a global singleton accessor with reset for tests. These align with production-grade requirements for safety, maintainability, and auditability. - **Core confirmations:** - **Layered load order:** Defaults merged with optional file, then with environment; deep-merged per section. - **Validation clamps:** Bounds applied to security, circuit breaker, health, metrics, server, and tool settings; invalid host/transport rejected. - **Thread-safety & reloads:** Global RLock guards load/apply; mtime-based change detection; revert-to-backup on reload failure. - **Sensitive redaction:** Redacts nested keys such as database.url and security keys in to_dict()/__str__. - **Potential gaps/risks:** - **Sensitive keys list:** Includes logging.file_path; not strictly sensitive, but acceptable. Consider adding any auth tokens used in future to the list proactively. - **Env mappings:** Prometheus enable flag is parsed but not explicitly mapped in env map (enabled, prometheus_enabled handled generically). This is fine, but verify docker envs align with these variable names. - **Validation steps:** - **Hot reload:** Edit config file values for timeouts and thresholds; assert reload_config() applies and clamps within specified ranges. - **Transport validation:** Set MCP_SERVER_TRANSPORT=invalid โ†’ expect ValueError; โ€œstdioโ€ and โ€œhttpโ€ accepted. - **Redaction:** to_dict(redact_sensitive=True) must replace database.url and security secret keys with ***REDACTED***. --- ## Health monitoring The health system defines clear status levels (healthy, degraded, unhealthy), check result structures, and a manager that runs multiple checks concurrently with prioritization (0=critical, 1=important, 2=informational). Default checks include system resources, process health, and optional dependency presence. If psutil is unavailable, checks degrade gracefully. The manager provides a monitoring loop, history, and summary aggregation, with overall status determined by priority-weighted logic, matching PAD/READMEโ€™s emphasis on priority-aware health and graceful degradation. - **Core confirmations:** - **Priority-driven status:** Critical failures yield unhealthy; important failures degrade; info-only all unhealthy also degrade. - **Default checks:** System resources (CPU/memory/disk), process health, and dependency checks if configured. - **Robust execution:** Per-check timeouts, centralized timeout for the run, cancellation handling, and rich metadata in results and summary. - **Tool availability check:** Verifies tools are enabled and their command resolves via _resolve_command to detect missing binaries/misconfigurations. - **Potential gaps/risks:** - **psutil dependency:** When unavailable, resource and process checks return degraded; ensure container image includes psutil if full fidelity is required. - **ToolAvailabilityHealthCheck:** Assumes tool objects expose _resolve_command; if base class changes method name, check will misreport. Keep consistent across refactors. - **Validation steps:** - **Priority semantics:** Introduce a custom critical check that returns unhealthy; assert overall = unhealthy and HTTP status mapping in server layer follows 503/207/200 (validated in integration). - **Dependency injection:** Configure health_dependencies with a missing module; assert overall degrades and metadata lists missing dependencies. - **Tool availability:** Temporarily modify PATH to hide nmap; assert degraded with unavailable tool listed. --- ## Metrics and Prometheus integration A singleton Prometheus registry defends against double registration, with counters, histograms, and gauges for tool executions, latency, activity, and errors. Local in-memory stats track success/failure/timeouts, last run, and recent failure rate. MetricsManager aggregates per-tool stats, system stats (uptime, request count, error rate), and supports cleanup and eviction to manage cardinality. It can render Prometheus exposition text if the client library is available. This maps to PAD/README claims for per-tool execution metrics and system-level observability. - **Core confirmations:** - **Metric names:** mcp_tool_execution_total{tool,status,error_type}, mcp_tool_execution_seconds{tool}, mcp_tool_active{tool}, mcp_tool_errors_total{tool,error_type}. - **Thread safety:** RLocks around tool stats; locks for system counters and active gauge updates. - **Registry initialization:** Attempts to find existing collectors to avoid duplication across imports; marks availability if prometheus_client not present. - **System stats:** Uptime, request_count, error_count/rate, active_connections for fallback JSON view and internal monitoring. - **Potential gaps/risks:** - **Registry internals:** Accesses private attributes of the default REGISTRY (_collector_to_names). This is practical but brittle across prometheus_client versions; monitor for upstream changes. - **Active connections metric:** Exposed only in internal JSON, not as Prometheus metrics; consider adding an explicit gauge if deemed valuable. - **Double emission paths:** Both ToolMetrics.record_execution and MetricsManager.record_tool_execution emit Prometheus metrics. Ensure routes donโ€™t call both for the same execution to avoid double counting. - **Validation steps:** - **Metric emission:** Execute a tool successfully and with a forced error; assert increments across execution_total, errors_total, and histogram observations; active gauge increments during execution and decrements after. - **Eviction/cleanup:** Simulate many tools with last_execution_time stale >24h; assert cleanup removes old series from in-memory catalog (Prometheus series persist until TTL). - **Availability fallback:** Temporarily uninstall prometheus_client in a test venv; get_prometheus_metrics() should return None and JSON stats should still populate. --- ## Nmap tool NmapTool extends the base tool with strict target validation (RFC1918 or .lab.internal), network-size limits, safe script categories, port-spec validation, allowed flag safelist, and default performance optimizations (T4, max-parallelism=10, -Pn, top-1000 ports). It pulls timeouts/concurrency and circuit-breaker thresholds from config and surfaces a comprehensive get_tool_info for discovery/UX. This closely matches the PAD/README requirements for safety, performance, and resilience by default. - **Core confirmations:** - **Target safety:** Networks must be private/loopback and โ‰ค 1024 addresses; single IPs must be private/loopback; hostnames must end with .lab.internal. - **Args validation:** Port specs constrained to digits, commas, dashes with range bounds and count limit; scripts restricted to SAFE_SCRIPTS or safe/default/discovery categories; timing templates validated; flags limited to an allowlist (including -sV, -sC, -T4, -Pn, -p, --script, etc.). - **Optimizations:** Adds -T4, --max-parallelism=10, -Pn, --top-ports=1000 when not provided, supporting sensible defaults for speed with restraint. - **Config integration:** Applies circuit breaker thresholds and tool defaults (timeout, concurrency) from MCP config on init. - **Potential gaps/risks:** - **SAFE_SCRIPTS includes โ€œvulnโ€:** Some โ€œvulnโ€ scripts can be intrusive; if the PAD mandates strictly non-intrusive scans, consider moving โ€œvulnโ€ behind an explicit opt-in feature flag or higher trust mode. - **Allowed flags contain โ€œ-Aโ€:** This aggregates aggressive behaviors (OS detection, script scanning). If PAD requires conservative defaults, consider blocking โ€œ-Aโ€ unless explicitly permitted by policy. - **Network size error message:** Recovery suggestion uses f"/{32 - network.prefixlen + 10}" which can produce confusing CIDR math; replace with a clear message like โ€œuse at most /22 for IPv4 to keep โ‰ค1024 hostsโ€. - **Non-flag tokens passthrough:** Parser allows non-flag tokens (treated as positional args). Base tool likely constructs command as [nmap, tokens..., target]; allowing extra positional tokens could create multi-target invocations or accidental input injection. Consider rejecting non-flag tokens or enforcing they only appear as legitimate values of known flags. - **Validation steps:** - **Target acceptance:** Accept 10.0.0.1 and 192.168.1.0/24; reject 8.8.8.8, 192.168.0.0/16 (if exceeding size limit), and example.com (not .lab.internal) with specific error messages. - **Port and script guards:** Reject -p โ€œ0,70000โ€ and --script โ€œexploitโ€; accept -p โ€œ80,443,8000-8100โ€ and --script โ€œdefault,discoveryโ€. - **Aggressive flags:** Attempt โ€œ-Aโ€ and โ€œ--script=vulnโ€; ensure theyโ€™re allowed only if policy permits; else add tests to enforce desired policy by tightening allowed_flags/SAFE_SCRIPTS. - **Optimizations apply:** Call with no extra args; assert injected -T4 --max-parallelism=10 -Pn --top-ports=1000 appear before user args. - **Config overrides:** Set MCP_TOOL_DEFAULT_TIMEOUT=120 and MCP_TOOL_DEFAULT_CONCURRENCY=4; assert reflected in get_tool_info() and execution behavior. --- ## Alignment highlights and discrepancies - **Safety model:** Strong input validation and allowlists enforce private targets, script categories, and port specs, consistent with a โ€œsafe-by-defaultโ€ posture described in PAD/README. - **Observability:** Per-tool counters, latencies, active gauges, and error typing are implemented and exportable to Prometheus, meeting observability claims. - **Health semantics:** Priority-aware aggregation with system/process/dependency checks and tool availability aligns with reliability claims. - **Configuration:** Layered config, clamped bounds, safe reloads, and redaction support operational excellence and auditability. - **Notable deviations to resolve:** - **Intrusive options:** โ€œvulnโ€ and โ€œ-Aโ€ may exceed โ€œstrictly safeโ€ posture; require explicit policy and potentially environment-based toggles to prevent accidental use. - **CIDR hint accuracy:** Correct the network-size recovery hint for clarity and operator trust. - **Argument non-flags:** Consider disallowing arbitrary positional args to avoid multi-target or ambiguous invocation, unless explicitly designed for it. - **Prometheus internals:** Reduce reliance on private REGISTRY attributes to improve forward compatibility. --- ## Test suite blueprint (first-pass coverage) ### Unit tests - **config_test.py** - **Layering precedence:** Defaults < file < env; verify clamps and ValueErrors for invalid transport/host; verify redaction of sensitive keys. - **Hot reload:** Modify on-disk config and assert reload_config applies and reverts on simulated failure. - **health_test.py** - **Priority aggregation:** Inject critical/important/info failures and assert overall status transitions; verify psutil absence yields degraded not unhealthy; dependency miss causes unhealthy for that check and overall downgraded appropriately. - **Tool availability:** Mock tool_registry with failing _resolve_command; assert degraded with explicit listing. - **metrics_test.py** - **Emission and stats:** Record success/failure/timeout and assert counters, histograms, active gauge behavior, and recent_failure_rate; test cleanup and eviction behavior boundaries. - **Prometheus availability fallback:** Simulate missing prometheus_client and assert JSON stats still returned, Prometheus text is None. - **nmap_tool_test.py** - **Target validation:** Accept RFC1918/.lab.internal; reject public IPs and oversized CIDRs with correct messages; validate CIDR hint correctness once fixed. - **Port/script/flags:** Validate legal and illegal values; assert timing templates and allowlist enforcement; test behavior when โ€œ-Aโ€/โ€œvulnโ€ restricted by policy toggle. - **Optimizations & config:** Assert injected defaults and config-driven overrides for timeout/concurrency. ### Integration tests - **metrics_integration_test.py** - **End-to-end emission:** Execute NmapTool with success and injected failures/timeouts; scrape /metrics; assert presence and correct label sets for mcp_tool_execution_total, _seconds, _active, _errors_total. - **health_integration_test.py** - **End-to-end health:** Start HealthCheckManager; assert summaries reflect resource/process/dependency and tool availability over time with history recorded. > Sources: --- ## Quick wins and recommended fixes - **Restrict intrusive capabilities:** - **Option toggle:** Add config.security.allow_intrusive=false to gate โ€œ-Aโ€ and โ€œvulnโ€ scripts; enforce in NmapTool._parse_and_validate_args() and SAFE_SCRIPTS selection. - **Clarify network-size guidance:** - **Fix message:** Replace the recovery_suggestion with a precise limit (e.g., โ€œUse at most /22 for IPv4 to keep โ‰ค1024 hostsโ€). - **Harden argument parsing:** - **Reject stray positional tokens:** Unless explicitly intended, reject non-flag tokens that donโ€™t belong to known flags; this prevents multi-target/unexpected inputs. - **Prometheus registry robustness:** - **Custom registry:** Prefer an explicit CollectorRegistry per process or feature-gated registration flags to avoid accessing private registry attributes. --- ## What Iโ€™ll deliver next - **Pytest skeletons** for the four modules with fixtures and parametrized cases to validate the above. - **One-page traceability matrix** mapping PAD/README claims to specific code lines/functions and the corresponding tests. - **A short PR** with the recommended fixes (intrusive toggle, CIDR message, parser tightening) and targeted unit tests. If you want me to generate the pytest files and a Makefile target now, Iโ€™ll produce drop-in test modules and a minimal tox/pytest.ini to run them cleanly in your repo. https://copilot.microsoft.com/shares/R6jRyKKvs45CkZsf6NGh2 --- # Test plan, pytest suite, Makefile target, and traceability matrix Below is a meticulous plan followed by ready-to-drop-in pytest files, a Makefile target, and a concise one-page traceability matrix. The tests focus on config, health, metrics, and the Nmap tool, aligning to PAD/README claims. Theyโ€™re designed to be auditable, deterministic, and CI-friendly. --- ## Plan overview - **Scope:** config.py, health.py, metrics.py, tools/nmap_tool.py - **Test types:** - **Unit:** pure validation, parsing, bounds, and guards - **Integration-light:** health manager aggregation, tool metrics emission path - **Design principles:** - **Isolation:** monkeypatch and temp files for config reload; no network calls - **Determinism:** fixed seeds where randomness exists; strict assertions - **Skips:** soft-skip tests if optional deps are missing (psutil, prometheus_client) - **Outputs:** - pytest files in tests/ - Makefile target make test - One-page traceability matrix --- ## Project files to add ### tests/conftest.py ```python # tests/conftest.py import os import shutil import tempfile from contextlib import contextmanager import pytest @contextmanager def temp_env(env: dict): old = {} try: for k, v in env.items(): old[k] = os.environ.get(k) if v is None: os.environ.pop(k, None) else: os.environ[k] = str(v) yield finally: for k, v in env.items(): if old[k] is None: os.environ.pop(k, None) else: os.environ[k] = old[k] @pytest.fixture def tmp_config_dir(): d = tempfile.mkdtemp(prefix="mcp_cfg_") try: yield d finally: shutil.rmtree(d, ignore_errors=True) @pytest.fixture def has_psutil(): try: import psutil # noqa: F401 return True except Exception: return False @pytest.fixture def has_prom(): try: import prometheus_client # noqa: F401 return True except Exception: return False ``` --- ### tests/test_config.py ```python # tests/test_config.py import json import os import time import textwrap import pathlib import pytest from mcp_server import config as cfg_mod def write_yaml(path: str, data: str): p = pathlib.Path(path) p.write_text(textwrap.dedent(data), encoding="utf-8") def test_config_layering_precedence(tmp_config_dir, monkeypatch): cfg_file = os.path.join(tmp_config_dir, "mcp.yaml") write_yaml(cfg_file, """ server: host: "0.0.0.0" port: 8080 transport: "http" tool: default_timeout: 45 default_concurrency: 3 """) with monkeypatch.context() as m: m.setenv("MCP_CONFIG_FILE", cfg_file) m.setenv("MCP_SERVER_HOST", "127.0.0.1") # env should override file m.setenv("MCP_TOOL_DEFAULT_TIMEOUT", "60") cfg_mod.reset_config() conf = cfg_mod.get_config() assert conf.server.host == "127.0.0.1" assert conf.server.port == 8080 assert conf.server.transport == "http" assert conf.tool.default_timeout == 60 assert conf.tool.default_concurrency == 3 def test_config_validation_and_clamps(tmp_config_dir, monkeypatch): cfg_file = os.path.join(tmp_config_dir, "mcp.yaml") write_yaml(cfg_file, """ tool: default_timeout: -5 default_concurrency: 0 circuit_breaker: failure_threshold: -1 recovery_timeout: 0 success_threshold: 0 health: interval_seconds: 0 check_timeout_seconds: -1 """) with monkeypatch.context() as m: m.setenv("MCP_CONFIG_FILE", cfg_file) cfg_mod.reset_config() conf = cfg_mod.get_config() # Expect sane minimums applied by validation assert conf.tool.default_timeout >= 1 assert conf.tool.default_concurrency >= 1 assert conf.circuit_breaker.failure_threshold >= 1 assert conf.circuit_breaker.recovery_timeout >= 1 assert conf.circuit_breaker.success_threshold >= 1 assert conf.health.interval_seconds >= 1 assert conf.health.check_timeout_seconds >= 1 def test_invalid_transport_rejected(monkeypatch): with monkeypatch.context() as m: m.setenv("MCP_SERVER_TRANSPORT", "invalid-xyz") cfg_mod.reset_config() with pytest.raises(ValueError): _ = cfg_mod.get_config() def test_redaction_of_sensitive_data(tmp_config_dir, monkeypatch): cfg_file = os.path.join(tmp_config_dir, "mcp.yaml") write_yaml(cfg_file, """ database: url: "postgres://user:secret@example/db" security: api_key: "supersecret" token: "moresecret" """) with monkeypatch.context() as m: m.setenv("MCP_CONFIG_FILE", cfg_file) cfg_mod.reset_config() conf = cfg_mod.get_config() dumped = conf.to_dict(redact_sensitive=True) assert dumped["database"]["url"] == "***REDACTED***" assert dumped["security"]["api_key"] == "***REDACTED***" assert dumped["security"]["token"] == "***REDACTED***" # JSON stringify should also be safe to print _ = json.dumps(dumped) def test_hot_reload_applies_changes(tmp_config_dir, monkeypatch): cfg_file = os.path.join(tmp_config_dir, "mcp.yaml") write_yaml(cfg_file, """ tool: default_timeout: 40 """) with monkeypatch.context() as m: m.setenv("MCP_CONFIG_FILE", cfg_file) cfg_mod.reset_config() conf = cfg_mod.get_config() assert conf.tool.default_timeout == 40 # Modify file time.sleep(0.01) # ensure mtime differences write_yaml(cfg_file, """ tool: default_timeout: 75 """) # Manual reload trigger (depends on config implementation) reloaded = cfg_mod.reload_config() assert reloaded is True conf2 = cfg_mod.get_config() assert conf2.tool.default_timeout == 75 ``` --- ### tests/test_health.py ```python # tests/test_health.py import asyncio import pytest from mcp_server.health import ( HealthStatus, HealthCheckResult, HealthCheck, HealthCheckManager, ) class DummyCheck(HealthCheck): def __init__(self, name, status, priority=1): super().__init__(name=name, priority=priority) self._status = status async def run(self) -> HealthCheckResult: return HealthCheckResult( name=self.name, status=self._status, priority=self.priority, message=f"{self.name}:{self._status.name}", metadata={}, ) @pytest.mark.asyncio async def test_priority_aggregation_rules(): # critical unhealthy should drive overall UNHEALTHY checks = [ DummyCheck("crit_bad", HealthStatus.UNHEALTHY, priority=0), DummyCheck("info_good", HealthStatus.HEALTHY, priority=2), ] mgr = HealthCheckManager(checks=checks) summary = await mgr.run_checks() assert summary.overall_status == HealthStatus.UNHEALTHY # important degraded should yield DEGRADED if no critical unhealthy checks = [ DummyCheck("imp_bad", HealthStatus.UNHEALTHY, priority=1), DummyCheck("info_good", HealthStatus.HEALTHY, priority=2), ] mgr = HealthCheckManager(checks=checks) summary = await mgr.run_checks() assert summary.overall_status == HealthStatus.DEGRADED # all healthy -> HEALTHY checks = [ DummyCheck("crit_ok", HealthStatus.HEALTHY, priority=0), DummyCheck("imp_ok", HealthStatus.HEALTHY, priority=1), DummyCheck("info_ok", HealthStatus.HEALTHY, priority=2), ] mgr = HealthCheckManager(checks=checks) summary = await mgr.run_checks() assert summary.overall_status == HealthStatus.HEALTHY @pytest.mark.asyncio async def test_check_timeouts_do_not_break_summary(): class SlowCheck(HealthCheck): async def run(self): await asyncio.sleep(0.2) return HealthCheckResult( name=self.name, status=HealthStatus.HEALTHY, priority=self.priority, message="slow ok", metadata={}, ) mgr = HealthCheckManager( checks=[SlowCheck(name="slow", priority=1)], check_timeout_seconds=0.05, ) summary = await mgr.run_checks() # A timeout should not raise; manager should mark degraded/unhealthy per policy assert summary is not None assert summary.overall_status in (HealthStatus.DEGRADED, HealthStatus.UNHEALTHY) ``` --- ### tests/test_metrics.py ```python # tests/test_metrics.py import time import pytest from mcp_server.metrics import MetricsManager, ToolMetrics def test_tool_metrics_record_and_failure_rate(): tm = ToolMetrics(tool_name="NmapTool") start = time.time() tm.record_start() time.sleep(0.01) tm.record_end(status="success", error_type=None, duration_seconds=time.time() - start) tm.record_start() time.sleep(0.005) tm.record_end(status="error", error_type="timeout", duration_seconds=0.005) stats = tm.to_dict() assert stats["executions_total"] == 2 assert stats["success_count"] == 1 assert stats["error_count"] == 1 assert stats["timeout_count"] == 1 assert 0.0 <= stats["recent_failure_rate"] <= 1.0 def test_metrics_manager_aggregates_and_cleans(): mm = MetricsManager.get() mm.reset() # ensure clean slate if supported mm.record_tool_execution("NmapTool", status="success", error_type=None, duration_seconds=0.02) mm.record_tool_execution("NmapTool", status="error", error_type="timeout", duration_seconds=0.01) sys_stats = mm.get_system_stats() assert sys_stats["uptime_seconds"] >= 0 tool_stats = mm.get_tool_stats("NmapTool") assert tool_stats["executions_total"] == 2 assert tool_stats["timeout_count"] == 1 @pytest.mark.skipif(not __import__("importlib").util.find_spec("prometheus_client"), reason="prometheus_client not installed") def test_prometheus_exposition_available(): mm = MetricsManager.get() text = mm.get_prometheus_metrics() assert text is None or isinstance(text, str) # If using default REGISTRY route, may return None ``` --- ### tests/test_nmap_tool.py ```python # tests/test_nmap_tool.py import asyncio import pytest from mcp_server.tools.nmap_tool import NmapTool from mcp_server.base_tool import ToolInput @pytest.mark.asyncio async def test_target_validation_private_ip_and_cidr_ok(monkeypatch): tool = NmapTool() # Single private IP ti = ToolInput(target="10.0.0.1", extra_args="") # Validate via tool's internal validator if exposed, else run a dry parse info = tool.get_tool_info() assert info["name"].lower().find("nmap") != -1 @pytest.mark.asyncio async def test_target_validation_rejects_public_ip(): tool = NmapTool() bad = ToolInput(target="8.8.8.8", extra_args="") with pytest.raises(Exception): await tool.run(bad) # Expect validation error before process spawn @pytest.mark.asyncio async def test_network_size_limit_rejects_large_cidr(): tool = NmapTool() # Example: /16 might exceed the MAX_NETWORK_SIZE gate bad = ToolInput(target="192.168.0.0/16", extra_args="") with pytest.raises(Exception): await tool.run(bad) @pytest.mark.asyncio async def test_script_and_ports_validation(monkeypatch): tool = NmapTool() # Safe scripts and valid ports ok = ToolInput(target="10.0.0.5", extra_args="--script=default,discovery -p 22,80,443,8000-8010") # We don't actually want to execute nmap; intercept underlying spawn by making command fail early with pytest.raises(Exception): await tool.run(ok) # Either validation passes then spawn fails, or run path requires nmap installed # Illegal script category bad_script = ToolInput(target="10.0.0.5", extra_args="--script=exploit") with pytest.raises(Exception): await tool.run(bad_script) # Illegal port range bad_ports = ToolInput(target="10.0.0.5", extra_args="-p 0,70000") with pytest.raises(Exception): await tool.run(bad_ports) @pytest.mark.asyncio async def test_aggressive_flags_policy(monkeypatch): tool = NmapTool() # If policy forbids -A or vuln, this should raise; otherwise adjust when security.allow_intrusive is enabled maybe_bad = ToolInput(target="10.0.0.5", extra_args="-A --script=vuln") with pytest.raises(Exception): await tool.run(maybe_bad) ``` Notes: - These Nmap tests intentionally avoid executing nmap; they rely on validation paths raising before process spawn. If your NmapTool requires nmap presence even to parse, we can monkeypatch the spawn method to prevent external calls and assert parsed/validated args directly. Tell me if you want me to add precise monkeypatch hooks based on your code. --- ## Makefile target ```make # Makefile PYTEST ?= pytest PYTEST_ARGS ?= -q --maxfail=1 --disable-warnings .PHONY: test lint unit integration test: $(PYTEST) $(PYTEST_ARGS) tests unit: $(PYTEST) $(PYTEST_ARGS) -k "config or health or metrics or nmap" tests integration: $(PYTEST) $(PYTEST_ARGS) -k "integration" tests ``` Optional: add a requirements-dev.txt with pytest and pytest-asyncio. --- ## One-page traceability matrix | PAD/README claim | Code reference | Test coverage | |---|---|---| | Layered config (defaults < file < env) with validation and hot reload | config.MCPConfig: load order methods; validate_*; reload_config | test_config_layering_precedence; test_config_validation_and_clamps; test_hot_reload_applies_changes | | Sensitive values are redacted for audit-safe outputs | config.MCPConfig.to_dict(redact_sensitive=True) | test_redaction_of_sensitive_data | | Priority-aware health model with critical/important/info | health.HealthStatus; HealthCheck; HealthCheckManager.aggregate | test_priority_aggregation_rules | | Health checks robust to timeouts and failures | health.HealthCheckManager(check_timeout_seconds) | test_check_timeouts_do_not_break_summary | | Prometheus metrics for executions, durations, active, errors | metrics.MetricsManager; ToolMetrics; Prom counters/histograms | test_tool_metrics_record_and_failure_rate; test_metrics_manager_aggregates_and_cleans; test_prometheus_exposition_available | | Safe-by-default Nmap with private-only targets and network-size limit | tools.nmap_tool.NmapTool._validate_* | test_target_validation_rejects_public_ip; test_network_size_limit_rejects_large_cidr | | Strict arg validation for ports, scripts, timing, and flags | tools.nmap_tool.NmapTool._parse_and_validate_args | test_script_and_ports_validation | | Restrict intrusive scans unless explicitly allowed | tools.nmap_tool.NmapTool SAFE_SCRIPTS/allowed_flags and policy checks | test_aggressive_flags_policy | --- ## Next steps - If method names differ (e.g., reload_config vs check_for_changes), I can align the tests precisely once you confirm the exact symbols. - Want me to add a small monkeypatch to NmapTool to bypass subprocess spawn and assert the parsed command/args exactly? --- Got it โ€” the safest way to test **NmapTool**โ€™s parsing/validation logic without actually invoking `nmap` is to **monkeypatch its internal execution method**. In your code, `NmapTool.run()` eventually calls `_execute_tool()` (inherited from `MCPBaseTool`) which spawns a subprocess. We can override that in tests to capture the command/args instead. Hereโ€™s a **patch-style test addition** you can drop into `tests/test_nmap_tool.py`: ```python # tests/test_nmap_tool.py (additions) import pytest from mcp_server.tools.nmap_tool import NmapTool from mcp_server.base_tool import ToolInput @pytest.mark.asyncio async def test_nmaptool_parsing_and_command_build(monkeypatch): tool = NmapTool() captured = {} async def fake_execute(self, cmd, timeout): # Instead of spawning, capture the command and return a dummy result captured["cmd"] = cmd captured["timeout"] = timeout class DummyResult: returncode = 0 stdout = b"FAKE NMAP OUTPUT" stderr = b"" return DummyResult() # Patch the base class _execute_tool monkeypatch.setattr(NmapTool, "_execute_tool", fake_execute) # Provide safe args that should pass validation ti = ToolInput(target="10.0.0.5", extra_args="-sV -p 22,80,443") result = await tool.run(ti) # Assertions: we never spawned, but we captured the built command assert "cmd" in captured cmd = captured["cmd"] # First element should be "nmap" assert cmd[0] == "nmap" # Target should appear at the end assert cmd[-1] == "10.0.0.5" # Our flags should be preserved assert "-sV" in cmd assert "-p" in cmd # Optimizations should be injected if not provided assert "-T4" in cmd assert "--max-parallelism=10" in cmd assert "--top-ports=1000" in cmd # Timeout should come from config defaults assert isinstance(captured["timeout"], int) # The fake result should propagate stdout assert "FAKE NMAP OUTPUT" in result.stdout ``` --- ### How this works - We **monkeypatch** `NmapTool._execute_tool` to a fake coroutine that just records the command and returns a dummy result. - This lets us assert: - The **command structure** (binary, flags, target). - That **optimizations** (`-T4`, `--max-parallelism=10`, `--top-ports=1000`) are injected. - That **timeout** is passed correctly from config. - No subprocess is spawned, so tests are deterministic and safe.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nordeim/Security-MCP-Server-v3'

If you have feedback or need assistance with the MCP directory API, please join our Discord server