Skip to main content
Glama

Adversary MCP Server

by brettbergin
test_maintenance.py (30.7 kB)
"""Tests for database maintenance module.""" import sqlite3 import tempfile import time from pathlib import Path from unittest.mock import Mock, patch import pytest from sqlalchemy import text from adversary_mcp_server.telemetry.maintenance import ( DatabaseMaintenanceManager, get_maintenance_recommendations, run_database_maintenance, ) @pytest.fixture def temp_db_path(): """Create a temporary database file for testing.""" with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = Path(f.name) # Create a simple test database conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() # Create test tables cursor.execute( """ CREATE TABLE mcp_tool_executions ( id INTEGER PRIMARY KEY, tool_name TEXT, execution_start REAL, session_id TEXT ) """ ) cursor.execute( """ CREATE TABLE cli_command_executions ( id INTEGER PRIMARY KEY, command_name TEXT, execution_start REAL, args TEXT ) """ ) cursor.execute( """ CREATE TABLE cache_operation_metrics ( id INTEGER PRIMARY KEY, operation TEXT, timestamp REAL, cache_type TEXT ) """ ) cursor.execute( """ CREATE TABLE scan_executions ( id INTEGER PRIMARY KEY, file_path TEXT, execution_start REAL, scanner_type TEXT ) """ ) cursor.execute( """ CREATE TABLE threat_findings ( id INTEGER PRIMARY KEY, rule_id TEXT, timestamp REAL, severity TEXT ) """ ) cursor.execute( """ CREATE TABLE system_health ( id INTEGER PRIMARY KEY, metric_name TEXT, timestamp REAL, value REAL ) """ ) # Create test indexes cursor.execute("CREATE INDEX idx_mcp_tool_name ON mcp_tool_executions(tool_name)") cursor.execute( "CREATE INDEX idx_cli_command_name ON cli_command_executions(command_name)" ) cursor.execute("CREATE INDEX idx_threat_severity ON threat_findings(severity)") # Insert some test data current_time = time.time() old_time = current_time - (100 * 24 * 3600) # 100 days ago cursor.execute( """ INSERT INTO mcp_tool_executions (tool_name, execution_start, session_id) VALUES (?, ?, ?) 
""", ("test_tool", current_time, "session1"), ) cursor.execute( """ INSERT INTO mcp_tool_executions (tool_name, execution_start, session_id) VALUES (?, ?, ?) """, ("old_tool", old_time, "session2"), ) cursor.execute( """ INSERT INTO cli_command_executions (command_name, execution_start, args) VALUES (?, ?, ?) """, ("test_command", current_time, "{}"), ) cursor.execute( """ INSERT INTO cache_operation_metrics (operation, timestamp, cache_type) VALUES (?, ?, ?) """, ("get", old_time, "scan"), ) cursor.execute( """ INSERT INTO threat_findings (rule_id, timestamp, severity) VALUES (?, ?, ?) """, ("rule1", old_time, "high"), ) cursor.execute( """ INSERT INTO system_health (metric_name, timestamp, value) VALUES (?, ?, ?) """, ("cpu_usage", old_time, 75.5), ) conn.commit() conn.close() yield db_path # Cleanup db_path.unlink(missing_ok=True) @pytest.fixture def empty_db_path(): """Create an empty database file for testing.""" with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: db_path = Path(f.name) yield db_path # Cleanup db_path.unlink(missing_ok=True) @pytest.fixture def nonexistent_db_path(): """Provide a path to a nonexistent database.""" return Path("/tmp/nonexistent_test_db.db") class TestDatabaseMaintenanceManager: """Test DatabaseMaintenanceManager functionality.""" def test_initialization(self, temp_db_path): """Test manager initialization.""" manager = DatabaseMaintenanceManager(temp_db_path) assert manager.db_path == temp_db_path def test_run_maintenance_default_operations(self, temp_db_path): """Test running maintenance with default operations.""" manager = DatabaseMaintenanceManager(temp_db_path) results = manager.run_maintenance() assert "started_at" in results assert "completed_at" in results assert "maintenance_duration" in results assert "database_size_before" in results assert "database_size_after" in results assert "size_reduction_bytes" in results assert "size_reduction_percent" in results # Check that all default operations ran assert 
"analyze" in results["operations"] assert "vacuum" in results["operations"] assert "integrity_check" in results["operations"] assert "index_check" in results["operations"] # All operations should succeed for operation, result in results["operations"].items(): assert result["success"] is True def test_run_maintenance_specific_operations(self, temp_db_path): """Test running maintenance with specific operations.""" manager = DatabaseMaintenanceManager(temp_db_path) results = manager.run_maintenance(operations=["analyze", "vacuum"]) assert len(results["operations"]) == 2 assert "analyze" in results["operations"] assert "vacuum" in results["operations"] assert "integrity_check" not in results["operations"] def test_run_maintenance_with_cleanup(self, temp_db_path): """Test running maintenance with cleanup operation.""" manager = DatabaseMaintenanceManager(temp_db_path) results = manager.run_maintenance(operations=["cleanup"]) assert "cleanup" in results["operations"] cleanup_result = results["operations"]["cleanup"] assert cleanup_result["success"] is True assert "total_records_deleted" in cleanup_result assert cleanup_result["total_records_deleted"] > 0 # Should delete old records def test_run_maintenance_nonexistent_db(self, nonexistent_db_path): """Test maintenance on nonexistent database.""" manager = DatabaseMaintenanceManager(nonexistent_db_path) results = manager.run_maintenance() # Should complete but operations may fail gracefully assert "operations" in results assert "database_size_before" in results assert "database_size_after" in results def test_run_maintenance_unknown_operation(self, temp_db_path): """Test maintenance with unknown operation.""" manager = DatabaseMaintenanceManager(temp_db_path) with patch("adversary_mcp_server.telemetry.maintenance.logger") as mock_logger: results = manager.run_maintenance(operations=["unknown_operation"]) assert len(results["operations"]) == 0 mock_logger.warning.assert_called_once_with( "Unknown maintenance operation: 
unknown_operation" ) def test_run_analyze(self, temp_db_path): """Test _run_analyze method.""" manager = DatabaseMaintenanceManager(temp_db_path) result = manager._run_analyze() assert result["success"] is True assert "duration" in result assert "tables_analyzed" in result assert "description" in result assert result["tables_analyzed"] > 0 def test_run_vacuum(self, temp_db_path): """Test _run_vacuum method.""" manager = DatabaseMaintenanceManager(temp_db_path) result = manager._run_vacuum() assert result["success"] is True assert "duration" in result assert "size_before_bytes" in result assert "size_after_bytes" in result assert "space_reclaimed_bytes" in result assert "space_reclaimed_percent" in result assert "description" in result def test_run_integrity_check_healthy_db(self, temp_db_path): """Test integrity check on healthy database.""" manager = DatabaseMaintenanceManager(temp_db_path) result = manager._run_integrity_check() assert result["success"] is True assert result["quick_check_result"] == "ok" assert result["issues_found"] == 0 assert result["issues"] == [] assert "passed" in result["description"] def test_run_integrity_check_corrupted_db(self, temp_db_path): """Test integrity check on corrupted database (mocked).""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock the SQLAlchemy engine connection to return corruption with patch.object(manager.engine, "connect") as mock_connect: mock_conn = Mock() mock_connect.return_value.__enter__.return_value = mock_conn # Mock quick check to fail mock_result = Mock() mock_result.fetchone.side_effect = [ ("error: database disk image is malformed",), # quick check ] mock_conn.execute.return_value = mock_result # Mock full integrity check mock_integrity_result = Mock() mock_integrity_result.fetchall.return_value = [ ("error: database disk image is malformed",) ] mock_conn.execute.side_effect = [mock_result, mock_integrity_result] result = manager._run_integrity_check() assert result["success"] is False assert 
result["issues_found"] == 1 assert len(result["issues"]) == 1 assert "failed" in result["description"] def test_check_indexes(self, temp_db_path): """Test _check_indexes method.""" manager = DatabaseMaintenanceManager(temp_db_path) result = manager._check_indexes() assert result["success"] is True assert "duration" in result assert "total_indexes" in result assert "used_indexes" in result assert "unused_indexes" in result assert "index_details" in result assert result["total_indexes"] > 0 # We created some indexes def test_cleanup_old_data(self, temp_db_path): """Test _cleanup_old_data method.""" manager = DatabaseMaintenanceManager(temp_db_path) # Use 50 days to keep some and delete some result = manager._cleanup_old_data(days_to_keep=50) assert result["success"] is True assert "duration" in result assert "total_records_deleted" in result assert "cutoff_days" in result assert "cutoff_timestamp" in result assert "table_results" in result assert result["total_records_deleted"] > 0 # Should delete old records # Check individual table results for table_name, table_result in result["table_results"].items(): assert "records_deleted" in table_result assert "success" in table_result def test_cleanup_old_data_nonexistent_table(self, temp_db_path): """Test cleanup with nonexistent table.""" manager = DatabaseMaintenanceManager(temp_db_path) # Add a table to the cleanup list that doesn't exist original_tables = [ ("mcp_tool_executions", "execution_start"), ("nonexistent_table", "timestamp"), ] with patch.object(manager, "_cleanup_old_data") as mock_cleanup: # Simulate the actual method behavior mock_cleanup.return_value = { "success": True, "duration": 0.1, "total_records_deleted": 1, "cutoff_days": 90, "cutoff_timestamp": time.time() - 7776000, "table_results": { "mcp_tool_executions": {"records_deleted": 1, "success": True}, "nonexistent_table": {"records_deleted": 0, "success": True}, }, "description": "Cleaned up 1 old records older than 90 days", } result = 
manager._cleanup_old_data() assert result["success"] is True assert "nonexistent_table" in result["table_results"] def test_get_engine_connection(self, temp_db_path): """Test SQLAlchemy engine connection.""" manager = DatabaseMaintenanceManager(temp_db_path) with manager.engine.connect() as conn: result = conn.execute( text("SELECT name FROM sqlite_master WHERE type='table'") ) tables = result.fetchall() assert len(tables) > 0 def test_get_database_info_existing_db(self, temp_db_path): """Test get_database_info with existing database.""" manager = DatabaseMaintenanceManager(temp_db_path) info = manager.get_database_info() assert info["exists"] is True assert info["path"] == str(temp_db_path) assert info["size_bytes"] > 0 assert info["size_mb"] > 0 assert "modified_time" in info assert "sqlite_version" in info assert "settings" in info assert "tables" in info assert "total_records" in info assert "index_count" in info # Check settings settings = info["settings"] assert "page_size" in settings assert "cache_size" in settings assert "synchronous" in settings # Check tables tables = info["tables"] assert "mcp_tool_executions" in tables assert tables["mcp_tool_executions"]["row_count"] == 2 assert info["total_records"] > 0 assert info["index_count"] > 0 def test_get_database_info_nonexistent_db(self, nonexistent_db_path): """Test get_database_info with nonexistent database.""" manager = DatabaseMaintenanceManager(nonexistent_db_path) # Ensure the database file doesn't exist if nonexistent_db_path.exists(): nonexistent_db_path.unlink() info = manager.get_database_info() assert info["exists"] is False # Only "exists" key should be present for nonexistent database assert len(info) == 1 assert list(info.keys()) == ["exists"] def test_get_database_info_with_error(self, temp_db_path): """Test get_database_info with database error.""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock SQLAlchemy engine connect to raise exception with patch.object(manager.engine, 
"connect") as mock_connect: mock_connect.side_effect = Exception("Database error") info = manager.get_database_info() assert info["exists"] is True assert "error" in info assert "Database error" in info["error"] def test_maintenance_operation_exception_handling(self, temp_db_path): """Test exception handling in maintenance operations.""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock _run_analyze to raise exception with patch.object(manager, "_run_analyze") as mock_analyze: mock_analyze.side_effect = Exception("Test error") results = manager.run_maintenance(operations=["analyze"]) assert "analyze" in results["operations"] assert results["operations"]["analyze"]["success"] is False assert results["operations"]["analyze"]["error"] == "Test error" def test_maintenance_size_calculation(self, temp_db_path): """Test database size calculation during maintenance.""" manager = DatabaseMaintenanceManager(temp_db_path) # Get initial size initial_size = temp_db_path.stat().st_size results = manager.run_maintenance() assert results["database_size_before"] == initial_size assert results["database_size_after"] > 0 assert isinstance(results["size_reduction_bytes"], int) assert isinstance(results["size_reduction_percent"], int | float) def test_cleanup_with_database_error(self, temp_db_path): """Test cleanup operation with database error.""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock SQLAlchemy engine connection to cause error for specific table with patch.object(manager.engine, "connect") as mock_connect: mock_conn = Mock() mock_connect.return_value.__enter__.return_value = mock_conn # Mock inspector to return some tables mock_inspector = Mock() mock_inspector.get_table_names.return_value = ["mcp_tool_executions"] with patch( "adversary_mcp_server.telemetry.maintenance.inspect", return_value=mock_inspector, ): # Make execute raise exception for count operation mock_conn.execute.side_effect = Exception("Table error") result = manager._cleanup_old_data() # 
Should handle errors gracefully assert result["success"] is True assert result["total_records_deleted"] == 0 class TestMaintenanceFunctions: """Test module-level maintenance functions.""" def test_run_database_maintenance_with_path(self, temp_db_path): """Test run_database_maintenance function with specific path.""" results = run_database_maintenance(db_path=temp_db_path) assert "operations" in results assert len(results["operations"]) == 4 # Default operations def test_run_database_maintenance_default_path(self): """Test run_database_maintenance function with default path.""" with patch( "adversary_mcp_server.telemetry.maintenance.DatabaseMaintenanceManager" ) as mock_manager_class: mock_manager = Mock() mock_manager.run_maintenance.return_value = {"test": "result"} mock_manager_class.return_value = mock_manager results = run_database_maintenance() # Should use default path expected_path = ( Path.home() / ".local/share/adversary-mcp-server/cache/adversary.db" ) mock_manager_class.assert_called_once_with(expected_path) assert results == {"test": "result"} def test_run_database_maintenance_with_operations(self, temp_db_path): """Test run_database_maintenance function with specific operations.""" results = run_database_maintenance( db_path=temp_db_path, operations=["analyze", "vacuum"] ) assert len(results["operations"]) == 2 assert "analyze" in results["operations"] assert "vacuum" in results["operations"] class TestMaintenanceRecommendations: """Test maintenance recommendation functions.""" def test_get_maintenance_recommendations_nonexistent_db(self): """Test recommendations for nonexistent database.""" db_info = {"exists": False} recommendations = get_maintenance_recommendations(db_info) assert len(recommendations) == 1 assert recommendations[0] == "Database does not exist" def test_get_maintenance_recommendations_large_db(self): """Test recommendations for large database.""" db_info = { "exists": True, "size_mb": 150.0, "total_records": 50000, "settings": { 
"auto_vacuum": 1, "synchronous": 1, }, } recommendations = get_maintenance_recommendations(db_info) assert any( "large" in rec.lower() and "vacuum" in rec.lower() for rec in recommendations ) def test_get_maintenance_recommendations_high_record_count(self): """Test recommendations for high record count.""" db_info = { "exists": True, "size_mb": 50.0, "total_records": 150000, "settings": { "auto_vacuum": 1, "synchronous": 1, }, } recommendations = get_maintenance_recommendations(db_info) assert any( "record count" in rec.lower() and "cleanup" in rec.lower() for rec in recommendations ) def test_get_maintenance_recommendations_auto_vacuum_disabled(self): """Test recommendations when auto-vacuum is disabled.""" db_info = { "exists": True, "size_mb": 50.0, "total_records": 50000, "settings": { "auto_vacuum": 0, # Disabled "synchronous": 1, }, } recommendations = get_maintenance_recommendations(db_info) assert any("auto-vacuum" in rec.lower() for rec in recommendations) def test_get_maintenance_recommendations_full_synchronous(self): """Test recommendations for full synchronous mode.""" db_info = { "exists": True, "size_mb": 50.0, "total_records": 50000, "settings": { "auto_vacuum": 1, "synchronous": 2, # FULL }, } recommendations = get_maintenance_recommendations(db_info) assert any( "synchronous" in rec.lower() and "performance" in rec.lower() for rec in recommendations ) def test_get_maintenance_recommendations_good_condition(self): """Test recommendations for database in good condition.""" db_info = { "exists": True, "size_mb": 50.0, "total_records": 50000, "settings": { "auto_vacuum": 1, "synchronous": 1, }, } recommendations = get_maintenance_recommendations(db_info) assert len(recommendations) == 1 assert "good condition" in recommendations[0] def test_get_maintenance_recommendations_multiple_issues(self): """Test recommendations when multiple issues exist.""" db_info = { "exists": True, "size_mb": 150.0, # Large "total_records": 150000, # High record count 
"settings": { "auto_vacuum": 0, # Disabled "synchronous": 2, # FULL }, } recommendations = get_maintenance_recommendations(db_info) assert len(recommendations) == 4 # All issues should be detected assert any("large" in rec.lower() for rec in recommendations) assert any("record count" in rec.lower() for rec in recommendations) assert any("auto-vacuum" in rec.lower() for rec in recommendations) assert any("synchronous" in rec.lower() for rec in recommendations) def test_get_maintenance_recommendations_missing_settings(self): """Test recommendations with missing settings.""" db_info = { "exists": True, "size_mb": 50.0, "total_records": 50000, "settings": {}, # Missing settings } recommendations = get_maintenance_recommendations(db_info) # Should not crash and provide default recommendation assert len(recommendations) >= 1 def test_get_maintenance_recommendations_missing_fields(self): """Test recommendations with missing fields.""" db_info = {"exists": True} # Missing other fields recommendations = get_maintenance_recommendations(db_info) # Should handle missing fields gracefully assert len(recommendations) >= 1 class TestEdgeCases: """Test edge cases and error conditions.""" def test_empty_database(self, empty_db_path): """Test operations on empty database.""" # Create empty database file sqlite3.connect(str(empty_db_path)).close() manager = DatabaseMaintenanceManager(empty_db_path) results = manager.run_maintenance() # Should complete successfully even with empty database assert results["database_size_before"] >= 0 assert all( op["success"] for op in results["operations"].values() if "success" in op ) def test_locked_database(self, temp_db_path): """Test operations on locked database.""" manager = DatabaseMaintenanceManager(temp_db_path) # Open a connection to lock the database lock_conn = sqlite3.connect(str(temp_db_path)) lock_conn.execute("BEGIN EXCLUSIVE") try: # Mock the SQLAlchemy engine connection to simulate lock timeout with patch.object(manager.engine, 
"connect") as mock_connect: mock_connect.side_effect = sqlite3.OperationalError( "database is locked" ) # Should handle locked database gracefully results = manager.run_maintenance(operations=["analyze"]) assert "analyze" in results["operations"] assert results["operations"]["analyze"]["success"] is False finally: lock_conn.close() def test_corrupted_database_file(self, temp_db_path): """Test operations on corrupted database file.""" # Corrupt the database by writing invalid data with open(temp_db_path, "wb") as f: f.write(b"invalid database content") manager = DatabaseMaintenanceManager(temp_db_path) # Should handle corruption gracefully results = manager.run_maintenance() # Operations may fail, but should not crash assert "operations" in results def test_permission_denied_database(self, temp_db_path): """Test operations when database access is denied.""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock SQLAlchemy engine connection to simulate permission denied with patch.object(manager.engine, "connect") as mock_connect: mock_connect.side_effect = sqlite3.OperationalError("access is denied") results = manager.run_maintenance(operations=["analyze"]) assert "analyze" in results["operations"] assert results["operations"]["analyze"]["success"] is False def test_disk_full_during_vacuum(self, temp_db_path): """Test vacuum operation when disk is full.""" manager = DatabaseMaintenanceManager(temp_db_path) # Mock the _run_vacuum method directly to simulate disk full error with patch.object(manager, "_run_vacuum") as mock_vacuum: mock_vacuum.return_value = { "success": False, "duration": 0.1, "size_before_bytes": 1024, "size_after_bytes": 1024, "space_reclaimed_bytes": 0, "space_reclaimed_percent": 0.0, "description": "VACUUM operation failed: database or disk is full", "error": "database or disk is full", } result = manager._run_vacuum() # Should handle disk full error gracefully assert "success" in result assert result["success"] is False assert "error" in 
result assert "disk is full" in result["error"] class TestIntegration: """Integration tests for maintenance operations.""" def test_full_maintenance_workflow(self, temp_db_path): """Test complete maintenance workflow.""" manager = DatabaseMaintenanceManager(temp_db_path) # Get initial database info info_before = manager.get_database_info() initial_records = info_before["total_records"] # Run full maintenance results = manager.run_maintenance() # Verify all operations completed assert len(results["operations"]) == 4 assert all(op["success"] for op in results["operations"].values()) # Get final database info info_after = manager.get_database_info() # Database should still be healthy assert info_after["exists"] is True assert ( info_after["total_records"] <= initial_records ) # May be less due to cleanup def test_maintenance_with_recommendations(self, temp_db_path): """Test maintenance workflow with recommendations.""" manager = DatabaseMaintenanceManager(temp_db_path) # Get database info and recommendations db_info = manager.get_database_info() recommendations = get_maintenance_recommendations(db_info) # Run maintenance based on recommendations if any("vacuum" in rec.lower() for rec in recommendations): results = manager.run_maintenance(operations=["vacuum"]) assert results["operations"]["vacuum"]["success"] is True if any("cleanup" in rec.lower() for rec in recommendations): results = manager.run_maintenance(operations=["cleanup"]) assert results["operations"]["cleanup"]["success"] is True def test_repeated_maintenance_operations(self, temp_db_path): """Test running maintenance operations multiple times.""" manager = DatabaseMaintenanceManager(temp_db_path) # Run maintenance multiple times for i in range(3): results = manager.run_maintenance(operations=["analyze", "vacuum"]) # Should succeed each time assert results["operations"]["analyze"]["success"] is True assert results["operations"]["vacuum"]["success"] is True def 
test_maintenance_on_actively_used_database(self, temp_db_path): """Test maintenance while database is being used.""" manager = DatabaseMaintenanceManager(temp_db_path) # Open a read connection to simulate active usage read_conn = sqlite3.connect(str(temp_db_path)) cursor = read_conn.cursor() try: # Run maintenance (should work with read connections open) results = manager.run_maintenance(operations=["analyze"]) # Should succeed assert results["operations"]["analyze"]["success"] is True # Read connection should still work cursor.execute("SELECT COUNT(*) FROM mcp_tool_executions") count = cursor.fetchone()[0] assert count >= 0 finally: read_conn.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.