Adversary MCP Server

by brettbergin
database_migration.py (20.8 kB)
"""Database migration utilities to consolidate existing SQLite files.""" import json import sqlite3 import time from pathlib import Path from typing import Any from sqlalchemy import create_engine, inspect, text from ..config import get_app_cache_dir, get_app_metrics_dir from ..database.models import AdversaryDatabase from ..logger import get_logger from ..telemetry.service import TelemetryService logger = get_logger("database_migration") class DatabaseMigrationManager: """Manages migration from legacy systems to unified telemetry database.""" def __init__(self, target_db_path: Path = None): """Initialize migration manager. Args: target_db_path: Path to target unified database (default: standard location) """ self.target_db_path = ( target_db_path or Path( "~/.local/share/adversary-mcp-server/cache/adversary.db" ).expanduser() ) self.cache_dir = get_app_cache_dir() self.metrics_dir = get_app_metrics_dir() logger.info(f"Initializing database migration to {self.target_db_path}") def run_full_migration(self, backup: bool = True) -> dict[str, Any]: """Run complete migration process. Args: backup: Whether to backup existing files before migration Returns: Migration results summary """ logger.info("=== Starting Full Database Migration ===") migration_results = { "started_at": time.time(), "backup_created": False, "legacy_sqlite_files": [], "json_metrics_files": [], "cache_metadata_migrated": False, "legacy_metrics_migrated": False, "errors": [], "warnings": [], } try: # Create backup if requested if backup: backup_path = self._create_backup() migration_results["backup_created"] = True migration_results["backup_path"] = str(backup_path) logger.info(f"Backup created at: {backup_path}") # Initialize target database target_db = AdversaryDatabase(self.target_db_path) telemetry_service = TelemetryService(target_db) # Migrate legacy SQLite files sqlite_results = self._migrate_legacy_sqlite_files(telemetry_service) migration_results.update(sqlite_results) # Migrate JSON metrics files json_results = self._migrate_json_metrics_files(telemetry_service) migration_results.update(json_results) # Clean up old files (optional) cleanup_results = self._cleanup_old_files() migration_results["cleanup_results"] = cleanup_results target_db.close() except Exception as e: error_msg = f"Migration failed: {e}" logger.error(error_msg, exc_info=True) migration_results["errors"].append(error_msg) migration_results["completed_at"] = time.time() # Type assertions for arithmetic operations to fix MyPy object arithmetic errors started_at = migration_results["started_at"] completed_at = migration_results["completed_at"] assert isinstance( started_at, int | float ), f"started_at should be numeric, got {type(started_at)}" assert isinstance( completed_at, int | float ), f"completed_at should be numeric, got {type(completed_at)}" migration_results["duration_seconds"] = completed_at - started_at logger.info("=== Database Migration Complete ===") self._log_migration_summary(migration_results) return migration_results def _create_backup(self) -> Path: """Create backup of existing data.""" timestamp = int(time.time()) backup_dir = self.target_db_path.parent / f"migration_backup_{timestamp}" backup_dir.mkdir(parents=True, exist_ok=True) # Backup cache directory if self.cache_dir.exists(): cache_backup = backup_dir / "cache" self._copy_directory(self.cache_dir, cache_backup) logger.debug(f"Cache directory backed up to: {cache_backup}") # Backup metrics directory if self.metrics_dir.exists(): metrics_backup = backup_dir / "metrics" 
self._copy_directory(self.metrics_dir, metrics_backup) logger.debug(f"Metrics directory backed up to: {metrics_backup}") return backup_dir def _copy_directory(self, src: Path, dst: Path) -> None: """Copy directory contents.""" import shutil if src.exists(): shutil.copytree(str(src), str(dst), dirs_exist_ok=True) def _migrate_legacy_sqlite_files( self, telemetry_service: TelemetryService ) -> dict[str, Any]: """Migrate existing SQLite database files.""" results = { "legacy_sqlite_files": [], "cache_metadata_migrated": False, "records_migrated": 0, } logger.info("Migrating legacy SQLite files...") # Look for existing cache metadata database cache_metadata_db = self.cache_dir / "cache_metadata.db" if cache_metadata_db.exists(): try: records_migrated = self._migrate_cache_metadata( cache_metadata_db, telemetry_service ) results["cache_metadata_migrated"] = True # Fix MyPy object arithmetic error with type assertion current_records = results.get("records_migrated", 0) assert isinstance( current_records, int ), f"records_migrated should be int, got {type(current_records)}" results["records_migrated"] = current_records + records_migrated results["legacy_sqlite_files"].append( { "path": str(cache_metadata_db), "type": "cache_metadata", "records_migrated": records_migrated, } ) logger.info(f"Migrated {records_migrated} cache metadata records") except Exception as e: error_msg = f"Failed to migrate cache metadata: {e}" logger.error(error_msg) results.setdefault("errors", []).append(error_msg) # Look for other SQLite files in cache directory sqlite_files = list(self.cache_dir.glob("*.db")) for sqlite_file in sqlite_files: if ( sqlite_file.name != "cache_metadata.db" and sqlite_file != self.target_db_path ): try: records_migrated = self._migrate_generic_sqlite( sqlite_file, telemetry_service ) # Fix MyPy object arithmetic error with type assertion current_records = results.get("records_migrated", 0) assert isinstance( current_records, int ), f"records_migrated should be int, got {type(current_records)}" results["records_migrated"] = current_records + records_migrated results["legacy_sqlite_files"].append( { "path": str(sqlite_file), "type": "generic", "records_migrated": records_migrated, } ) logger.info( f"Migrated {records_migrated} records from {sqlite_file.name}" ) except Exception as e: error_msg = f"Failed to migrate {sqlite_file.name}: {e}" logger.warning(error_msg) results.setdefault("warnings", []).append(error_msg) return results def _migrate_cache_metadata( self, db_path: Path, telemetry_service: TelemetryService ) -> int: """Migrate cache metadata database.""" conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row cursor = conn.cursor() records_migrated = 0 try: # Check if cache_entries table exists cursor.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='cache_entries'" ) if cursor.fetchone(): cursor.execute("SELECT * FROM cache_entries") for row in cursor.fetchall(): try: # Convert cache entry to telemetry cache operation telemetry_service.track_cache_operation( operation_type="historical", cache_name="legacy_cache", key_hash=str(row.get("key_hash", "unknown"))[:16], key_metadata={ "legacy_key": row.get("cache_key", ""), "migrated_from": "cache_metadata", }, size_bytes=row.get("size_bytes"), ttl_seconds=row.get("ttl_seconds"), access_time_ms=1.0, # Historical data ) records_migrated += 1 except Exception as e: logger.debug(f"Failed to migrate cache entry: {e}") finally: conn.close() return records_migrated def _migrate_generic_sqlite( self, db_path: Path, 
telemetry_service: TelemetryService ) -> int: """Migrate generic SQLite database files using SQLAlchemy Inspector for safety.""" records_migrated = 0 try: # Create SQLAlchemy engine for safe schema introspection engine = create_engine(f"sqlite:///{db_path}", echo=False) with engine.connect() as conn: # Use SQLAlchemy Inspector for safe schema introspection inspector = inspect(conn) table_names = inspector.get_table_names() for table_name in table_names: logger.debug(f"Processing table: {table_name}") try: # Use SQLAlchemy's safe identifier quoting quoted_table = conn.dialect.identifier_preparer.quote( table_name ) # Execute safe parameterized query with LIMIT for safety result = conn.execute( text(f"SELECT * FROM {quoted_table} LIMIT 100") ) for row in result: # Create generic system health record for historical data telemetry_service.record_system_health_snapshot( cpu_percent=None, memory_percent=None, memory_used_mb=None, db_size_mb=None, cache_entries_count=1, # Mark as historical record ) records_migrated += 1 except Exception as e: logger.debug(f"Failed to process table {table_name}: {e}") except Exception as e: logger.error(f"Failed to migrate SQLite database {db_path}: {e}") return records_migrated def _migrate_json_metrics_files( self, telemetry_service: TelemetryService ) -> dict[str, Any]: """Migrate JSON metrics files.""" results = { "json_metrics_files": [], "legacy_metrics_migrated": False, "metrics_records_migrated": 0, } logger.info("Migrating JSON metrics files...") # Look for JSON metrics files json_files = list(self.metrics_dir.glob("*.json")) if not json_files: logger.info("No JSON metrics files found") return results for json_file in json_files: try: records_migrated = self._migrate_json_file(json_file, telemetry_service) # Fix MyPy object arithmetic error with type assertion current_metrics = results.get("metrics_records_migrated", 0) assert isinstance( current_metrics, int ), f"metrics_records_migrated should be int, got {type(current_metrics)}" results["metrics_records_migrated"] = current_metrics + records_migrated results["json_metrics_files"].append( {"path": str(json_file), "records_migrated": records_migrated} ) if records_migrated > 0: results["legacy_metrics_migrated"] = True logger.info( f"Migrated {records_migrated} metrics from {json_file.name}" ) except Exception as e: error_msg = f"Failed to migrate {json_file.name}: {e}" logger.warning(error_msg) results.setdefault("warnings", []).append(error_msg) return results def _migrate_json_file( self, json_file: Path, telemetry_service: TelemetryService ) -> int: """Migrate individual JSON metrics file.""" with open(json_file) as f: data = json.load(f) records_migrated = 0 # Extract metadata metadata = data.get("metadata", {}) collected_at = metadata.get("collected_at", time.time()) # Migrate legacy metrics as system health snapshots legacy_metrics = data.get("legacy_metrics", {}) if legacy_metrics: # Create a system health snapshot from legacy metrics telemetry_service.record_system_health_snapshot( cpu_percent=None, # Legacy data may not have system metrics memory_percent=None, memory_used_mb=None, db_size_mb=metadata.get("db_size_mb"), cache_entries_count=len(legacy_metrics), scans_per_hour=0.0, # Will be populated by actual scan data error_rate_1h=0.0, ) records_migrated += 1 # Migrate scan metrics as historical CLI commands scan_metrics = data.get("scan_metrics", {}) if scan_metrics and scan_metrics.get("total_scans", 0) > 0: # Create historical CLI command execution cli_exec = 
telemetry_service.start_cli_command_tracking( command_name="historical_scan", args={"migrated_from": str(json_file.name)}, subcommand="legacy_data", ) telemetry_service.complete_cli_command_tracking( execution_id=cli_exec.id, exit_code=0, findings_count=scan_metrics.get("total_findings", 0), ) records_migrated += 1 return records_migrated def _cleanup_old_files(self, confirm_cleanup: bool = False) -> dict[str, Any]: """Clean up old files after successful migration.""" cleanup_results = { "files_removed": [], "files_kept": [], "cleanup_enabled": confirm_cleanup, } if not confirm_cleanup: logger.info("Cleanup skipped (confirm_cleanup=False)") return cleanup_results # This would remove old files - keeping conservative for safety logger.info("File cleanup would occur here if enabled") return cleanup_results def _log_migration_summary(self, results: dict[str, Any]) -> None: """Log migration summary.""" duration = results.get("duration_seconds", 0) logger.info(f"Migration completed in {duration:.2f} seconds") logger.info( f"Legacy SQLite files processed: {len(results.get('legacy_sqlite_files', []))}" ) logger.info( f"JSON metrics files processed: {len(results.get('json_metrics_files', []))}" ) logger.info( f"Total records migrated: {results.get('records_migrated', 0) + results.get('metrics_records_migrated', 0)}" ) if results.get("errors"): logger.warning(f"Migration errors: {len(results['errors'])}") for error in results["errors"]: logger.warning(f" - {error}") if results.get("warnings"): logger.info(f"Migration warnings: {len(results['warnings'])}") def check_migration_needed(self) -> dict[str, Any]: """Check if migration is needed and what would be migrated.""" check_results = { "migration_needed": False, "target_db_exists": self.target_db_path.exists(), "legacy_sqlite_files": [], "json_metrics_files": [], "estimated_records": 0, } # Check for legacy SQLite files if self.cache_dir.exists(): sqlite_files = list(self.cache_dir.glob("*.db")) for sqlite_file in sqlite_files: if sqlite_file != self.target_db_path: check_results["legacy_sqlite_files"].append(str(sqlite_file)) check_results["migration_needed"] = True # Check for JSON metrics files if self.metrics_dir.exists(): json_files = list(self.metrics_dir.glob("*.json")) for json_file in json_files: check_results["json_metrics_files"].append(str(json_file)) check_results["migration_needed"] = True # Estimate record count with type assertions to fix MyPy len() errors legacy_sqlite_files = check_results["legacy_sqlite_files"] json_metrics_files = check_results["json_metrics_files"] assert isinstance( legacy_sqlite_files, list ), f"legacy_sqlite_files should be list, got {type(legacy_sqlite_files)}" assert isinstance( json_metrics_files, list ), f"json_metrics_files should be list, got {type(json_metrics_files)}" check_results["estimated_records"] = ( len(legacy_sqlite_files) * 10 + len(json_metrics_files) * 5 ) return check_results def run_migration_cli( target_db_path: str | None = None, backup: bool = True, confirm_cleanup: bool = False, ) -> int: """CLI entry point for running migration. 
Args: target_db_path: Target database path (optional) backup: Whether to create backup confirm_cleanup: Whether to cleanup old files after migration Returns: Exit code (0 for success, 1 for failure) """ try: target_path = Path(target_db_path) if target_db_path else None migration_manager = DatabaseMigrationManager(target_path) # Check if migration is needed check_results = migration_manager.check_migration_needed() if not check_results["migration_needed"]: logger.info("No migration needed - no legacy files found") return 0 logger.info("Migration Check Results:") logger.info( f" - Legacy SQLite files: {len(check_results['legacy_sqlite_files'])}" ) logger.info( f" - JSON metrics files: {len(check_results['json_metrics_files'])}" ) logger.info(f" - Estimated records: {check_results['estimated_records']}") logger.info(f" - Target DB exists: {check_results['target_db_exists']}") # Run migration logger.info("Starting migration...") results = migration_manager.run_full_migration(backup=backup) if results.get("errors"): logger.error(f"Migration completed with {len(results['errors'])} errors") return 1 else: logger.info("Migration completed successfully") return 0 except Exception as e: logger.error(f"Migration failed: {e}") return 1 if __name__ == "__main__": import sys # Simple CLI interface backup = "--no-backup" not in sys.argv cleanup = "--cleanup" in sys.argv target_path = None for i, arg in enumerate(sys.argv): if arg == "--target" and i + 1 < len(sys.argv): target_path = sys.argv[i + 1] exit_code = run_migration_cli(target_path, backup, cleanup) sys.exit(exit_code)
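
Beyond the `__main__` block, the migration can also be driven programmatically: check first, then migrate. A minimal sketch; the import path below is hypothetical and depends on where this module sits inside the installed package:

from pathlib import Path

# Hypothetical import path; adjust to the module's real package location.
from adversary_mcp_server.database_migration import DatabaseMigrationManager

# No argument: defaults to ~/.local/share/adversary-mcp-server/cache/adversary.db
manager = DatabaseMigrationManager()

# Dry-run style check: lists legacy files and a rough record estimate
check = manager.check_migration_needed()
if check["migration_needed"]:
    results = manager.run_full_migration(backup=True)
    print(
        f"Finished in {results['duration_seconds']:.2f}s "
        f"with {len(results['errors'])} error(s)"
    )
else:
    print("No legacy files found; nothing to migrate")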

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'
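
The same lookup from Python, as a minimal sketch using only the standard library (assuming the endpoint returns JSON):

import json
import urllib.request

# Same endpoint as the curl command above
url = "https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server"
with urllib.request.urlopen(url) as response:
    server_info = json.load(response)

print(server_info)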

If you have feedback or need assistance with the MCP directory API, please join our Discord server.