Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
test_security_penetration.py27.6 kB
""" Security Penetration Testing Suite for MaverickMCP. This suite performs security penetration testing to validate that security protections are active and effective against real attack vectors. Tests include: - Authentication bypass attempts - CSRF attack vectors - Rate limiting evasion - Input validation bypass - Session hijacking attempts - SQL injection prevention - XSS protection validation - Information disclosure prevention """ import time from datetime import UTC, datetime, timedelta from uuid import uuid4 import pytest from fastapi.testclient import TestClient from maverick_mcp.api.api_server import create_api_app @pytest.fixture def security_test_app(): """Create app for security testing.""" return create_api_app() @pytest.fixture def security_client(security_test_app): """Create client for security testing.""" return TestClient(security_test_app) @pytest.fixture def test_user(): """Test user for security testing.""" return { "email": f"sectest{uuid4().hex[:8]}@example.com", "password": "SecurePass123!", "name": "Security Test User", "company": "Test Security Inc", } class TestAuthenticationSecurity: """Test authentication security against bypass attempts.""" @pytest.mark.integration def test_jwt_token_manipulation_resistance(self, security_client, test_user): """Test resistance to JWT token manipulation attacks.""" # Register and login security_client.post("/auth/register", json=test_user) login_response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) # Extract tokens from cookies cookies = login_response.cookies access_token_cookie = cookies.get("maverick_access_token") if not access_token_cookie: pytest.skip("JWT tokens not in cookies - may be test environment") # Attempt 1: Modified JWT signature tampered_token = access_token_cookie[:-10] + "tampered123" response = security_client.get( "/user/profile", cookies={"maverick_access_token": tampered_token} ) assert response.status_code == 401 # Should reject tampered token # Attempt 2: Algorithm confusion attack (trying "none" algorithm) none_algorithm_token = "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJ1c2VyX2lkIjoxLCJleHAiOjk5OTk5OTk5OTl9." response = security_client.get( "/user/profile", cookies={"maverick_access_token": none_algorithm_token} ) assert response.status_code == 401 # Should reject "none" algorithm # Attempt 3: Expired token { "user_id": 1, "exp": int((datetime.now(UTC) - timedelta(hours=1)).timestamp()), "iat": int((datetime.now(UTC) - timedelta(hours=2)).timestamp()), "jti": "expired_token", } # This would require creating an expired token with the same secret # For security, we just test that expired tokens are rejected @pytest.mark.integration def test_session_fixation_protection(self, security_client, test_user): """Test protection against session fixation attacks.""" # Get initial session state initial_response = security_client.get("/auth/login") initial_cookies = initial_response.cookies # Login with potential pre-set session security_client.post("/auth/register", json=test_user) login_response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, cookies=initial_cookies, # Try to maintain old session ) # Verify new session is created (cookies should be different) new_cookies = login_response.cookies # Session should be regenerated after login if "maverick_access_token" in new_cookies: # New token should be different from any pre-existing one assert login_response.status_code == 200 @pytest.mark.integration def test_concurrent_session_limits(self, security_client, test_user): """Test limits on concurrent sessions.""" # Register user security_client.post("/auth/register", json=test_user) # Create multiple concurrent sessions session_responses = [] for _i in range(5): client_instance = TestClient(security_client.app) response = client_instance.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) session_responses.append(response) # All should succeed (or be limited if concurrent session limits implemented) success_count = sum(1 for r in session_responses if r.status_code == 200) assert success_count >= 1 # At least one should succeed # If concurrent session limits are implemented, test that old sessions are invalidated @pytest.mark.integration def test_password_brute_force_protection(self, security_client, test_user): """Test protection against password brute force attacks.""" # Register user security_client.post("/auth/register", json=test_user) # Attempt multiple failed logins failed_attempts = [] for i in range(10): response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": f"wrong_password_{i}"}, ) failed_attempts.append(response.status_code) # Small delay to avoid overwhelming the system time.sleep(0.1) # Should have multiple failures assert all(status == 401 for status in failed_attempts) # After multiple failures, account should be locked or rate limited # Test with correct password - should be blocked if protection is active final_attempt = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) # If brute force protection is active, should be rate limited # Otherwise, should succeed assert final_attempt.status_code in [200, 401, 429] class TestCSRFAttackVectors: """Test CSRF protection against various attack vectors.""" @pytest.mark.integration def test_csrf_attack_simulation(self, security_client, test_user): """Simulate CSRF attacks to test protection.""" # Setup authenticated session security_client.post("/auth/register", json=test_user) login_response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) csrf_token = login_response.json().get("csrf_token") # Attack 1: Missing CSRF token attack_response_1 = security_client.post( "/user/profile", json={"name": "Attacked Name"} ) assert attack_response_1.status_code == 403 assert "CSRF" in attack_response_1.json()["detail"] # Attack 2: Invalid CSRF token attack_response_2 = security_client.post( "/user/profile", json={"name": "Attacked Name"}, headers={"X-CSRF-Token": "invalid_token_123"}, ) assert attack_response_2.status_code == 403 # Attack 3: CSRF token from different session # Create second user and get their CSRF token other_user = { "email": f"other{uuid4().hex[:8]}@example.com", "password": "OtherPass123!", "name": "Other User", } other_client = TestClient(security_client.app) other_client.post("/auth/register", json=other_user) other_login = other_client.post( "/auth/login", json={"email": other_user["email"], "password": other_user["password"]}, ) other_csrf = other_login.json().get("csrf_token") # Try to use other user's CSRF token attack_response_3 = security_client.post( "/user/profile", json={"name": "Cross-User Attack"}, headers={"X-CSRF-Token": other_csrf}, ) assert attack_response_3.status_code == 403 # Legitimate request should work legitimate_response = security_client.post( "/user/profile", json={"name": "Legitimate Update"}, headers={"X-CSRF-Token": csrf_token}, ) assert legitimate_response.status_code == 200 @pytest.mark.integration def test_csrf_double_submit_validation(self, security_client, test_user): """Test CSRF double-submit cookie validation.""" # Setup session security_client.post("/auth/register", json=test_user) login_response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) csrf_token = login_response.json().get("csrf_token") cookies = login_response.cookies # Attack: Modify CSRF cookie but keep header the same modified_cookies = cookies.copy() if "maverick_csrf_token" in modified_cookies: modified_cookies["maverick_csrf_token"] = "modified_csrf_token" attack_response = security_client.post( "/user/profile", json={"name": "CSRF Cookie Attack"}, headers={"X-CSRF-Token": csrf_token}, cookies=modified_cookies, ) assert attack_response.status_code == 403 @pytest.mark.integration def test_csrf_token_entropy_and_uniqueness(self, security_client, test_user): """Test CSRF tokens have sufficient entropy and are unique.""" # Register user security_client.post("/auth/register", json=test_user) # Generate multiple CSRF tokens csrf_tokens = [] for _i in range(5): response = security_client.post( "/auth/login", json={"email": test_user["email"], "password": test_user["password"]}, ) csrf_token = response.json().get("csrf_token") if csrf_token: csrf_tokens.append(csrf_token) if csrf_tokens: # All tokens should be unique assert len(set(csrf_tokens)) == len(csrf_tokens) # Tokens should have sufficient length (at least 32 chars) for token in csrf_tokens: assert len(token) >= 32 # Tokens should not be predictable patterns for i, token in enumerate(csrf_tokens[1:], 1): # Should not be sequential or pattern-based assert token != csrf_tokens[0] + str(i) assert not token.startswith(csrf_tokens[0][:-5]) class TestRateLimitingEvasion: """Test rate limiting against evasion attempts.""" @pytest.mark.integration def test_ip_based_rate_limit_evasion(self, security_client): """Test attempts to evade IP-based rate limiting.""" # Test basic rate limiting responses = [] for _i in range(25): response = security_client.get("/api/data") responses.append(response.status_code) # Should hit rate limit sum(1 for status in responses if status == 200) rate_limited_count = sum(1 for status in responses if status == 429) assert rate_limited_count > 0 # Should have some rate limited responses # Attempt 1: X-Forwarded-For header spoofing spoofed_responses = [] for i in range(10): response = security_client.get( "/api/data", headers={"X-Forwarded-For": f"192.168.1.{i}"} ) spoofed_responses.append(response.status_code) # Should still be rate limited (proper implementation should use real IP) sum(1 for status in spoofed_responses if status == 429) # Attempt 2: X-Real-IP header spoofing real_ip_responses = [] for i in range(5): response = security_client.get( "/api/data", headers={"X-Real-IP": f"10.0.0.{i}"} ) real_ip_responses.append(response.status_code) # Rate limiting should not be easily bypassed @pytest.mark.integration def test_user_agent_rotation_evasion(self, security_client): """Test rate limiting against user agent rotation.""" user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101", "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)", ] # Attempt to evade rate limiting by rotating user agents ua_responses = [] for i in range(15): ua = user_agents[i % len(user_agents)] response = security_client.get("/api/data", headers={"User-Agent": ua}) ua_responses.append(response.status_code) # Should still enforce rate limiting regardless of user agent sum(1 for status in ua_responses if status == 429) # Should have some rate limiting if effective @pytest.mark.integration def test_distributed_rate_limit_evasion(self, security_client): """Test against distributed rate limit evasion attempts.""" # Simulate requests with small delays (trying to stay under rate limits) distributed_responses = [] for _i in range(10): response = security_client.get("/api/data") distributed_responses.append(response.status_code) time.sleep(0.1) # Small delay # Even with delays, sustained high-rate requests should be limited # This tests if rate limiting has proper time windows class TestInputValidationBypass: """Test input validation against bypass attempts.""" @pytest.mark.integration def test_sql_injection_prevention(self, security_client, test_user): """Test SQL injection prevention.""" # SQL injection payloads sql_payloads = [ "'; DROP TABLE users; --", "' OR '1'='1", "' UNION SELECT * FROM users --", "'; DELETE FROM users WHERE '1'='1", "' OR 1=1 --", "admin'--", "admin'/*", "' OR 'x'='x", "' AND id IS NULL; --", "'OR 1=1#", ] # Test SQL injection in login email field for payload in sql_payloads: response = security_client.post( "/auth/login", json={"email": payload, "password": "any_password"} ) # Should handle gracefully without SQL errors assert response.status_code in [400, 401, 422] # Not 500 (SQL error) # Response should not contain SQL error messages response_text = response.text.lower() sql_error_indicators = [ "syntax error", "sql", "mysql", "postgresql", "sqlite", "database", "column", "table", "select", "union", ] for indicator in sql_error_indicators: assert indicator not in response_text # Test SQL injection in registration fields for field in ["name", "company"]: malicious_user = test_user.copy() malicious_user[field] = "'; DROP TABLE users; --" response = security_client.post("/auth/register", json=malicious_user) # Should either reject or sanitize the input assert response.status_code in [200, 201, 400, 422] if response.status_code in [200, 201]: # If accepted, verify it's sanitized login_response = security_client.post( "/auth/login", json={ "email": malicious_user["email"], "password": malicious_user["password"], }, ) if login_response.status_code == 200: csrf_token = login_response.json().get("csrf_token") profile_response = security_client.get( "/user/profile", headers={"X-CSRF-Token": csrf_token} ) if profile_response.status_code == 200: profile_data = profile_response.json() # SQL injection should be sanitized assert "DROP TABLE" not in profile_data.get(field, "") @pytest.mark.integration def test_xss_prevention(self, security_client, test_user): """Test XSS prevention.""" xss_payloads = [ "<script>alert('XSS')</script>", "<img src=x onerror=alert('XSS')>", "javascript:alert('XSS')", "<svg onload=alert('XSS')>", "<iframe src=javascript:alert('XSS')>", "';alert('XSS');//", "<body onload=alert('XSS')>", "<input onfocus=alert('XSS') autofocus>", "<select onfocus=alert('XSS') autofocus>", "<textarea onfocus=alert('XSS') autofocus>", ] for payload in xss_payloads: # Test XSS in user registration malicious_user = test_user.copy() malicious_user["email"] = f"xss{uuid4().hex[:8]}@example.com" malicious_user["name"] = payload response = security_client.post("/auth/register", json=malicious_user) if response.status_code in [200, 201]: # Login and check profile login_response = security_client.post( "/auth/login", json={ "email": malicious_user["email"], "password": malicious_user["password"], }, ) if login_response.status_code == 200: csrf_token = login_response.json().get("csrf_token") profile_response = security_client.get( "/user/profile", headers={"X-CSRF-Token": csrf_token} ) if profile_response.status_code == 200: profile_data = profile_response.json() stored_name = profile_data.get("name", "") # XSS should be escaped or removed assert "<script>" not in stored_name assert "javascript:" not in stored_name assert "onerror=" not in stored_name assert "onload=" not in stored_name assert "alert(" not in stored_name @pytest.mark.integration def test_path_traversal_prevention(self, security_client): """Test path traversal prevention.""" path_traversal_payloads = [ "../../../etc/passwd", "..\\..\\..\\windows\\system32\\config\\sam", "....//....//....//etc/passwd", "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd", "..%252f..%252f..%252fetc%252fpasswd", "..%c0%af..%c0%af..%c0%afetc%c0%afpasswd", ] # Test path traversal in file access endpoints (if any) for payload in path_traversal_payloads: # Test in URL path response = security_client.get(f"/api/files/{payload}") # Should return 404 or 400, not 500 or file contents assert response.status_code in [400, 404, 422] # Should not return file system contents response_text = response.text.lower() sensitive_file_indicators = [ "root:", "daemon:", "bin:", "sys:", # /etc/passwd content "[boot loader]", "[operating systems]", # Windows boot.ini "password", "hash", "secret", ] for indicator in sensitive_file_indicators: assert indicator not in response_text @pytest.mark.integration def test_command_injection_prevention(self, security_client, test_user): """Test command injection prevention.""" command_injection_payloads = [ "; cat /etc/passwd", "| cat /etc/passwd", "& dir", "`cat /etc/passwd`", "$(cat /etc/passwd)", "; rm -rf /", "&& rm -rf /", "|| rm -rf /", "; shutdown -h now", "'; whoami; echo '", ] # Test command injection in various fields for payload in command_injection_payloads: malicious_user = test_user.copy() malicious_user["email"] = f"cmd{uuid4().hex[:8]}@example.com" malicious_user["company"] = payload response = security_client.post("/auth/register", json=malicious_user) # Should handle gracefully assert response.status_code in [200, 201, 400, 422] # Should not execute system commands response_text = response.text command_output_indicators = [ "root:", "daemon:", "bin:", # Output of cat /etc/passwd "total ", "drwx", # Output of ls -la "uid=", "gid=", # Output of whoami/id ] for indicator in command_output_indicators: assert indicator not in response_text class TestInformationDisclosure: """Test prevention of information disclosure.""" @pytest.mark.integration def test_error_message_sanitization(self, security_client): """Test that error messages don't leak sensitive information.""" # Test 404 error response = security_client.get("/nonexistent/endpoint/123") assert response.status_code == 404 error_data = response.json() error_message = str(error_data).lower() # Should not contain sensitive system information sensitive_info = [ "/users/", "/home/", "\\users\\", "\\home\\", # File paths "password", "secret", "key", "token", "jwt", # Credentials "localhost", "127.0.0.1", "redis://", "postgresql://", # Internal addresses "traceback", "stack trace", "exception", "error at", # Stack traces "python", "uvicorn", "fastapi", "sqlalchemy", # Framework details "database", "sql", "query", "connection", # Database details ] for info in sensitive_info: assert info not in error_message # Should include request ID for tracking assert "request_id" in error_data or "error_id" in error_data @pytest.mark.integration def test_debug_information_disclosure(self, security_client): """Test that debug information is not disclosed.""" # Attempt to trigger various error conditions error_test_cases = [ ("/auth/login", {"invalid": "json_structure"}), ("/user/profile", {}), # Missing authentication ] for endpoint, data in error_test_cases: response = security_client.post(endpoint, json=data) # Should not contain debug information response_text = response.text.lower() debug_indicators = [ "traceback", "stack trace", "file ", "line ", "exception", "raise ", "assert", "debug", "__file__", "__name__", "locals()", "globals()", ] for indicator in debug_indicators: assert indicator not in response_text @pytest.mark.integration def test_version_information_disclosure(self, security_client): """Test that version information is not disclosed.""" # Test common endpoints that might leak version info test_endpoints = [ "/health", "/", "/api/docs", "/metrics", ] for endpoint in test_endpoints: response = security_client.get(endpoint) if response.status_code == 200: response_text = response.text.lower() # Should not contain detailed version information version_indicators = [ "python/", "fastapi/", "uvicorn/", "nginx/", "version", "build", "commit", "git", "dev", "debug", "staging", "test", ] # Some version info might be acceptable in health endpoints if endpoint != "/health": for indicator in version_indicators: assert indicator not in response_text @pytest.mark.integration def test_user_enumeration_prevention(self, security_client): """Test prevention of user enumeration attacks.""" # Test with valid email (user exists) existing_user = { "email": f"existing{uuid4().hex[:8]}@example.com", "password": "ValidPass123!", "name": "Existing User", } security_client.post("/auth/register", json=existing_user) # Test login with existing user but wrong password response_existing = security_client.post( "/auth/login", json={"email": existing_user["email"], "password": "wrong_password"}, ) # Test login with non-existing user response_nonexisting = security_client.post( "/auth/login", json={ "email": f"nonexisting{uuid4().hex[:8]}@example.com", "password": "any_password", }, ) # Both should return similar error messages and status codes assert response_existing.status_code == response_nonexisting.status_code # Error messages should not distinguish between cases error_1 = response_existing.json().get("detail", "") error_2 = response_nonexisting.json().get("detail", "") # Should not contain user-specific information user_specific_terms = [ "user not found", "user does not exist", "invalid user", "email not found", "account not found", "user unknown", ] for term in user_specific_terms: assert term.lower() not in error_1.lower() assert term.lower() not in error_2.lower() if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server