Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_migration_integration.py (22.4 kB)
"""Comprehensive integration tests for database migration system.""" import json import tempfile import time from pathlib import Path from unittest.mock import patch import pytest from adversary_mcp_server.database.migrations import DataMigrationManager from adversary_mcp_server.database.models import ( AdversaryDatabase, MCPToolExecution, ScanEngineExecution, ThreatFinding, ) from adversary_mcp_server.migration.database_migration import DatabaseMigrationManager from adversary_mcp_server.migration.orchestrator import MigrationOrchestrator @pytest.fixture def temp_db_path(): """Create temporary database path.""" with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: temp_path = Path(f.name) yield temp_path # Cleanup if temp_path.exists(): temp_path.unlink() @pytest.fixture def temp_legacy_cache_dir(): """Create temporary legacy cache directory with test files.""" with tempfile.TemporaryDirectory() as temp_dir: cache_dir = Path(temp_dir) cache_dir.mkdir(exist_ok=True) # Create mock legacy SQLite file with proper SQLite format using SQLAlchemy from sqlalchemy import ( Column, Float, Integer, MetaData, String, Table, create_engine, ) from sqlalchemy.orm import sessionmaker legacy_db = cache_dir / "cache_metadata.db" engine = create_engine(f"sqlite:///{legacy_db}") # Define legacy table structure metadata = MetaData() scan_metadata_table = Table( "scan_metadata", metadata, Column("id", Integer, primary_key=True), Column("scan_id", String), Column("file_path", String), Column("timestamp", Float), ) # Create table and add test data metadata.create_all(engine) Session = sessionmaker(bind=engine) session = Session() try: # Insert test data using SQLAlchemy ORM session.execute( scan_metadata_table.insert(), { "scan_id": "legacy-scan-123", "file_path": "/test/legacy.py", "timestamp": 1234567890.0, }, ) session.commit() finally: session.close() engine.dispose() yield cache_dir @pytest.fixture def temp_legacy_metrics_dir(): """Create temporary legacy metrics 
directory with test JSON files.""" with tempfile.TemporaryDirectory() as temp_dir: metrics_dir = Path(temp_dir) metrics_dir.mkdir(exist_ok=True) # Create mock JSON metrics file metrics_file = metrics_dir / "legacy_metrics.json" with metrics_file.open("w") as f: json.dump( { "metadata": {"collected_at": time.time(), "db_size_mb": 10.5}, "legacy_metrics": {"scan_count": 5, "threat_count": 10}, "scan_metrics": {"total_scans": 5, "total_findings": 10}, }, f, ) yield metrics_dir @pytest.fixture def populated_database(temp_db_path): """Create a database with test data including inconsistencies.""" db = AdversaryDatabase(temp_db_path) with db.get_session() as session: # Create test scan execution scan_execution = ScanEngineExecution( scan_id="test-scan-123", trigger_source="test", scan_type="file", target_path="/test/file.py", threats_found=5, # This will be inconsistent with actual threat findings execution_start=time.time(), execution_end=time.time() + 1.0, ) session.add(scan_execution) # Create MCP tool execution mcp_execution = MCPToolExecution( tool_name="adv_scan_file", session_id="test-scan-123", request_params={"path": "/test/file.py"}, execution_start=time.time(), execution_end=time.time() + 1.0, findings_count=3, # This will be inconsistent ) session.add(mcp_execution) # Create fewer threat findings than recorded (inconsistency) for i in range(2): threat = ThreatFinding( scan_id="test-scan-123", finding_uuid=f"test-threat-{i}", scanner_source="test", category="test_category", severity="medium", file_path="/test/file.py", line_start=10 + i, line_end=10 + i, title=f"Test Threat {i}", timestamp=time.time(), ) session.add(threat) session.commit() yield db db.close() class TestDataMigrationIntegration: """Test data consistency migration functionality.""" def test_data_migration_detects_inconsistencies(self, populated_database): """Test that data migration detects and reports inconsistencies.""" manager = DataMigrationManager(populated_database) # Validate current 
state results = manager.validate_data_consistency() assert results["validation_success"] is True assert results["total_inconsistencies"] > 0 assert not results["data_consistent"] def test_data_migration_fixes_inconsistencies(self, populated_database): """Test that data migration fixes inconsistencies.""" manager = DataMigrationManager(populated_database) # Run migration results = manager.fix_summary_field_inconsistencies() assert results["migration_success"] is True assert results["total_records_fixed"] > 0 # Validate that inconsistencies are fixed validation_results = manager.validate_data_consistency() assert validation_results["data_consistent"] is True assert validation_results["total_inconsistencies"] == 0 def test_data_migration_handles_empty_database(self, temp_db_path): """Test that data migration handles empty database gracefully.""" db = AdversaryDatabase(temp_db_path) manager = DataMigrationManager(db) results = manager.validate_data_consistency() assert results["validation_success"] is True assert results["data_consistent"] is True assert results["total_inconsistencies"] == 0 db.close() def test_orphaned_records_cleanup(self, populated_database): """Test cleanup of orphaned records.""" manager = DataMigrationManager(populated_database) # Create orphaned threat finding (scan doesn't exist) with populated_database.get_session() as session: orphaned_threat = ThreatFinding( scan_id="nonexistent-scan", finding_uuid="orphaned-threat", scanner_source="test", category="test", severity="low", file_path="/test/orphaned.py", line_start=1, line_end=1, title="Orphaned Threat", timestamp=time.time(), ) session.add(orphaned_threat) session.commit() # Run cleanup results = manager.cleanup_orphaned_records() assert results["cleanup_success"] is True assert results["orphaned_threats_deleted"] > 0 def test_stale_executions_cleanup(self, populated_database): """Test cleanup of stale executions.""" manager = DataMigrationManager(populated_database) # Create stale execution 
(old and no end time) with populated_database.get_session() as session: stale_execution = ScanEngineExecution( scan_id="stale-scan", trigger_source="test", scan_type="file", target_path="/test/stale.py", execution_start=time.time() - (25 * 3600), # 25 hours ago # No execution_end - indicates hanging execution ) session.add(stale_execution) session.commit() # Run cleanup results = manager.mark_stale_executions_as_failed() assert results["cleanup_success"] is True assert results["total_executions_fixed"] > 0 class TestLegacyMigrationIntegration: """Test legacy system migration functionality.""" @patch("adversary_mcp_server.migration.database_migration.get_app_cache_dir") @patch("adversary_mcp_server.migration.database_migration.get_app_metrics_dir") def test_legacy_migration_detects_files( self, mock_metrics_dir, mock_cache_dir, temp_legacy_cache_dir, temp_legacy_metrics_dir, temp_db_path, ): """Test that legacy migration detects legacy files.""" mock_cache_dir.return_value = temp_legacy_cache_dir mock_metrics_dir.return_value = temp_legacy_metrics_dir manager = DatabaseMigrationManager(temp_db_path) results = manager.check_migration_needed() assert results["migration_needed"] is True assert len(results["legacy_sqlite_files"]) > 0 assert len(results["json_metrics_files"]) > 0 @patch("adversary_mcp_server.migration.database_migration.get_app_cache_dir") @patch("adversary_mcp_server.migration.database_migration.get_app_metrics_dir") def test_legacy_migration_no_files( self, mock_metrics_dir, mock_cache_dir, temp_db_path ): """Test that legacy migration handles no legacy files.""" # Point to empty directories with tempfile.TemporaryDirectory() as empty_dir: empty_cache = Path(empty_dir) / "cache" empty_metrics = Path(empty_dir) / "metrics" empty_cache.mkdir(parents=True, exist_ok=True) empty_metrics.mkdir(parents=True, exist_ok=True) mock_cache_dir.return_value = empty_cache mock_metrics_dir.return_value = empty_metrics manager = DatabaseMigrationManager(temp_db_path) 
results = manager.check_migration_needed() assert results["migration_needed"] is False @patch("adversary_mcp_server.migration.database_migration.get_app_cache_dir") @patch("adversary_mcp_server.migration.database_migration.get_app_metrics_dir") def test_legacy_migration_creates_backup( self, mock_metrics_dir, mock_cache_dir, temp_legacy_cache_dir, temp_legacy_metrics_dir, temp_db_path, ): """Test that legacy migration creates backup.""" mock_cache_dir.return_value = temp_legacy_cache_dir mock_metrics_dir.return_value = temp_legacy_metrics_dir manager = DatabaseMigrationManager(temp_db_path) results = manager.run_full_migration(backup=True) assert results.get("backup_created") is True assert "backup_path" in results class TestMigrationOrchestratorIntegration: """Test the unified migration orchestrator.""" @patch("adversary_mcp_server.migration.database_migration.get_app_cache_dir") @patch("adversary_mcp_server.migration.database_migration.get_app_metrics_dir") def test_orchestrator_dry_run( self, mock_metrics_dir, mock_cache_dir, temp_legacy_cache_dir, temp_legacy_metrics_dir, populated_database, ): """Test orchestrator dry run functionality.""" mock_cache_dir.return_value = temp_legacy_cache_dir.parent mock_metrics_dir.return_value = temp_legacy_metrics_dir.parent orchestrator = MigrationOrchestrator(populated_database.db_path) results = orchestrator.run_complete_migration(dry_run=True) assert results["dry_run"] is True assert results["overall_success"] is True assert "phases" in results assert len(results["phases"]) > 0 def test_orchestrator_complete_workflow_no_legacy(self, populated_database): """Test orchestrator complete workflow without legacy migration.""" orchestrator = MigrationOrchestrator(populated_database.db_path) results = orchestrator.run_complete_migration( skip_legacy=True, skip_constraints=True, backup=False ) assert results["overall_success"] is True assert "phases" in results assert "summary" in results # Should have fixed data inconsistencies 
data_phase = results["phases"].get("data_migration", {}) assert data_phase.get("success") is True def test_orchestrator_handles_errors_gracefully(self, temp_db_path): """Test that orchestrator handles errors gracefully.""" orchestrator = MigrationOrchestrator(temp_db_path) # Mock a component to fail with patch.object( orchestrator.health_checker, "run_comprehensive_health_check", side_effect=Exception("Simulated health check failure"), ): results = orchestrator.run_complete_migration() assert results["overall_success"] is False assert "workflow_error" in results or len(results["errors"]) > 0 def test_orchestrator_rollback_functionality(self, temp_db_path): """Test orchestrator rollback functionality.""" orchestrator = MigrationOrchestrator(temp_db_path) # Create mock backup directory with tempfile.TemporaryDirectory() as backup_dir: backup_path = Path(backup_dir) # Create mock backup files cache_backup = backup_path / "cache" cache_backup.mkdir() (cache_backup / "test.db").touch() results = orchestrator.rollback_migration(backup_path) # Rollback should at least attempt to restore files assert isinstance(results, dict) assert "success" in results class TestMigrationRobustness: """Test migration system robustness and edge cases.""" def test_concurrent_migration_prevention(self, populated_database): """Test that concurrent migrations are handled properly.""" manager1 = DataMigrationManager(populated_database) manager2 = DataMigrationManager(populated_database) # Both should work independently as they use separate sessions results1 = manager1.validate_data_consistency() results2 = manager2.validate_data_consistency() assert results1["validation_success"] is True assert results2["validation_success"] is True def test_migration_with_large_dataset(self, temp_db_path): """Test migration performance with larger dataset.""" db = AdversaryDatabase(temp_db_path) # Create larger test dataset with db.get_session() as session: # Create multiple scan executions with threat 
findings for i in range(10): scan_execution = ScanEngineExecution( scan_id=f"scan-{i}", trigger_source="test", scan_type="file", target_path=f"/test/file-{i}.py", threats_found=3, execution_start=time.time(), execution_end=time.time() + 1.0, ) session.add(scan_execution) # Add threat findings (creating intentional mismatch) for j in range(2): # Only 2 threats instead of 3 threat = ThreatFinding( scan_id=f"scan-{i}", finding_uuid=f"threat-{i}-{j}", scanner_source="test", category="test", severity="medium", file_path=f"/test/file-{i}.py", line_start=j + 1, line_end=j + 1, title=f"Threat {j}", timestamp=time.time(), ) session.add(threat) session.commit() manager = DataMigrationManager(db) # First validate to ensure we have inconsistencies validation = manager.validate_data_consistency() assert validation["validation_success"] is True expected_inconsistencies = validation["total_inconsistencies"] # Run migration on larger dataset start_time = time.time() results = manager.fix_summary_field_inconsistencies() duration = time.time() - start_time assert results["migration_success"] is True # The number of records fixed should match the inconsistencies found assert results["total_records_fixed"] == expected_inconsistencies assert duration < 30 # Should complete within 30 seconds # Verify post-migration consistency post_validation = manager.validate_data_consistency() assert post_validation["total_inconsistencies"] == 0 db.close() def test_migration_partial_failure_recovery(self, populated_database): """Test migration recovery from partial failures.""" manager = DataMigrationManager(populated_database) # First, ensure we have inconsistencies validation = manager.validate_data_consistency() assert validation["total_inconsistencies"] > 0 # Mock a partial failure scenario by patching one of the fix methods with patch.object( manager, "_fix_cli_command_findings_count", side_effect=Exception("Simulated failure"), ): results = manager.fix_summary_field_inconsistencies() # Should 
handle the error gracefully assert results["migration_success"] is False assert "error" in results def test_migration_system_state_preservation(self, populated_database): """Test that migration preserves system state on failure.""" manager = DataMigrationManager(populated_database) # Record initial state with populated_database.get_session() as session: initial_scan_count = session.query(ScanEngineExecution).count() initial_threat_count = session.query(ThreatFinding).count() # Force a failure scenario with patch.object( manager, "_fix_scan_engine_threats_found", side_effect=Exception("Simulated database error"), ): results = manager.fix_summary_field_inconsistencies() assert results["migration_success"] is False # Verify state is preserved (rollback occurred) with populated_database.get_session() as session: final_scan_count = session.query(ScanEngineExecution).count() final_threat_count = session.query(ThreatFinding).count() assert final_scan_count == initial_scan_count assert final_threat_count == initial_threat_count @pytest.mark.integration class TestEndToEndMigrationScenarios: """End-to-end migration scenario tests.""" def test_fresh_installation_migration(self, temp_db_path): """Test migration scenario for fresh installation.""" orchestrator = MigrationOrchestrator(temp_db_path) results = orchestrator.run_complete_migration( skip_legacy=True, # Fresh installation has no legacy files backup=False, # No existing data to backup ) assert results["overall_success"] is True assert ( results["phases"]["data_migration"]["skipped"] is True ) # No data to migrate # Final state should be healthy summary = results["summary"] assert summary["final_health_status"] in ["healthy", "fair"] def test_upgrade_from_legacy_system( self, temp_legacy_cache_dir, temp_legacy_metrics_dir, temp_db_path ): """Test migration scenario upgrading from legacy system.""" with ( patch( "adversary_mcp_server.migration.database_migration.get_app_cache_dir" ) as mock_cache, patch( 
"adversary_mcp_server.migration.database_migration.get_app_metrics_dir" ) as mock_metrics, ): mock_cache.return_value = temp_legacy_cache_dir mock_metrics.return_value = temp_legacy_metrics_dir orchestrator = MigrationOrchestrator(temp_db_path) results = orchestrator.run_complete_migration(backup=True) # Debug: check results if test fails if not results["overall_success"]: # Migration failed - error details are in results["errors"] for phase_name, phase_data in results.get("phases", {}).items(): if not phase_data.get("success", True): # Phase failed - details are in phase_data pass # The overall success depends on whether all critical phases succeeded # Legacy migration failure might not be critical if no files exist legacy_phase = results["phases"]["legacy_migration"] if legacy_phase.get("skipped"): # If legacy migration was skipped (no files found), that's acceptable assert results["overall_success"] is True else: # If legacy migration ran, it should succeed assert legacy_phase.get("success", False) is True assert results["overall_success"] is True def test_maintenance_migration_with_inconsistencies(self, populated_database): """Test migration scenario for maintenance with data inconsistencies.""" orchestrator = MigrationOrchestrator(populated_database.db_path) results = orchestrator.run_complete_migration( skip_legacy=True, # No legacy files in maintenance scenario backup=True, ) assert results["overall_success"] is True # Should have fixed data inconsistencies data_phase = results["phases"]["data_migration"] assert data_phase["success"] is True assert data_phase["records_fixed"] > 0 # Final state should be consistent assert results["summary"]["final_data_consistency"] is True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.