Skip to main content
Glama

Adversary MCP Server

by brettbergin
maintenance.py22.6 kB
"""Database maintenance utilities for telemetry system optimization.""" import time from pathlib import Path from typing import Any from sqlalchemy import create_engine, inspect, text from sqlalchemy.orm import sessionmaker from ..logger import get_logger logger = get_logger("telemetry.maintenance") class DatabaseMaintenanceManager: """Manager for database maintenance and optimization operations.""" def __init__(self, db_path: Path): self.db_path = db_path self.engine = None self.SessionLocal = None if self.db_path.exists(): self._initialize_engine() def run_maintenance(self, operations: list[str] | None = None) -> dict[str, Any]: """Run comprehensive database maintenance.""" operations = operations or [ "analyze", "vacuum", "integrity_check", "index_check", ] results = { "started_at": time.time(), "operations": {}, "database_size_before": 0, "database_size_after": 0, "maintenance_duration": 0, } # Get initial database size if self.db_path.exists(): results["database_size_before"] = self.db_path.stat().st_size logger.info(f"Starting database maintenance on {self.db_path}") # Run each maintenance operation for operation in operations: try: if operation == "analyze": results["operations"]["analyze"] = self._run_analyze() elif operation == "vacuum": results["operations"]["vacuum"] = self._run_vacuum() elif operation == "integrity_check": results["operations"][ "integrity_check" ] = self._run_integrity_check() elif operation == "index_check": results["operations"]["index_check"] = self._check_indexes() elif operation == "cleanup": results["operations"]["cleanup"] = self._cleanup_old_data() else: logger.warning(f"Unknown maintenance operation: {operation}") except Exception as e: logger.error(f"Failed to run {operation}: {e}") results["operations"][operation] = {"success": False, "error": str(e)} # Get final database size if self.db_path.exists(): results["database_size_after"] = self.db_path.stat().st_size results["completed_at"] = time.time() # Type assertions for 
arithmetic operations to fix MyPy object arithmetic errors started_at = results["started_at"] completed_at = results["completed_at"] size_before = results["database_size_before"] size_after = results["database_size_after"] assert isinstance( started_at, int | float ), f"started_at should be numeric, got {type(started_at)}" assert isinstance( completed_at, int | float ), f"completed_at should be numeric, got {type(completed_at)}" assert isinstance( size_before, int ), f"size_before should be int, got {type(size_before)}" assert isinstance( size_after, int ), f"size_after should be int, got {type(size_after)}" results["maintenance_duration"] = completed_at - started_at results["size_reduction_bytes"] = size_before - size_after results["size_reduction_percent"] = ( ((size_before - size_after) / size_before) * 100 if size_before > 0 else 0 ) logger.info( f"Database maintenance completed in {results['maintenance_duration']:.2f}s" ) logger.info( f"Size change: {results['size_reduction_bytes']} bytes ({results['size_reduction_percent']:.1f}%)" ) return results def _initialize_engine(self) -> None: """Initialize SQLAlchemy engine for the database.""" self.engine = create_engine( f"sqlite:///{self.db_path}", echo=False, connect_args={"check_same_thread": False}, ) self.SessionLocal = sessionmaker(bind=self.engine) def _run_analyze(self) -> dict[str, Any]: """Update database statistics for query optimization.""" start_time = time.time() if not self.engine: return { "success": False, "error": "Database engine not initialized", "duration": 0, "tables_analyzed": 0, } try: with self.engine.connect() as conn: # Run global ANALYZE first conn.execute(text("ANALYZE")) # Use SQLAlchemy Inspector for safe schema introspection inspector = inspect(conn) table_names = inspector.get_table_names() # Analyze each table using safe identifier quoting valid_tables = [] for table_name in table_names: try: # Use SQLAlchemy's built-in identifier quoting for safety quoted_table = 
conn.dialect.identifier_preparer.quote( table_name ) conn.execute(text(f"ANALYZE {quoted_table}")) valid_tables.append(table_name) except Exception as e: # Skip problematic tables but continue with others logger.debug(f"Failed to analyze table {table_name}: {e}") continue conn.commit() duration = time.time() - start_time return { "success": True, "duration": duration, "tables_analyzed": len(valid_tables), "description": "Updated query planner statistics", } except Exception as e: return { "success": False, "error": str(e), "duration": time.time() - start_time, "tables_analyzed": 0, } def _run_vacuum(self) -> dict[str, Any]: """Reclaim unused space and defragment database.""" start_time = time.time() if not self.engine: return { "success": False, "error": "Database engine not initialized", "duration": 0, } size_before = self.db_path.stat().st_size if self.db_path.exists() else 0 try: with self.engine.connect() as conn: # Full vacuum to reclaim space conn.execute(text("VACUUM")) conn.commit() size_after = self.db_path.stat().st_size if self.db_path.exists() else 0 duration = time.time() - start_time space_reclaimed = size_before - size_after return { "success": True, "duration": duration, "size_before_bytes": size_before, "size_after_bytes": size_after, "space_reclaimed_bytes": space_reclaimed, "space_reclaimed_percent": ( (space_reclaimed / size_before * 100) if size_before > 0 else 0 ), "description": "Reclaimed unused space and defragmented database", } except Exception as e: return { "success": False, "error": str(e), "duration": time.time() - start_time, } def _run_integrity_check(self) -> dict[str, Any]: """Check database integrity.""" start_time = time.time() if not self.engine: return { "success": False, "error": "Database engine not initialized", "duration": 0, } try: with self.engine.connect() as conn: # Quick integrity check quick_result = conn.execute(text("PRAGMA quick_check")) quick_check_result = quick_result.fetchone()[0] issues = [] if 
quick_check_result != "ok": # Run full integrity check if quick check fails integrity_result = conn.execute(text("PRAGMA integrity_check")) integrity_results = integrity_result.fetchall() issues = [row[0] for row in integrity_results if row[0] != "ok"] duration = time.time() - start_time return { "success": len(issues) == 0, "duration": duration, "quick_check_result": quick_check_result, "issues_found": len(issues), "issues": issues[:10], # Limit to first 10 issues "description": f'Database integrity check {"passed" if len(issues) == 0 else "failed"}', } except Exception as e: return { "success": False, "error": str(e), "duration": time.time() - start_time, "issues_found": 0, "issues": [], } def _check_indexes(self) -> dict[str, Any]: """Check and analyze database indexes.""" start_time = time.time() if not self.engine: return { "success": False, "error": "Database engine not initialized", "duration": 0, } try: with self.engine.connect() as conn: # Use SQLAlchemy Inspector for safe schema introspection inspector = inspect(conn) table_names = inspector.get_table_names() index_stats = [] for table_name in table_names: try: # Get indexes for this table using Inspector table_indexes = inspector.get_indexes(table_name) for index_info in table_indexes: index_name = index_info["name"] # Get index usage statistics (approximation) quoted_table = conn.dialect.identifier_preparer.quote( table_name ) plan_result = conn.execute( text(f"EXPLAIN QUERY PLAN SELECT * FROM {quoted_table}") ) query_plan = plan_result.fetchall() uses_index = any( "USING INDEX" in str(step) for step in query_plan ) index_stats.append( { "name": index_name, "table": table_name, "columns": index_info["column_names"], "unique": index_info["unique"], "appears_used": uses_index, } ) except Exception as e: # Skip problematic tables but continue with others logger.debug( f"Failed to check indexes for table {table_name}: {e}" ) continue duration = time.time() - start_time total_indexes = len(index_stats) 
used_indexes = sum(1 for idx in index_stats if idx["appears_used"]) return { "success": True, "duration": duration, "total_indexes": total_indexes, "used_indexes": used_indexes, "unused_indexes": total_indexes - used_indexes, "index_details": index_stats, "description": f"Analyzed {total_indexes} indexes, {used_indexes} appear to be used", } except Exception as e: return { "success": False, "error": str(e), "duration": time.time() - start_time, } def _cleanup_old_data(self, days_to_keep: int = 90) -> dict[str, Any]: """Clean up old telemetry data.""" start_time = time.time() cutoff_timestamp = time.time() - (days_to_keep * 24 * 3600) tables_to_clean = [ ("mcp_tool_executions", "execution_start"), ("cli_command_executions", "execution_start"), ("cache_operation_metrics", "timestamp"), ("scan_executions", "execution_start"), ("threat_findings", "timestamp"), ("system_health", "timestamp"), ] cleanup_results = {} total_deleted = 0 if not self.engine: return { "success": False, "error": "Database engine not initialized", "duration": 0, "total_records_deleted": 0, } try: with self.engine.connect() as conn: inspector = inspect(conn) existing_tables = inspector.get_table_names() for table_name, timestamp_column in tables_to_clean: try: # Check if table exists using Inspector if table_name not in existing_tables: cleanup_results[table_name] = { "records_deleted": 0, "success": True, } continue # Use quoted identifiers for safety quoted_table = conn.dialect.identifier_preparer.quote( table_name ) quoted_column = conn.dialect.identifier_preparer.quote( timestamp_column ) # Count records to be deleted count_result = conn.execute( text( f"SELECT COUNT(*) FROM {quoted_table} WHERE {quoted_column} < :cutoff" ), {"cutoff": cutoff_timestamp}, ) count_to_delete = count_result.fetchone()[0] if count_to_delete > 0: # Delete old records conn.execute( text( f"DELETE FROM {quoted_table} WHERE {quoted_column} < :cutoff" ), {"cutoff": cutoff_timestamp}, ) cleanup_results[table_name] = { 
"records_deleted": count_to_delete, "success": True, } total_deleted += count_to_delete except Exception as e: cleanup_results[table_name] = { "records_deleted": 0, "success": False, "error": str(e), } conn.commit() duration = time.time() - start_time return { "success": True, "duration": duration, "total_records_deleted": total_deleted, "cutoff_days": days_to_keep, "cutoff_timestamp": cutoff_timestamp, "table_results": cleanup_results, "description": f"Cleaned up {total_deleted} old records older than {days_to_keep} days", } except Exception as e: return { "success": False, "error": str(e), "duration": time.time() - start_time, "total_records_deleted": 0, } # Removed _get_connection method - now using SQLAlchemy engine directly def get_database_info(self) -> dict[str, Any]: """Get comprehensive database information.""" if not self.db_path.exists(): return {"exists": False} info = { "exists": True, "path": str(self.db_path), "size_bytes": self.db_path.stat().st_size, "size_mb": self.db_path.stat().st_size / (1024 * 1024), "modified_time": self.db_path.stat().st_mtime, } try: if not self.engine: self._initialize_engine() with self.engine.connect() as conn: # Get SQLite version and settings result = conn.execute(text("SELECT sqlite_version()")) info["sqlite_version"] = result.fetchone()[0] # Get pragma settings pragmas = [ "page_size", "cache_size", "synchronous", "journal_mode", "temp_store", "locking_mode", "auto_vacuum", ] info["settings"] = {} for pragma in pragmas: # PRAGMA commands are safe system commands pragma_result = conn.execute(text(f"PRAGMA {pragma}")) pragma_row = pragma_result.fetchone() info["settings"][pragma] = pragma_row[0] if pragma_row else None # Get table information using Inspector inspector = inspect(conn) table_names = inspector.get_table_names() info["tables"] = {} for table_name in table_names: try: # Get table columns for additional info columns = inspector.get_columns(table_name) # Get row count safely quoted_table = 
conn.dialect.identifier_preparer.quote( table_name ) count_result = conn.execute( text(f"SELECT COUNT(*) FROM {quoted_table}") ) row_count = count_result.fetchone()[0] info["tables"][table_name] = { "row_count": row_count, "column_count": len(columns), "columns": [col["name"] for col in columns], } except Exception as e: # Skip problematic tables but continue with others logger.debug(f"Failed to get info for table {table_name}: {e}") continue info["total_records"] = sum( t["row_count"] for t in info["tables"].values() ) # Get index information using Inspector total_indexes = 0 for table_name in table_names: try: table_indexes = inspector.get_indexes(table_name) total_indexes += len(table_indexes) except Exception as e: logger.debug( f"Failed to get indexes for table {table_name}: {e}" ) continue info["index_count"] = total_indexes except Exception as e: info["error"] = str(e) return info def run_database_maintenance( db_path: Path = None, operations: list[str] = None ) -> dict[str, Any]: """Run database maintenance operations.""" if db_path is None: db_path = Path.home() / ".local/share/adversary-mcp-server/cache/adversary.db" manager = DatabaseMaintenanceManager(db_path) return manager.run_maintenance(operations) def get_maintenance_recommendations(db_info: dict[str, Any]) -> list[str]: """Get maintenance recommendations based on database info.""" recommendations = [] # Check if database info indicates failure if db_info.get("success") is False: return [] # No recommendations for failed database info # Only check exists if it's explicitly set to False if db_info.get("exists") is False: return ["Database does not exist"] # Size-based recommendations size_mb = db_info.get("size_mb", 0) # Also check file_size_bytes for different test data formats file_size_bytes = db_info.get("file_size_bytes", 0) if file_size_bytes > 0: size_mb = file_size_bytes / (1024 * 1024) if size_mb > 100: recommendations.append( "Database is large (>100MB) - consider running VACUUM to reclaim 
space" ) # Record count recommendations - support multiple data formats total_records = db_info.get("total_records", 0) total_rows = db_info.get("total_rows", 0) if total_rows > 0: total_records = total_rows if total_records > 100000: recommendations.append( "High record count (>100k) - consider cleanup of old data" ) # Index analysis recommendations table_count = db_info.get("table_count", len(db_info.get("tables", {}))) index_count = db_info.get("index_count", 0) # Recommend index analysis if few indexes relative to tables if table_count > 5 and index_count < (table_count * 0.5): recommendations.append( "Few indexes relative to table count - consider index analysis" ) # Settings recommendations settings = db_info.get("settings", {}) if settings.get("auto_vacuum") == 0: recommendations.append( "Auto-vacuum disabled - consider enabling or running manual VACUUM" ) if settings.get("synchronous") == 2: # FULL recommendations.append( "Synchronous mode is FULL - consider NORMAL for better performance" ) if not recommendations: recommendations.append("Database appears to be in good condition") return recommendations

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.