"""Security Tests: Path Traversal Prevention.
Tests for path traversal attack prevention:
- File reservation pattern with traversal rejected/warned
- Attachment path traversal blocked
- Archive tree/content path validation
- Agent name with path separators sanitized
- Project slug validation prevents traversal
Reference: mcp_agent_mail-qe7
"""
from __future__ import annotations
from zipfile import ZipFile
import pytest
from fastmcp import Client
from mcp_agent_mail.app import build_mcp_server
from mcp_agent_mail.config import get_settings
from mcp_agent_mail.storage import (
ensure_archive,
get_archive_tree,
get_file_content,
)
from mcp_agent_mail.utils import sanitize_agent_name, validate_agent_name_format
# ============================================================================
# Agent Name Sanitization Tests
# ============================================================================
class TestAgentNamePathSanitization:
"""Tests for agent name sanitization against path traversal."""
def test_agent_name_strips_forward_slashes(self):
"""Agent names with forward slashes have them stripped."""
result = sanitize_agent_name("../../../etc/passwd")
assert result == "etcpasswd"
assert "/" not in (result or "")
def test_agent_name_strips_backslashes(self):
"""Agent names with backslashes have them stripped."""
result = sanitize_agent_name("..\\..\\..\\windows\\system32")
assert result == "windowssystem32"
assert "\\" not in (result or "")
def test_agent_name_strips_dots(self):
"""Agent names with dots have them stripped."""
result = sanitize_agent_name("..secret")
assert result == "secret"
assert ".." not in (result or "")
def test_agent_name_path_traversal_empty_result(self):
"""Pure path traversal strings return None after sanitization."""
result = sanitize_agent_name("../../../")
assert result is None
def test_agent_name_only_slashes_returns_none(self):
"""Names with only path separators return None."""
result = sanitize_agent_name("///")
assert result is None
def test_agent_name_mixed_separators_stripped(self):
"""Mixed path separators are all stripped."""
result = sanitize_agent_name("foo/bar\\baz/../qux")
assert result == "foobarbazqux"
def test_validate_rejects_path_traversal_name(self):
"""validate_agent_name_format rejects path-like names."""
# Path traversal patterns are not valid adjective+noun combinations
assert validate_agent_name_format("../etc") is False
assert validate_agent_name_format("foo/bar") is False
assert validate_agent_name_format("..\\windows") is False
def test_validate_accepts_valid_names(self):
"""Valid adjective+noun names are accepted."""
assert validate_agent_name_format("BlueLake") is True
assert validate_agent_name_format("GreenCastle") is True
assert validate_agent_name_format("RedStone") is True
# ============================================================================
# Archive Path Traversal Tests
# ============================================================================
class TestArchivePathTraversal:
"""Tests for archive tree/content path traversal prevention."""
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_parent_traversal(self, isolated_env):
"""get_archive_tree rejects paths with parent directory traversal."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-1")
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "../../../etc")
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_dotdot_path(self, isolated_env):
"""get_archive_tree rejects pure '..' path."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-2")
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "..")
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_embedded_traversal(self, isolated_env):
"""get_archive_tree rejects paths with embedded /../."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-3")
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "valid/../../../etc")
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_trailing_dotdot(self, isolated_env):
"""get_archive_tree rejects paths ending with /.."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-4")
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "messages/..")
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_absolute_path(self, isolated_env):
"""get_archive_tree rejects absolute paths."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-5")
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "/etc/passwd")
@pytest.mark.asyncio
async def test_get_archive_tree_rejects_backslash_traversal(self, isolated_env):
"""get_archive_tree rejects Windows-style backslash traversal."""
settings = get_settings()
archive = await ensure_archive(settings, "path-test-6")
# Backslashes are normalized to forward slashes, so ..\ becomes ../
with pytest.raises(ValueError, match="directory traversal"):
await get_archive_tree(archive, "..\\..\\..\\etc")
@pytest.mark.asyncio
async def test_get_file_content_rejects_parent_traversal(self, isolated_env):
"""get_file_content rejects paths with parent directory traversal."""
settings = get_settings()
archive = await ensure_archive(settings, "content-test-1")
with pytest.raises(ValueError, match="directory traversal"):
await get_file_content(archive, "../../../etc/passwd")
@pytest.mark.asyncio
async def test_get_file_content_rejects_dotdot_only(self, isolated_env):
"""get_file_content rejects pure '..' path."""
settings = get_settings()
archive = await ensure_archive(settings, "content-test-2")
with pytest.raises(ValueError, match="directory traversal"):
await get_file_content(archive, "..")
@pytest.mark.asyncio
async def test_get_file_content_rejects_embedded_traversal(self, isolated_env):
"""get_file_content rejects paths with embedded /../."""
settings = get_settings()
archive = await ensure_archive(settings, "content-test-3")
with pytest.raises(ValueError, match="directory traversal"):
await get_file_content(archive, "agents/test/../../../etc/passwd")
@pytest.mark.asyncio
async def test_get_archive_tree_allows_valid_paths(self, isolated_env):
"""get_archive_tree allows legitimate nested paths."""
settings = get_settings()
archive = await ensure_archive(settings, "valid-path-test")
# These should not raise - they're valid relative paths
result = await get_archive_tree(archive, "")
assert isinstance(result, list)
result = await get_archive_tree(archive, "messages")
assert isinstance(result, list)
result = await get_archive_tree(archive, "agents")
assert isinstance(result, list)
# ============================================================================
# Project Slug Validation Tests
# ============================================================================
class TestProjectSlugValidation:
"""Tests for project slug validation against path traversal."""
def test_validate_slug_rejects_traversal(self):
"""Project slugs with path traversal are rejected by slugify."""
from mcp_agent_mail.utils import slugify
# Path traversal patterns should be normalized to safe slugs
result = slugify("../../../etc/passwd")
assert ".." not in result
assert "/" not in result
result = slugify("..\\windows\\system32")
assert "\\" not in result
assert ".." not in result
@pytest.mark.asyncio
async def test_ensure_project_sanitizes_slug(self, isolated_env):
"""ensure_project normalizes human_key to safe slug."""
server = build_mcp_server()
async with Client(server) as client:
# Path traversal in human_key should be sanitized to safe slug
result = await client.call_tool(
"ensure_project",
{"human_key": "/path/to/../../../etc/passwd"},
)
# The slug should not contain traversal patterns
slug = result.data.get("slug", "")
assert ".." not in slug
assert "/" not in slug
assert "\\" not in slug
# ============================================================================
# File Reservation Path Tests
# ============================================================================
class TestFileReservationPathTraversal:
"""Tests for file reservation path handling."""
@pytest.mark.asyncio
async def test_file_reservation_warns_on_broad_patterns(self, isolated_env):
"""File reservation warns on overly broad patterns."""
server = build_mcp_server()
async with Client(server) as client:
await client.call_tool("ensure_project", {"human_key": "/backend"})
await client.call_tool(
"register_agent",
{
"project_key": "/backend",
"program": "test",
"model": "test",
"name": "BlueLake",
},
)
# Very broad pattern should still work but gets logged as warning
result = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/backend",
"agent_name": "BlueLake",
"paths": ["**/*"],
"ttl_seconds": 3600,
},
)
# Reservation is granted (advisory model)
assert result.data.get("granted") is not None
@pytest.mark.asyncio
async def test_file_reservation_path_traversal_pattern(self, isolated_env):
"""File reservation with path traversal pattern is handled."""
server = build_mcp_server()
async with Client(server) as client:
await client.call_tool("ensure_project", {"human_key": "/backend"})
await client.call_tool(
"register_agent",
{
"project_key": "/backend",
"program": "test",
"model": "test",
"name": "GreenCastle",
},
)
# Path traversal pattern - advisory model allows it but it's contained
result = await client.call_tool(
"file_reservation_paths",
{
"project_key": "/backend",
"agent_name": "GreenCastle",
"paths": ["../../../etc/passwd"],
"ttl_seconds": 3600,
},
)
# The reservation is created (advisory) but pattern is stored as-is
# The key is that file operations using this pattern would be rejected
assert "granted" in result.data
# ============================================================================
# Attachment Path Traversal Tests
# ============================================================================
class TestAttachmentPathTraversal:
"""Tests for attachment path traversal handling."""
@pytest.mark.asyncio
async def test_attachment_nonexistent_traversal_path_rejected(self, isolated_env, monkeypatch):
"""Nonexistent attachment paths with traversal are rejected."""
from fastmcp.exceptions import ToolError
# Disable image conversion
monkeypatch.setenv("CONVERT_IMAGES", "false")
from mcp_agent_mail import config as _config
_config.clear_settings_cache()
server = build_mcp_server()
async with Client(server) as client:
await client.call_tool("ensure_project", {"human_key": "/backend"})
await client.call_tool(
"register_agent",
{
"project_key": "/backend",
"program": "test",
"model": "test",
"name": "PurpleBear",
},
)
# Try to attach a nonexistent file with traversal path
# This should raise a ToolError (the system rejects invalid file paths)
with pytest.raises(ToolError) as exc_info:
await client.call_tool(
"send_message",
{
"project_key": "/backend",
"sender_name": "PurpleBear",
"to": ["PurpleBear"],
"subject": "Test missing attachment",
"body_md": "Test message",
"attachment_paths": ["../../../nonexistent.bin"],
"convert_images": False,
},
)
# Verify the error is a safe rejection (either traversal blocked or file missing)
err = str(exc_info.value).lower()
assert ("directory traversal" in err) or ("no such file" in err) or ("not found" in err)
@pytest.mark.asyncio
async def test_attachment_markdown_image_traversal_handled(self, isolated_env, monkeypatch):
"""Markdown image references with traversal patterns are handled safely."""
# Disable image conversion
monkeypatch.setenv("CONVERT_IMAGES", "false")
from mcp_agent_mail import config as _config
_config.clear_settings_cache()
server = build_mcp_server()
async with Client(server) as client:
await client.call_tool("ensure_project", {"human_key": "/backend"})
await client.call_tool(
"register_agent",
{
"project_key": "/backend",
"program": "test",
"model": "test",
"name": "RedStone",
},
)
# Path traversal in markdown body - should be preserved as text
# but not actually access the file system
result = await client.call_tool(
"send_message",
{
"project_key": "/backend",
"sender_name": "RedStone",
"to": ["RedStone"],
"subject": "Test traversal in body",
"body_md": "Check ",
"convert_images": False,
},
)
# Message should be sent (body text is just text)
assert result.data.get("deliveries") is not None
# ============================================================================
# Archive Extraction Security Tests
# ============================================================================
class TestArchiveExtractionSecurity:
"""Tests for archive extraction path safety."""
def test_zipfile_rejects_absolute_paths(self, tmp_path):
"""ZipFile extraction handles absolute path members safely."""
# Create a malicious zip with absolute path
malicious_zip = tmp_path / "malicious.zip"
with ZipFile(malicious_zip, "w") as zf:
# Try to write to absolute path - this tests Python's zipfile behavior
# Modern Python zipfile.extractall() sanitizes these
zf.writestr("normal.txt", "safe content")
# Extract to temp directory
extract_dir = tmp_path / "extracted"
extract_dir.mkdir()
with ZipFile(malicious_zip, "r") as zf:
zf.extractall(extract_dir)
# Only the safe file should be extracted
assert (extract_dir / "normal.txt").exists()
def test_zipfile_handles_traversal_members(self, tmp_path):
"""ZipFile extraction handles path traversal in member names."""
# Create a zip with path traversal in filename
traversal_zip = tmp_path / "traversal.zip"
with ZipFile(traversal_zip, "w") as zf:
# Add normal file
zf.writestr("data/file.txt", "normal content")
# Extract to temp directory
extract_dir = tmp_path / "extracted"
extract_dir.mkdir()
with ZipFile(traversal_zip, "r") as zf:
# Check member names before extraction
for member in zf.namelist():
# Validate no traversal patterns
assert ".." not in member
zf.extractall(extract_dir)
# Files should be safely extracted
assert (extract_dir / "data" / "file.txt").exists()
# ============================================================================
# Integration Tests
# ============================================================================
class TestPathTraversalIntegration:
"""Integration tests for path traversal prevention across components."""
@pytest.mark.asyncio
async def test_full_workflow_with_sanitized_paths(self, isolated_env):
"""Full workflow demonstrates path sanitization across layers."""
server = build_mcp_server()
async with Client(server) as client:
# Create project with path-like human_key
proj_result = await client.call_tool(
"ensure_project",
{"human_key": "/test/project/path"},
)
project_slug = proj_result.data.get("slug", "")
assert ".." not in project_slug
# Register agent with a name that would be sanitized
# Note: register_agent requires valid adjective+noun names
await client.call_tool(
"register_agent",
{
"project_key": "/test/project/path",
"program": "test",
"model": "test",
"name": "BlueLake", # Valid name
},
)
# Send message - paths in body are preserved but rendered safely
msg_result = await client.call_tool(
"send_message",
{
"project_key": "/test/project/path",
"sender_name": "BlueLake",
"to": ["BlueLake"],
"subject": "Path test",
"body_md": "Check file at ../../../etc/passwd",
},
)
assert msg_result.data.get("deliveries") is not None
# Fetch inbox - should work without issues
inbox_result = await client.call_tool(
"fetch_inbox",
{
"project_key": "/test/project/path",
"agent_name": "BlueLake",
},
)
assert isinstance(inbox_result.data, list)