Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
test_security_integration.py.disabled (20.9 kB)
""" Integration Security Tests for Maverick MCP. Tests security under real-world conditions including: - Full request flow with all security middleware - Rate limiting under concurrent load - Authentication + CORS + headers integration - Security performance under stress - End-to-end security scenarios """ import time from concurrent.futures import ThreadPoolExecutor, as_completed from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.testclient import TestClient from maverick_mcp.auth.jwt_enhanced import EnhancedJWTManager from maverick_mcp.config.security import SecurityConfig from maverick_mcp.config.security_utils import ( apply_cors_to_fastapi, apply_security_headers_to_fastapi, ) # Skip integration tests if Redis not available redis_available = True try: import redis r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True) r.ping() except Exception: redis_available = False @pytest.fixture def security_app(): """Create a FastAPI app with full security middleware stack.""" app = FastAPI(title="Security Test App") # Mock security config for testing with patch("maverick_mcp.config.security.get_security_config") as mock_config: mock_security_config = MagicMock() mock_security_config.environment = "development" mock_security_config.cors.allowed_origins = ["http://localhost:3000"] mock_security_config.cors.allow_credentials = True mock_security_config.cors.allowed_methods = [ "GET", "POST", "PUT", "DELETE", "OPTIONS", ] mock_security_config.cors.allowed_headers = [ "Authorization", "Content-Type", "X-CSRF-Token", ] mock_security_config.cors.exposed_headers = ["X-Request-ID"] mock_security_config.cors.max_age = 86400 mock_security_config.get_cors_middleware_config.return_value = { "allow_origins": ["http://localhost:3000"], "allow_credentials": True, "allow_methods": ["GET", "POST", "PUT", 
"DELETE", "OPTIONS"], "allow_headers": ["Authorization", "Content-Type", "X-CSRF-Token"], "expose_headers": ["X-Request-ID"], "max_age": 86400, } mock_security_config.get_security_headers.return_value = { "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", "X-XSS-Protection": "1; mode=block", "Content-Security-Policy": "default-src 'self'", "Referrer-Policy": "strict-origin-when-cross-origin", } mock_security_config.is_production.return_value = False mock_security_config.is_development.return_value = True mock_config.return_value = mock_security_config # Mock validation to pass with patch( "maverick_mcp.config.security_utils.validate_security_config" ) as mock_validate: mock_validate.return_value = {"valid": True, "issues": [], "warnings": []} # Apply security middleware apply_cors_to_fastapi(app) apply_security_headers_to_fastapi(app) # JWT authentication setup jwt_manager = EnhancedJWTManager() security = HTTPBearer(auto_error=False) async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), ): if not credentials: return None try: payload = jwt_manager.decode_access_token(credentials.credentials) return { "user_id": payload["sub"], "scopes": payload.get("scope", "").split(), } except Exception: return None # Test endpoints @app.get("/public") async def public_endpoint(): return {"message": "public", "timestamp": time.time()} @app.get("/protected") async def protected_endpoint(user=Depends(get_current_user)): if not user: raise HTTPException(status_code=401, detail="Authentication required") return {"message": "protected", "user_id": user["user_id"]} @app.post("/api/data") async def create_data(request: Request, user=Depends(get_current_user)): # Simulate CSRF protection csrf_token = request.headers.get("X-CSRF-Token") if not csrf_token: raise HTTPException(status_code=403, detail="CSRF token required") return {"message": "data created", "csrf_valid": True} @app.get("/rate-limited") async def rate_limited_endpoint(): 
return {"message": "rate limited endpoint", "timestamp": time.time()} return app, jwt_manager class TestFullSecurityIntegration: """Test full security stack integration.""" def test_public_endpoint_security_headers(self, security_app): """Test public endpoint includes security headers.""" app, _ = security_app client = TestClient(app) response = client.get("/public") assert response.status_code == 200 assert response.headers["X-Frame-Options"] == "DENY" assert response.headers["X-Content-Type-Options"] == "nosniff" # Test that CSP header contains the expected directives csp_header = response.headers["Content-Security-Policy"] assert "default-src 'self'" in csp_header assert "script-src 'self' 'unsafe-inline'" in csp_header assert "connect-src 'self'" in csp_header assert "frame-src 'none'" in csp_header assert "object-src 'none'" in csp_header def test_cors_preflight_with_security_headers(self, security_app): """Test CORS preflight includes security headers.""" app, _ = security_app client = TestClient(app) response = client.options( "/api/data", headers={ "Origin": "http://localhost:3000", "Access-Control-Request-Method": "POST", "Access-Control-Request-Headers": "Content-Type,Authorization", }, ) assert response.status_code == 200 assert ( response.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" ) assert "POST" in response.headers.get("Access-Control-Allow-Methods", "") assert response.headers["X-Frame-Options"] == "DENY" def test_authenticated_request_flow(self, security_app): """Test full authenticated request flow with security.""" app, jwt_manager = security_app client = TestClient(app) # Generate valid token token, _, _ = jwt_manager.generate_token_pair(user_id="test_user") response = client.get( "/protected", headers={ "Authorization": f"Bearer {token}", "Origin": "http://localhost:3000", }, ) assert response.status_code == 200 assert response.json()["user_id"] == "test_user" assert response.headers["X-Frame-Options"] == "DENY" assert 
( response.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" ) def test_csrf_protection_integration(self, security_app): """Test CSRF protection with CORS and security headers.""" app, jwt_manager = security_app client = TestClient(app) # Generate valid token token, _, _ = jwt_manager.generate_token_pair(user_id="test_user") # Request without CSRF token should fail response = client.post( "/api/data", headers={ "Authorization": f"Bearer {token}", "Origin": "http://localhost:3000", "Content-Type": "application/json", }, json={"data": "test"}, ) assert response.status_code == 403 assert "CSRF token required" in response.json()["detail"] assert response.headers["X-Frame-Options"] == "DENY" def test_csrf_protection_with_valid_token(self, security_app): """Test CSRF protection with valid token.""" app, jwt_manager = security_app client = TestClient(app) # Generate valid token token, _, _ = jwt_manager.generate_token_pair(user_id="test_user") # Request with CSRF token should succeed response = client.post( "/api/data", headers={ "Authorization": f"Bearer {token}", "Origin": "http://localhost:3000", "Content-Type": "application/json", "X-CSRF-Token": "valid_csrf_token", }, json={"data": "test"}, ) assert response.status_code == 200 assert response.json()["csrf_valid"] is True def test_unauthorized_request_blocked(self, security_app): """Test unauthorized requests are properly blocked.""" app, _ = security_app client = TestClient(app) # Request without authentication response = client.get("/protected") assert response.status_code == 401 assert response.headers["X-Frame-Options"] == "DENY" def test_invalid_jwt_blocked(self, security_app): """Test invalid JWT tokens are blocked.""" app, _ = security_app client = TestClient(app) response = client.get( "/protected", headers={"Authorization": "Bearer invalid_token"} ) assert response.status_code == 401 @pytest.mark.integration class TestRateLimitingIntegration: """Test rate limiting integration with other 
security features.""" @pytest.mark.skipif(not redis_available, reason="Redis not available") def test_rate_limiting_with_security_headers(self, security_app): """Test rate limiting includes security headers in responses.""" app, _ = security_app # Mock rate limiting middleware from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address, default_limits=["5 per minute"]) app.state.limiter = limiter # Apply rate limit to endpoint @limiter.limit("2 per minute") @app.get("/limited") async def limited_endpoint(request: Request): return {"message": "limited"} client = TestClient(app) # First request should succeed response = client.get("/limited") assert response.status_code == 200 assert response.headers["X-Frame-Options"] == "DENY" # Second request should succeed response = client.get("/limited") assert response.status_code == 200 # Third request should be rate limited response = client.get("/limited") if response.status_code == 429: assert response.headers["X-Frame-Options"] == "DENY" def test_rate_limiting_preserves_cors(self, security_app): """Test rate limiting preserves CORS headers.""" app, _ = security_app client = TestClient(app) # Make requests with Origin header for _ in range(3): response = client.get( "/rate-limited", headers={"Origin": "http://localhost:3000"} ) # Even if rate limited, CORS headers should be present if response.status_code == 200: assert ( response.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" ) @pytest.mark.integration class TestConcurrentSecurityOperations: """Test security under concurrent load.""" def test_concurrent_authentication(self, security_app): """Test JWT authentication under concurrent load.""" app, jwt_manager = security_app client = TestClient(app) # Generate valid token token, _, _ = jwt_manager.generate_token_pair(user_id="test_user") def make_authenticated_request(): return client.get( "/protected", headers={"Authorization": f"Bearer 
{token}"} ) # Make concurrent requests with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(make_authenticated_request) for _ in range(50)] results = [future.result() for future in as_completed(futures)] # All requests should succeed success_count = sum(1 for r in results if r.status_code == 200) assert success_count == 50 # All should have security headers for result in results: assert result.headers["X-Frame-Options"] == "DENY" def test_concurrent_cors_requests(self, security_app): """Test CORS handling under concurrent load.""" app, _ = security_app client = TestClient(app) def make_cors_request(): return client.options( "/api/data", headers={ "Origin": "http://localhost:3000", "Access-Control-Request-Method": "POST", }, ) # Make concurrent CORS preflight requests with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(make_cors_request) for _ in range(30)] results = [future.result() for future in as_completed(futures)] # All should succeed with proper CORS headers for result in results: assert result.status_code == 200 assert ( result.headers.get("Access-Control-Allow-Origin") == "http://localhost:3000" ) @pytest.mark.integration class TestSecurityPerformance: """Test security middleware performance.""" def test_security_middleware_overhead(self, security_app): """Test security middleware performance overhead.""" app, _ = security_app client = TestClient(app) # Measure time for requests with security middleware start_time = time.time() for _ in range(100): response = client.get("/public") assert response.status_code == 200 end_time = time.time() total_time = end_time - start_time avg_time_per_request = total_time / 100 # Should be fast (less than 10ms per request on average) assert avg_time_per_request < 0.01 def test_security_headers_performance(self, security_app): """Test security headers don't significantly impact performance.""" app, _ = security_app client = TestClient(app) # Time multiple requests times 
= [] for _ in range(50): start = time.time() response = client.get("/public") end = time.time() assert response.status_code == 200 assert len(response.headers) >= 5 # Should have security headers times.append(end - start) avg_time = sum(times) / len(times) max_time = max(times) # Average should be very fast assert avg_time < 0.01 # No single request should take too long assert max_time < 0.1 def test_jwt_verification_performance(self, security_app): """Test JWT verification performance under load.""" app, jwt_manager = security_app client = TestClient(app) # Generate token token, _, _ = jwt_manager.generate_token_pair(user_id="test_user") # Time JWT verification start_time = time.time() for _ in range(100): response = client.get( "/protected", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 200 end_time = time.time() total_time = end_time - start_time avg_time = total_time / 100 # JWT verification should be fast assert avg_time < 0.02 @pytest.mark.integration class TestSecurityErrorHandling: """Test security error handling and recovery.""" def test_malformed_requests_handled_securely(self, security_app): """Test malformed requests are handled securely.""" app, _ = security_app client = TestClient(app) # Malformed JSON response = client.post( "/api/data", content=b"invalid json", headers={"Content-Type": "application/json"}, ) # Should fail gracefully with security headers assert response.status_code in [400, 422] assert response.headers["X-Frame-Options"] == "DENY" def test_oversized_requests_blocked(self, security_app): """Test oversized requests are properly blocked.""" app, _ = security_app client = TestClient(app) # Large payload large_payload = "x" * (1024 * 1024) # 1MB response = client.post( "/api/data", json={"data": large_payload}, headers={"X-CSRF-Token": "test"} ) # Should have security headers regardless of outcome assert "X-Frame-Options" in response.headers def test_invalid_headers_handled(self, security_app): """Test 
requests with invalid headers are handled securely.""" app, _ = security_app client = TestClient(app) # Request with invalid Authorization header response = client.get("/protected", headers={"Authorization": "InvalidFormat"}) assert response.status_code == 401 assert response.headers["X-Frame-Options"] == "DENY" def test_exception_handling_preserves_security(self, security_app): """Test exceptions don't bypass security headers.""" app, _ = security_app @app.get("/error") async def error_endpoint(): raise Exception("Test error") client = TestClient(app) response = client.get("/error") # Even on server errors, security headers should be present assert response.status_code == 500 assert response.headers["X-Frame-Options"] == "DENY" assert response.headers["X-Content-Type-Options"] == "nosniff" @pytest.mark.integration class TestProductionSecurityScenarios: """Test production-like security scenarios.""" def test_production_security_configuration(self): """Test production security configuration is secure.""" with patch.dict("os.environ", {"ENVIRONMENT": "production"}, clear=False): with patch( "maverick_mcp.config.security._get_cors_origins" ) as mock_origins: mock_origins.return_value = ["https://app.maverick-mcp.com"] config = SecurityConfig() # Production should have secure defaults assert config.is_production() is True assert "localhost" not in str(config.cors.allowed_origins).lower() headers = config.get_security_headers() assert "Strict-Transport-Security" in headers def test_development_to_production_transition(self): """Test security changes when transitioning from dev to production.""" # Development config with patch.dict("os.environ", {"ENVIRONMENT": "development"}, clear=False): dev_config = SecurityConfig() dev_headers = dev_config.get_security_headers() # Production config with patch.dict("os.environ", {"ENVIRONMENT": "production"}, clear=False): with patch( "maverick_mcp.config.security._get_cors_origins" ) as mock_origins: mock_origins.return_value = 
["https://app.maverick-mcp.com"] prod_config = SecurityConfig() prod_headers = prod_config.get_security_headers() # Production should have HSTS, development should not assert "Strict-Transport-Security" not in dev_headers assert "Strict-Transport-Security" in prod_headers def test_security_monitoring_integration(self, security_app): """Test security events are properly logged/monitored.""" app, _ = security_app client = TestClient(app) # Mock audit logging with patch("maverick_mcp.api.middleware.security.audit_logger") as mock_audit: mock_audit.log_security_event = AsyncMock() # Make unauthorized request response = client.get("/protected") assert response.status_code == 401 # Should trigger security event logging # (In actual implementation, this would be called by middleware) if __name__ == "__main__": pytest.main([__file__, "-v", "-m", "integration"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.