# test_security_integration.py.disabled (20.9 kB)
"""
Integration Security Tests for Maverick MCP.
Tests security under real-world conditions including:
- Full request flow with all security middleware
- Rate limiting under concurrent load
- Authentication + CORS + headers integration
- Security performance under stress
- End-to-end security scenarios
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.testclient import TestClient
from maverick_mcp.auth.jwt_enhanced import EnhancedJWTManager
from maverick_mcp.config.security import SecurityConfig
from maverick_mcp.config.security_utils import (
apply_cors_to_fastapi,
apply_security_headers_to_fastapi,
)
# Probe for a local Redis server once, at import time, so Redis-dependent
# integration tests can be skipped when it is missing.  Any failure at all
# (the ``redis`` package not being importable, the ping failing, ...) simply
# marks Redis as unavailable instead of breaking test collection.
try:
    import redis

    r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
    r.ping()
except Exception:
    redis_available = False
else:
    redis_available = True
@pytest.fixture
def security_app():
    """Build a FastAPI app carrying the security middleware under test.

    The security-config lookup and its validation are patched while the CORS
    and security-header middleware are installed; afterwards a handful of
    demo routes (``/public``, ``/protected``, ``/api/data``,
    ``/rate-limited``) are registered on the app.

    Returns:
        tuple: ``(app, jwt_manager)`` -- the FastAPI application and the
        ``EnhancedJWTManager`` used by the ``/protected`` dependency to
        decode bearer tokens.
    """
    app = FastAPI(title="Security Test App")

    # CORS values shared by the mocked config attributes and by the mocked
    # middleware kwargs; each consumer receives its own list copy.
    origins = ["http://localhost:3000"]
    methods = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
    request_headers = ["Authorization", "Content-Type", "X-CSRF-Token"]
    exposed = ["X-Request-ID"]
    preflight_max_age = 86400

    # Stand-in security config for testing
    fake_config = MagicMock()
    fake_config.environment = "development"

    cors = fake_config.cors
    cors.allowed_origins = list(origins)
    cors.allow_credentials = True
    cors.allowed_methods = list(methods)
    cors.allowed_headers = list(request_headers)
    cors.exposed_headers = list(exposed)
    cors.max_age = preflight_max_age

    fake_config.get_cors_middleware_config.return_value = {
        "allow_origins": list(origins),
        "allow_credentials": True,
        "allow_methods": list(methods),
        "allow_headers": list(request_headers),
        "expose_headers": list(exposed),
        "max_age": preflight_max_age,
    }
    fake_config.get_security_headers.return_value = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        "X-XSS-Protection": "1; mode=block",
        "Content-Security-Policy": "default-src 'self'",
        "Referrer-Policy": "strict-origin-when-cross-origin",
    }
    fake_config.is_production.return_value = False
    fake_config.is_development.return_value = True

    # NOTE(review): the reviewed copy of this file had lost its indentation;
    # this assumes only the two apply_* calls ran under the patches -- confirm.
    with patch(
        "maverick_mcp.config.security.get_security_config",
        return_value=fake_config,
    ), patch(
        "maverick_mcp.config.security_utils.validate_security_config",
        return_value={"valid": True, "issues": [], "warnings": []},
    ):
        # Install the security middleware while config/validation are mocked
        apply_cors_to_fastapi(app)
        apply_security_headers_to_fastapi(app)

    # JWT authentication setup
    jwt_manager = EnhancedJWTManager()
    bearer_scheme = HTTPBearer(auto_error=False)

    async def get_current_user(
        credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    ):
        """Return ``{"user_id", "scopes"}`` for a decodable bearer token, else ``None``."""
        if not credentials:
            return None
        try:
            claims = jwt_manager.decode_access_token(credentials.credentials)
            scopes = claims.get("scope", "").split()
            return {"user_id": claims["sub"], "scopes": scopes}
        except Exception:
            # Any decoding problem is treated as "not authenticated".
            return None

    # Test endpoints
    @app.get("/public")
    async def public_endpoint():
        return {"message": "public", "timestamp": time.time()}

    @app.get("/protected")
    async def protected_endpoint(user=Depends(get_current_user)):
        if user:
            return {"message": "protected", "user_id": user["user_id"]}
        raise HTTPException(status_code=401, detail="Authentication required")

    @app.post("/api/data")
    async def create_data(request: Request, user=Depends(get_current_user)):
        # Simulated CSRF protection: only the presence of the header is checked
        if not request.headers.get("X-CSRF-Token"):
            raise HTTPException(status_code=403, detail="CSRF token required")
        return {"message": "data created", "csrf_valid": True}

    @app.get("/rate-limited")
    async def rate_limited_endpoint():
        return {"message": "rate limited endpoint", "timestamp": time.time()}

    return app, jwt_manager
class TestFullSecurityIntegration:
    """End-to-end checks of auth, CORS, CSRF and security headers together."""

    # Origin allowed by the CORS configuration mocked in ``security_app``.
    _ORIGIN = "http://localhost:3000"

    @staticmethod
    def _access_token(jwt_manager):
        """Generate a token pair for ``test_user`` and return its first element."""
        access_token, _, _ = jwt_manager.generate_token_pair(user_id="test_user")
        return access_token

    @classmethod
    def _json_post_headers(cls, access_token):
        """Headers for an authenticated JSON POST coming from the allowed origin."""
        return {
            "Authorization": f"Bearer {access_token}",
            "Origin": cls._ORIGIN,
            "Content-Type": "application/json",
        }

    def test_public_endpoint_security_headers(self, security_app):
        """The public endpoint responds 200 with the expected security headers."""
        app, _ = security_app
        resp = TestClient(app).get("/public")

        assert resp.status_code == 200
        assert resp.headers["X-Frame-Options"] == "DENY"
        assert resp.headers["X-Content-Type-Options"] == "nosniff"

        # The CSP header must contain every expected directive
        csp_header = resp.headers["Content-Security-Policy"]
        expected_directives = (
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline'",
            "connect-src 'self'",
            "frame-src 'none'",
            "object-src 'none'",
        )
        for directive in expected_directives:
            assert directive in csp_header

    def test_cors_preflight_with_security_headers(self, security_app):
        """A CORS preflight is answered with CORS and security headers."""
        app, _ = security_app
        preflight_headers = {
            "Origin": self._ORIGIN,
            "Access-Control-Request-Method": "POST",
            "Access-Control-Request-Headers": "Content-Type,Authorization",
        }
        resp = TestClient(app).options("/api/data", headers=preflight_headers)

        assert resp.status_code == 200
        assert resp.headers.get("Access-Control-Allow-Origin") == self._ORIGIN
        assert "POST" in resp.headers.get("Access-Control-Allow-Methods", "")
        assert resp.headers["X-Frame-Options"] == "DENY"

    def test_authenticated_request_flow(self, security_app):
        """A valid bearer token reaches ``/protected`` with all headers applied."""
        app, jwt_manager = security_app
        http = TestClient(app)
        access_token = self._access_token(jwt_manager)

        resp = http.get(
            "/protected",
            headers={
                "Authorization": f"Bearer {access_token}",
                "Origin": self._ORIGIN,
            },
        )

        assert resp.status_code == 200
        assert resp.json()["user_id"] == "test_user"
        assert resp.headers["X-Frame-Options"] == "DENY"
        assert resp.headers.get("Access-Control-Allow-Origin") == self._ORIGIN

    def test_csrf_protection_integration(self, security_app):
        """An authenticated POST without a CSRF token is rejected with 403."""
        app, jwt_manager = security_app
        http = TestClient(app)
        access_token = self._access_token(jwt_manager)

        # No X-CSRF-Token header is sent, so the request should fail
        resp = http.post(
            "/api/data",
            headers=self._json_post_headers(access_token),
            json={"data": "test"},
        )

        assert resp.status_code == 403
        assert "CSRF token required" in resp.json()["detail"]
        assert resp.headers["X-Frame-Options"] == "DENY"

    def test_csrf_protection_with_valid_token(self, security_app):
        """An authenticated POST carrying a CSRF token succeeds."""
        app, jwt_manager = security_app
        http = TestClient(app)
        access_token = self._access_token(jwt_manager)

        # Same request as the rejection case, plus the CSRF header
        headers = {
            **self._json_post_headers(access_token),
            "X-CSRF-Token": "valid_csrf_token",
        }
        resp = http.post("/api/data", headers=headers, json={"data": "test"})

        assert resp.status_code == 200
        assert resp.json()["csrf_valid"] is True

    def test_unauthorized_request_blocked(self, security_app):
        """A request without credentials gets 401 and still carries headers."""
        app, _ = security_app

        # No Authorization header at all
        resp = TestClient(app).get("/protected")

        assert resp.status_code == 401
        assert resp.headers["X-Frame-Options"] == "DENY"

    def test_invalid_jwt_blocked(self, security_app):
        """A bearer token that cannot be decoded results in 401."""
        app, _ = security_app
        bad_auth = {"Authorization": "Bearer invalid_token"}

        resp = TestClient(app).get("/protected", headers=bad_auth)

        assert resp.status_code == 401
@pytest.mark.integration
class TestRateLimitingIntegration:
    """Test rate limiting integration with other security features."""

    @pytest.mark.skipif(not redis_available, reason="Redis not available")
    def test_rate_limiting_with_security_headers(self, security_app):
        """Rate-limited (429) responses must still carry the security headers.

        A ``/limited`` route capped at two requests per minute is added to
        the app; the first two requests must succeed and the third must be
        rejected with 429 while keeping ``X-Frame-Options``.
        """
        app, _ = security_app

        # slowapi is imported lazily so the module still imports without it.
        from slowapi import Limiter, _rate_limit_exceeded_handler
        from slowapi.errors import RateLimitExceeded
        from slowapi.util import get_remote_address

        limiter = Limiter(key_func=get_remote_address, default_limits=["5 per minute"])
        app.state.limiter = limiter
        # Without this handler a RateLimitExceeded error is never converted
        # into a 429 response.
        app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

        # The route decorator must be the outermost one: with the opposite
        # order FastAPI registers the undecorated function and the limit is
        # silently never enforced.
        @app.get("/limited")
        @limiter.limit("2 per minute")
        async def limited_endpoint(request: Request):
            return {"message": "limited"}

        client = TestClient(app)

        # First request should succeed
        response = client.get("/limited")
        assert response.status_code == 200
        assert response.headers["X-Frame-Options"] == "DENY"

        # Second request should succeed
        response = client.get("/limited")
        assert response.status_code == 200

        # Third request exceeds "2 per minute" and must be rate limited.
        # Asserting the status (rather than guarding on it) keeps the test
        # from passing vacuously when limiting is not wired up.
        response = client.get("/limited")
        assert response.status_code == 429
        assert response.headers["X-Frame-Options"] == "DENY"

    def test_rate_limiting_preserves_cors(self, security_app):
        """CORS headers are present on every response, limited or not."""
        app, _ = security_app
        client = TestClient(app)

        # Make requests with Origin header
        for _ in range(3):
            response = client.get(
                "/rate-limited", headers={"Origin": "http://localhost:3000"}
            )

            # Whether the request was served or rate limited, the CORS
            # header has to be there -- so check it unconditionally.
            assert response.status_code in (200, 429)
            assert (
                response.headers.get("Access-Control-Allow-Origin")
                == "http://localhost:3000"
            )
@pytest.mark.integration
class TestConcurrentSecurityOperations:
    """Exercise the security stack from several threads at once."""

    @staticmethod
    def _fan_out(call, total):
        """Run ``call`` ``total`` times on a 10-worker pool; return all results.

        Results are collected in completion order, not submission order.
        """
        with ThreadPoolExecutor(max_workers=10) as pool:
            pending = [pool.submit(call) for _ in range(total)]
            return [done.result() for done in as_completed(pending)]

    def test_concurrent_authentication(self, security_app):
        """Fifty parallel authenticated requests all succeed with headers set."""
        app, jwt_manager = security_app
        client = TestClient(app)

        # One valid token is reused by every request
        token, _, _ = jwt_manager.generate_token_pair(user_id="test_user")

        def make_authenticated_request():
            return client.get(
                "/protected", headers={"Authorization": f"Bearer {token}"}
            )

        responses = self._fan_out(make_authenticated_request, 50)

        # Every request must have succeeded
        succeeded = [resp for resp in responses if resp.status_code == 200]
        assert len(succeeded) == 50

        # ...and every response must carry the security header
        for resp in responses:
            assert resp.headers["X-Frame-Options"] == "DENY"

    def test_concurrent_cors_requests(self, security_app):
        """Thirty parallel CORS preflights all succeed with the allowed origin."""
        app, _ = security_app
        client = TestClient(app)

        def make_cors_request():
            return client.options(
                "/api/data",
                headers={
                    "Origin": "http://localhost:3000",
                    "Access-Control-Request-Method": "POST",
                },
            )

        responses = self._fan_out(make_cors_request, 30)

        # Each preflight must succeed and echo the allowed origin
        for resp in responses:
            assert resp.status_code == 200
            allowed_origin = resp.headers.get("Access-Control-Allow-Origin")
            assert allowed_origin == "http://localhost:3000"
@pytest.mark.integration
class TestSecurityPerformance:
    """Test security middleware performance.

    All durations are measured with ``time.perf_counter()``: it is monotonic
    and high-resolution, whereas ``time.time()`` follows the wall clock, can
    jump when the system clock is adjusted and may be too coarse for the
    millisecond-level thresholds used here.
    """

    def test_security_middleware_overhead(self, security_app):
        """100 sequential ``/public`` requests average under 10 ms each."""
        app, _ = security_app
        client = TestClient(app)

        # Measure time for requests with security middleware
        start_time = time.perf_counter()

        for _ in range(100):
            response = client.get("/public")
            assert response.status_code == 200

        total_time = time.perf_counter() - start_time
        avg_time_per_request = total_time / 100

        # Should be fast (less than 10ms per request on average)
        assert avg_time_per_request < 0.01

    def test_security_headers_performance(self, security_app):
        """Per-request timings stay low while the headers are being added."""
        app, _ = security_app
        client = TestClient(app)

        # Time each request individually so the worst case can be checked too
        times = []
        for _ in range(50):
            start = time.perf_counter()
            response = client.get("/public")
            elapsed = time.perf_counter() - start

            assert response.status_code == 200
            assert len(response.headers) >= 5  # Should have security headers
            times.append(elapsed)

        avg_time = sum(times) / len(times)
        max_time = max(times)

        # Average should be very fast
        assert avg_time < 0.01
        # No single request should take too long
        assert max_time < 0.1

    def test_jwt_verification_performance(self, security_app):
        """100 authenticated ``/protected`` requests average under 20 ms each."""
        app, jwt_manager = security_app
        client = TestClient(app)

        # Generate token
        token, _, _ = jwt_manager.generate_token_pair(user_id="test_user")

        # Time JWT verification (measured end to end through the endpoint)
        start_time = time.perf_counter()

        for _ in range(100):
            response = client.get(
                "/protected", headers={"Authorization": f"Bearer {token}"}
            )
            assert response.status_code == 200

        total_time = time.perf_counter() - start_time
        avg_time = total_time / 100

        # JWT verification should be fast
        assert avg_time < 0.02
@pytest.mark.integration
class TestSecurityErrorHandling:
    """Test security error handling and recovery."""

    def test_malformed_requests_handled_securely(self, security_app):
        """A malformed JSON body yields a client error, never a server error.

        The ``/api/data`` route of ``security_app`` never parses the request
        body and rejects a request without ``X-CSRF-Token`` with 403, so the
        400/422 statuses cannot occur for this request. What matters is that
        the request is turned away with a 4xx status and that the security
        headers are still present.
        """
        app, _ = security_app
        client = TestClient(app)

        # Malformed JSON
        response = client.post(
            "/api/data",
            content=b"invalid json",
            headers={"Content-Type": "application/json"},
        )

        # Should fail gracefully (client error, not 5xx) with security headers
        assert 400 <= response.status_code < 500
        assert response.headers["X-Frame-Options"] == "DENY"

    def test_oversized_requests_blocked(self, security_app):
        """A 1 MB JSON payload still gets a response with security headers."""
        app, _ = security_app
        client = TestClient(app)

        # Large payload
        large_payload = "x" * (1024 * 1024)  # 1MB

        response = client.post(
            "/api/data", json={"data": large_payload}, headers={"X-CSRF-Token": "test"}
        )

        # Should have security headers regardless of outcome
        assert "X-Frame-Options" in response.headers

    def test_invalid_headers_handled(self, security_app):
        """An ``Authorization`` header that is not a bearer token gives 401."""
        app, _ = security_app
        client = TestClient(app)

        # Request with invalid Authorization header
        response = client.get("/protected", headers={"Authorization": "InvalidFormat"})

        assert response.status_code == 401
        assert response.headers["X-Frame-Options"] == "DENY"

    def test_exception_handling_preserves_security(self, security_app):
        """An unhandled exception produces a 500 that keeps the security headers."""
        app, _ = security_app

        @app.get("/error")
        async def error_endpoint():
            raise Exception("Test error")

        # By default TestClient re-raises exceptions raised inside the app,
        # so the request below would raise instead of returning a response.
        # Disable that to observe the real 500 response.
        client = TestClient(app, raise_server_exceptions=False)
        response = client.get("/error")

        # Even on server errors, security headers should be present
        # NOTE(review): this presumes the headers middleware also decorates
        # responses generated for unhandled exceptions -- confirm.
        assert response.status_code == 500
        assert response.headers["X-Frame-Options"] == "DENY"
        assert response.headers["X-Content-Type-Options"] == "nosniff"
@pytest.mark.integration
class TestProductionSecurityScenarios:
    """Scenarios that mimic production-style security configuration."""

    def test_production_security_configuration(self):
        """A config built with ``ENVIRONMENT=production`` uses secure settings."""
        with patch.dict(
            "os.environ", {"ENVIRONMENT": "production"}, clear=False
        ), patch(
            "maverick_mcp.config.security._get_cors_origins",
            return_value=["https://app.maverick-mcp.com"],
        ):
            config = SecurityConfig()

            # Production should have secure defaults
            assert config.is_production() is True
            configured_origins = str(config.cors.allowed_origins).lower()
            assert "localhost" not in configured_origins

            assert "Strict-Transport-Security" in config.get_security_headers()

    def test_development_to_production_transition(self):
        """HSTS is absent from development headers and present in production."""
        # Development config
        with patch.dict("os.environ", {"ENVIRONMENT": "development"}, clear=False):
            dev_headers = SecurityConfig().get_security_headers()

        # Production config
        with patch.dict(
            "os.environ", {"ENVIRONMENT": "production"}, clear=False
        ), patch(
            "maverick_mcp.config.security._get_cors_origins",
            return_value=["https://app.maverick-mcp.com"],
        ):
            prod_headers = SecurityConfig().get_security_headers()

        # Production should have HSTS, development should not
        hsts = "Strict-Transport-Security"
        assert hsts not in dev_headers
        assert hsts in prod_headers

    def test_security_monitoring_integration(self, security_app):
        """An unauthorized request is rejected while the audit logger is mocked."""
        app, _ = security_app
        http = TestClient(app)

        # Replace the audit logger with a mock exposing an async event hook
        with patch("maverick_mcp.api.middleware.security.audit_logger") as mock_audit:
            mock_audit.log_security_event = AsyncMock()

            # Unauthorized request
            resp = http.get("/protected")
            assert resp.status_code == 401

            # Should trigger security event logging
            # (In actual implementation, this would be called by middleware)
if __name__ == "__main__":
    # Run only the tests marked "integration" from this file, and propagate
    # pytest's exit status so a failing run is visible to the shell / CI
    # instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, "-v", "-m", "integration"]))