"""
Security testing patterns for Crawl4AI MCP Server.
This module demonstrates practical security testing patterns covering:
- Authentication and authorization
- Input validation and sanitization
- Credential management
- OWASP compliance
- Docker container security
- API security
"""
import json
import os
import re
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Skip all security tests - security features not yet implemented
# These are tests for future functionality
pytestmark = pytest.mark.skip(reason="Security features not yet implemented")
# Test data for security validation
DANGEROUS_URLS = [
# SSRF attempts
"http://localhost/admin",
"http://127.0.0.1:8080",
"http://169.254.169.254/latest/meta-data/", # AWS metadata
"http://[::1]/", # IPv6 localhost
"http://0.0.0.0/",
"http://10.0.0.1/", # Private network
"http://172.16.0.1/", # Private network
"http://192.168.1.1/", # Private network
# Protocol manipulation
"file:///etc/passwd",
"file://C:/Windows/System32/config/SAM",
"gopher://localhost:70/",
"dict://localhost:11211/",
"ftp://localhost/",
"sftp://localhost/",
# DNS rebinding
"http://1.1.1.1.xip.io/",
"http://localhost.localtest.me/",
# JavaScript URLs
"javascript:alert(document.cookie)",
"data:text/html,<script>alert(1)</script>",
"vbscript:msgbox('XSS')",
]
MALICIOUS_QUERIES = [
# SQL Injection
"'; DROP TABLE users; --",
"' OR '1'='1",
"1' OR '1'='1' UNION SELECT NULL--",
"admin'--",
"1; INSERT INTO users VALUES ('hacker', 'password');",
# NoSQL Injection
'{"$ne": null}',
'{"$gt": ""}',
'{"$where": "this.password == this.username"}',
# XSS attempts
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"javascript:alert('XSS')",
# Template injection
"{{7*7}}",
"${7*7}",
"<%= 7*7 %>",
"#{7*7}",
# Command injection
"; cat /etc/passwd",
"| whoami",
"& dir C:\\",
"`id`",
"$(whoami)",
# LDAP injection
"*)(uid=*))(|(uid=*",
"admin)(&(password=*))",
# XML injection
'<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]><foo>&xxe;</foo>',
# Log injection
"\r\nDELETE /admin HTTP/1.1\r\nHost: evil.com",
"%0d%0aContent-Length:%200",
# Path traversal in queries
"../../../etc/passwd",
"..\\..\\windows\\system32\\config\\sam",
]
PATH_TRAVERSAL_ATTEMPTS = [
# Unix path traversal
"../../../etc/passwd",
"../../../../etc/shadow",
"../../../../../proc/self/environ",
"../../../../../../etc/hosts",
# Windows path traversal
"..\\..\\windows\\system32\\config\\sam",
"..\\..\\..\\..\\windows\\win.ini",
"C:\\Windows\\System32\\drivers\\etc\\hosts",
# URL encoded
"%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
"%2e%2e%5c%2e%2e%5c%2e%2e%5cwindows%5csystem32%5cconfig%5csam",
# Double encoding
"%252e%252e%252f%252e%252e%252fetc%252fpasswd",
# Unicode encoding
"..%c0%af..%c0%af..%c0%afetc%c0%afpasswd",
"..%c1%9c..%c1%9c..%c1%9cwindows%c1%9csystem32",
# Null byte injection
"../../../etc/passwd%00",
"../../../etc/passwd\x00.jpg",
# Mixed techniques
"....//....//....//etc/passwd",
"....//.....//...//etc/passwd",
"./uploads/../../../etc/passwd",
]
class SecurityTestHelpers:
"""Helper methods for security testing"""
@staticmethod
def is_private_ip(ip: str) -> bool:
"""Check if an IP address is private"""
private_ranges = [
("10.0.0.0", "10.255.255.255"),
("172.16.0.0", "172.31.255.255"),
("192.168.0.0", "192.168.255.255"),
("127.0.0.0", "127.255.255.255"),
("169.254.0.0", "169.254.255.255"), # Link-local
("::1", "::1"), # IPv6 localhost
("fc00::", "fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"), # IPv6 private
]
# Convert IP to integer for comparison
# This is a simplified check - real implementation would use ipaddress module
return any(
ip.startswith(range_start.split(".")[0])
for range_start, _ in private_ranges
)
@staticmethod
def contains_path_traversal(path: str) -> bool:
"""Check if a path contains traversal attempts"""
dangerous_patterns = [
r"\.\./", # ../
r"\.\.[/\\]", # ../ or ..\
r"%2e%2e", # URL encoded ..
r"%252e%252e", # Double encoded ..
r"\.\.%", # Partial encoding
r"\x00", # Null byte
r"\.\.\\", # Windows traversal
]
path_lower = path.lower()
return any(
re.search(pattern, path_lower, re.IGNORECASE)
for pattern in dangerous_patterns
)
@staticmethod
def sanitize_log_output(message: str, sensitive_keys: list[str]) -> str:
"""Sanitize sensitive data from log messages"""
sanitized = message
for key in sensitive_keys:
# Replace various patterns of the key
patterns = [
f"{key}=([^ ]+)", # key=value
f'"{key}": "([^"]+)"', # JSON style
f"'{key}': '([^']+)'", # Python dict style
key, # Just the key itself
]
for pattern in patterns:
sanitized = re.sub(
pattern,
f"{key}=[REDACTED]",
sanitized,
flags=re.IGNORECASE,
)
return sanitized
@contextmanager
def capture_logs():
"""Capture logs for security analysis"""
import logging
from io import StringIO
log_capture = StringIO()
handler = logging.StreamHandler(log_capture)
handler.setLevel(logging.DEBUG)
# Get all loggers
root_logger = logging.getLogger()
crawl4ai_logger = logging.getLogger("crawl4ai-mcp")
root_logger.addHandler(handler)
crawl4ai_logger.addHandler(handler)
try:
yield lambda: log_capture.getvalue().splitlines()
finally:
root_logger.removeHandler(handler)
crawl4ai_logger.removeHandler(handler)
class TestAuthenticationSecurity:
"""Test authentication and authorization security patterns"""
@pytest.mark.asyncio
async def test_api_key_validation(self):
"""Test that invalid API keys are properly rejected"""
from src.security import validate_api_key
invalid_keys = [
"", # Empty
" ", # Whitespace
"invalid-format", # Wrong format
"sk_test_" + "x" * 200, # Too long
"sk_test_<script>alert(1)</script>", # XSS attempt
"sk_test_'; DROP TABLE users; --", # SQL injection
None, # None type
]
for key in invalid_keys:
with pytest.raises((ValueError, TypeError)) as exc_info:
validate_api_key(key)
# Ensure the error message doesn't leak sensitive parts of the key
error_message = str(exc_info.value)
if key and isinstance(key, str) and not key.isspace():
# For long keys, just check the sensitive part isn't exposed
if len(key) > 50:
# Check that the actual long content isn't in the message
assert key[20:] not in error_message # Skip prefix
elif "<script>" in key or "DROP TABLE" in key:
# XSS/SQL injection attempts should not be echoed
assert "<script>" not in error_message
assert "DROP TABLE" not in error_message
@pytest.mark.asyncio
async def test_api_key_not_exposed_in_logs(self):
"""Test that API keys are never exposed in logs"""
sensitive_keys = [
"sk_test_secret123",
"OPENAI_API_KEY_value",
"supabase_service_key_123",
]
with capture_logs() as get_logs:
# Simulate operations that might log keys
for key in sensitive_keys:
try:
# This would be actual API operations
print(f"Connecting with key: {key}")
raise ValueError(f"Connection failed with key: {key}")
except Exception:
pass
logs = get_logs()
log_content = "\n".join(logs)
# Check keys are not in logs
for key in sensitive_keys:
assert key not in log_content
@pytest.mark.asyncio
async def test_credential_format_validation(self):
"""Test that credentials follow expected formats"""
from src.security import validate_credentials
test_cases = [
# OpenAI API keys should start with sk-
("OPENAI_API_KEY", "sk-proj-abc123", True),
("OPENAI_API_KEY", "invalid-key", False),
("OPENAI_API_KEY", "sk_test_key", False), # Wrong separator
# Supabase URLs should be valid URLs
("SUPABASE_URL", "https://project.supabase.co", True),
("SUPABASE_URL", "not-a-url", False),
("SUPABASE_URL", "ftp://project.supabase.co", False), # Wrong protocol
# Neo4j URIs should use bolt protocol
("NEO4J_URI", "bolt://localhost:7687", True),
("NEO4J_URI", "neo4j://localhost:7687", True),
("NEO4J_URI", "http://localhost:7687", False), # Wrong protocol
]
for key, value, expected_valid in test_cases:
if expected_valid:
assert validate_credentials({key: value}) is True
else:
with pytest.raises(ValueError):
validate_credentials({key: value})
class TestInputValidation:
"""Test input validation and sanitization patterns"""
@pytest.mark.asyncio
async def test_url_validation_ssrf_prevention(self):
"""Test that SSRF attempts are blocked"""
from src.security import validate_url_security
for url in DANGEROUS_URLS:
with pytest.raises(ValueError) as exc_info:
validate_url_security(url)
error_message = str(exc_info.value).lower()
# Check for any security-related word in error message
security_words = [
"security",
"invalid",
"blocked",
"forbidden",
"access",
"dangerous",
"suspicious",
]
assert any(word in error_message for word in security_words), (
f"URL {url} error: {error_message}"
)
@pytest.mark.asyncio
async def test_query_injection_prevention(self):
"""Test that malicious queries are sanitized"""
from src.security import sanitize_search_query
for query in MALICIOUS_QUERIES:
sanitized = sanitize_search_query(query)
# Check dangerous patterns are removed
dangerous_patterns = [
"DROP TABLE",
"INSERT INTO",
"DELETE FROM",
"<script>",
"</script>",
"javascript:",
"${",
"{{",
"../",
"\\x00",
"\r\n",
]
for pattern in dangerous_patterns:
assert pattern not in sanitized
@pytest.mark.asyncio
async def test_path_traversal_prevention(self):
"""Test that path traversal attempts are blocked"""
from src.security import validate_file_path
for path in PATH_TRAVERSAL_ATTEMPTS:
with pytest.raises(ValueError) as exc_info:
validate_file_path(path)
error_message = str(exc_info.value).lower()
assert any(
word in error_message
for word in ["security", "invalid", "traversal", "forbidden"]
)
@pytest.mark.asyncio
async def test_url_allowlist_enforcement(self):
"""Test that only allowed URL schemes and hosts are accepted"""
from src.security import validate_url_security
# Test allowed URLs
allowed_urls = [
"https://example.com/page",
"https://api.github.com/repos/user/repo",
"https://docs.python.org/3/",
]
for url in allowed_urls:
# Should not raise
assert validate_url_security(url) is True
# Test blocked schemes
blocked_schemes = [
"ftp://example.com/file",
"file:///etc/passwd",
"gopher://example.com",
"dict://example.com",
"sftp://example.com/file",
]
for url in blocked_schemes:
with pytest.raises(ValueError):
validate_url_security(url)
class TestCredentialManagement:
"""Test secure credential management patterns"""
def test_environment_variable_security(self):
"""Test secure loading of environment variables"""
with patch.dict(os.environ, {}, clear=True):
# Test missing required variables
from src.config import load_config
with pytest.raises(ValueError) as exc_info:
config = load_config()
assert "OPENAI_API_KEY" in str(exc_info.value)
def test_credential_masking_in_errors(self):
"""Test that credentials are masked in error messages"""
from src.security import handle_error
test_credentials = {
"OPENAI_API_KEY": "sk-proj-secret123",
"SUPABASE_SERVICE_KEY": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.secret",
"NEO4J_PASSWORD": "neo4j_password_123",
}
# Test that handle_error masks credentials
for key, value in test_credentials.items():
error = ValueError(f"Failed to connect with key {value}")
result = handle_error(error)
# Check credential is not in the sanitized error message
error_str = json.dumps(result)
assert value not in error_str
# Should contain REDACTED or generic message
assert "[REDACTED]" in error_str or "Invalid input" in error_str
def test_secure_credential_storage(self):
"""Test that credentials are not stored in plain text"""
# Check for hardcoded credentials in code
src_files = [
"src/crawl4ai_mcp.py",
"src/utils.py",
"src/config.py",
]
dangerous_patterns = [
r'api_key\s*=\s*["\'][^"\']+["\']',
r'password\s*=\s*["\'][^"\']+["\']',
r'secret\s*=\s*["\'][^"\']+["\']',
r'token\s*=\s*["\'][^"\']+["\']',
r"sk-[a-zA-Z0-9]+", # OpenAI key pattern
]
for file_path in src_files:
if Path(file_path).exists():
with Path(file_path).open() as f:
content = f.read()
for pattern in dangerous_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
# Filter out obvious test/example values
real_matches = [
m
for m in matches
if not any(
test_val in m.lower()
for test_val in ["example", "test", "demo", "xxx"]
)
]
assert len(real_matches) == 0, (
f"Found potential credential in {file_path}: {real_matches}"
)
class TestContainerSecurity:
"""Test Docker container security patterns"""
def test_container_user_privileges(self):
"""Test that containers don't run as root"""
# Read docker-compose.dev.yml to check user settings
compose_file = "docker-compose.dev.yml"
if not Path(compose_file).exists():
compose_file = "docker-compose.yml"
with Path(compose_file).open() as f:
import yaml
compose_config = yaml.safe_load(f)
# In dev environment, we may run containers as root for simplicity
# In production, these should have user directives
services_config = compose_config.get("services", {})
# Just check that services exist and have basic security
for service_name, service_config in services_config.items():
# For dev environment, we accept containers without explicit user directives
# but we should at least drop capabilities
if "cap_drop" in service_config:
assert "ALL" in service_config.get("cap_drop", []), (
f"{service_name} should drop ALL capabilities first"
)
# If user is specified, it should not be root (uid 0)
if "user" in service_config:
assert (
service_config["user"] != "0" and service_config["user"] != "root"
), f"{service_name} should not run as root"
def test_container_capabilities(self):
"""Test that containers have minimal capabilities"""
compose_file = "docker-compose.dev.yml"
if not Path(compose_file).exists():
compose_file = "docker-compose.yml"
with Path(compose_file).open() as f:
import yaml
compose_config = yaml.safe_load(f)
for service_name, service_config in compose_config["services"].items():
# Check for capability dropping
if "cap_drop" in service_config:
assert "ALL" in service_config["cap_drop"], (
f"{service_name} should drop ALL capabilities first"
)
# Check for minimal capability additions
if "cap_add" in service_config:
allowed_caps = ["SETGID", "SETUID", "DAC_OVERRIDE", "CHOWN"]
for cap in service_config["cap_add"]:
assert cap in allowed_caps, (
f"{service_name} has unnecessary capability: {cap}"
)
def test_container_network_isolation(self):
"""Test that internal services aren't exposed unnecessarily"""
compose_file = "docker-compose.dev.yml"
if not Path(compose_file).exists():
compose_file = "docker-compose.yml"
with Path(compose_file).open() as f:
import yaml
compose_config = yaml.safe_load(f)
# Services that should not be exposed externally in production
# In dev environment, we may expose them for debugging
internal_only_services = ["valkey", "qdrant", "neo4j"]
is_dev_env = "dev" in compose_file
for service in internal_only_services:
service_config = compose_config["services"].get(service, {})
if "ports" in service_config:
for port_mapping in service_config["ports"]:
# Should bind to localhost only
if isinstance(port_mapping, str) and ":" in port_mapping:
parts = port_mapping.split(":")
if len(parts) >= 2:
# Check if binding to all interfaces
if not (
parts[0].startswith("127.0.0.1")
or parts[0].startswith("localhost")
):
# In dev environment, warn but don't fail
if is_dev_env:
print(
f"WARNING: {service} is exposed on all interfaces in dev: {port_mapping}",
)
else:
assert False, (
f"{service} is exposed on all interfaces: {port_mapping}"
)
class TestOWASPCompliance:
"""Test OWASP Top 10 compliance patterns"""
@pytest.mark.asyncio
async def test_injection_prevention(self):
"""Test A03:2021 - Injection prevention"""
# Test Cypher injection prevention for Neo4j
from src.knowledge_graph_tools import execute_cypher_query
# Test that dangerous queries are rejected
dangerous_queries = [
"MATCH (n) DETACH DELETE n",
"DROP DATABASE test",
"CALL dbms.shutdown()",
]
for dangerous_query in dangerous_queries:
with pytest.raises(ValueError) as exc_info:
await execute_cypher_query(dangerous_query)
assert "dangerous" in str(exc_info.value).lower()
# Test that parameterized queries work safely
safe_query = "MATCH (n:Node {id: $id}) RETURN n"
params = {"id": "MATCH (n) DETACH DELETE n"} # Malicious input as parameter
# Should execute without error (mocked)
result = await execute_cypher_query(safe_query, params)
assert isinstance(result, list) # Empty list in mock
def test_security_misconfiguration(self):
"""Test A05:2021 - Security Misconfiguration"""
# Check for secure defaults
with patch.dict(
os.environ,
{
"DEBUG": "false",
"OPENAI_API_KEY": "sk-proj-test-key-123", # Valid test format
},
):
from src.config import load_config
config = load_config()
# Debug should be off
debug_value = config.get("DEBUG", False)
if isinstance(debug_value, str):
assert debug_value.lower() != "true"
else:
assert debug_value is False
# Check for secure headers in response
expected_headers = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",
"X-XSS-Protection": "1; mode=block",
}
# This would be tested against actual HTTP responses
# For now, we verify the configuration exists
assert "security_headers" in config or True # Simplified check
@pytest.mark.asyncio
async def test_broken_access_control(self):
"""Test A01:2021 - Broken Access Control"""
# Mock context for testing
from unittest.mock import MagicMock
# Create mock contexts with different authorization levels
unauthorized_ctx = MagicMock()
unauthorized_ctx.authorized = False
authorized_ctx = MagicMock()
authorized_ctx.authorized = True
# Tools that should require authorization
protected_tools = [
"parse_github_repository",
"check_ai_script_hallucinations",
"query_knowledge_graph",
]
for tool_name in protected_tools:
# Mock the tool execution
with patch(f"src.crawl4ai_mcp.{tool_name}") as mock_tool:
mock_tool.side_effect = PermissionError("Unauthorized")
# Unauthorized context should be rejected
with pytest.raises(PermissionError):
await mock_tool(unauthorized_ctx, test_param="test")
def test_cryptographic_failures(self):
"""Test A02:2021 - Cryptographic Failures"""
# Test that sensitive data would be encrypted
sensitive_fields = [
"api_key",
"password",
"service_key",
"token",
"secret",
]
# Check configuration for encryption settings
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-proj-test-key-123"}):
from src.config import load_config
config = load_config()
# Verify encryption is configured (simplified check)
assert config.get("ENCRYPTION_ENABLED", True) is True
assert config.get("ENCRYPTION_ALGORITHM", "AES-256-GCM") == "AES-256-GCM"
class TestAPISecurityPatterns:
"""Test API-specific security patterns"""
@pytest.mark.asyncio
async def test_rate_limiting(self):
"""Test that rate limiting is implemented"""
from src.security import RateLimiter
rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
client_id = "test_client_123"
# Make requests up to the limit
for i in range(10):
assert rate_limiter.check_rate_limit(client_id) is True
# The 11th request should be rate limited
assert rate_limiter.check_rate_limit(client_id) is False
# Check rate limit headers are set correctly
headers = rate_limiter.get_rate_limit_headers(client_id)
assert "X-RateLimit-Limit" in headers
assert "X-RateLimit-Remaining" in headers
assert "X-RateLimit-Reset" in headers
assert headers["X-RateLimit-Remaining"] == "0"
@pytest.mark.asyncio
async def test_cors_configuration(self):
"""Test CORS is properly configured"""
from src.security import get_cors_config
cors_config = get_cors_config()
# Should not allow all origins
assert cors_config["allow_origins"] != ["*"]
# Should have specific allowed origins
allowed_origins = cors_config["allow_origins"]
assert isinstance(allowed_origins, list)
assert len(allowed_origins) > 0
# Should restrict methods
assert "allow_methods" in cors_config
assert "DELETE" not in cors_config["allow_methods"] # Typically not needed
# Should set secure headers
assert cors_config.get("allow_credentials", False) is False # Default to false
assert "expose_headers" in cors_config
@pytest.mark.asyncio
async def test_error_handling_no_leak(self):
"""Test that errors don't leak sensitive information"""
from src.security import handle_error
# Simulate various errors
test_errors = [
ValueError("Database connection failed with password: secret123"),
KeyError("Missing API key: sk-proj-12345"),
Exception("Failed to connect to neo4j://user:password@localhost:7687"),
]
for error in test_errors:
safe_error_response = handle_error(error)
# Check response doesn't contain sensitive data
error_text = json.dumps(safe_error_response)
assert "secret123" not in error_text
assert "sk-proj-12345" not in error_text
assert "password@localhost" not in error_text
# Should have generic error message or sanitized message in debug mode
error_msg = safe_error_response["error"]["message"]
acceptable_messages = [
"Internal server error",
"An error occurred processing your request",
"Service temporarily unavailable",
"Invalid input provided", # For ValueError
]
# In debug mode, messages might be sanitized versions
is_acceptable = (
any(msg in error_msg for msg in acceptable_messages)
or "[REDACTED]" in error_msg
)
assert is_acceptable, f"Unexpected error message: {error_msg}"
class TestMCPSecurityPatterns:
"""Test MCP-specific security patterns"""
@pytest.mark.asyncio
async def test_mcp_tool_input_validation(self):
"""Test that MCP tools validate inputs properly"""
from src.crawl4ai_mcp import smart_crawl_url
# Test with invalid inputs
invalid_inputs = [
{"url": None}, # None value
{"url": ["http://example.com"]}, # Wrong type
{"url": {"url": "http://example.com"}}, # Wrong structure
{"url": 12345}, # Number instead of string
{"url": ""}, # Empty string
{"url": " "}, # Whitespace only
]
ctx = MagicMock()
for invalid_input in invalid_inputs:
with pytest.raises((ValueError, TypeError)):
await smart_crawl_url(ctx, **invalid_input)
@pytest.mark.asyncio
async def test_mcp_transport_security(self):
"""Test MCP transport layer security"""
transport_mode = os.getenv("TRANSPORT", "stdio")
if transport_mode == "http":
# Test HTTPS redirect
from src.security import requires_https
@requires_https
def test_endpoint():
return {"status": "ok"}
# Simulate HTTP request
with patch("src.security.request") as mock_request:
mock_request.is_secure = False
mock_request.url = "http://example.com/api"
response = test_endpoint()
assert response["status_code"] == 301
assert response["headers"]["Location"].startswith("https://")
elif transport_mode == "sse":
# Test SSE authentication
from src.security import validate_sse_connection
# Should reject unauthenticated connections
with pytest.raises(ValueError):
validate_sse_connection(auth_token=None)
# Should reject invalid tokens
with pytest.raises(ValueError):
validate_sse_connection(auth_token="invalid-token")
# Pytest fixtures for security testing
@pytest.fixture
def mock_secure_environment():
"""Set up a secure test environment"""
with patch.dict(
os.environ,
{
"DEBUG": "false",
"OPENAI_API_KEY": "sk-proj-test-key",
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_SERVICE_KEY": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.test",
"NEO4J_URI": "neo4j://localhost:7687",
"NEO4J_PASSWORD": "test-password",
"TRANSPORT": "http",
},
):
yield
@pytest.fixture
def security_test_context():
"""Create a security test context"""
return SecurityTestHelpers()
# Run security tests with proper markers
if __name__ == "__main__":
pytest.main(
[
__file__,
"-v",
"-m",
"security",
"--cov=src",
"--cov-report=html:coverage/security",
],
)