Skip to main content
Glama

Adversary MCP Server

by brettbergin
models.py13.9 kB
"""SQLAlchemy ORM models for comprehensive telemetry and monitoring.""" import json from datetime import UTC, datetime from pathlib import Path from sqlalchemy import ( Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, String, Text, create_engine, ) from sqlalchemy.orm import DeclarativeBase, Session, relationship, sessionmaker from sqlalchemy.types import VARCHAR, TypeDecorator class Base(DeclarativeBase): """SQLAlchemy declarative base class.""" pass class JSONType(TypeDecorator): """SQLAlchemy type for storing JSON data.""" impl = VARCHAR def process_bind_param(self, value, dialect): if value is not None: return json.dumps(value) return value def process_result_value(self, value, dialect): if value is not None: return json.loads(value) return value # === COMPREHENSIVE TELEMETRY MODELS === class CacheEntry(Base): """SQLAlchemy ORM model for cache entries.""" __tablename__ = "cache_entries" id = Column(Integer, primary_key=True, autoincrement=True) key = Column(String(255), unique=True, nullable=False, index=True) cache_type = Column(String(50), nullable=False, index=True) content_hash = Column(String(64), nullable=False, index=True) data_size_bytes = Column(Integer, nullable=False) access_count = Column(Integer, nullable=False, default=0) last_accessed = Column(Float, nullable=False, index=True) created_at = Column(Float, nullable=False, index=True) expires_at = Column(Float, nullable=True, index=True) cache_metadata = Column( JSONType, nullable=True ) # Renamed to avoid reserved 'metadata' attribute __table_args__ = ( Index("idx_cache_type_accessed", "cache_type", "last_accessed"), Index("idx_expires_at", "expires_at"), ) def __repr__(self): return f"<CacheEntry(key={self.key}, type={self.cache_type})>" class MCPToolExecution(Base): """Track all MCP tool executions and performance.""" __tablename__ = "mcp_tool_executions" id = Column(Integer, primary_key=True, autoincrement=True) tool_name = Column( String(100), nullable=False, index=True ) # 'adv_scan_file', 'adv_scan_folder', etc. session_id = Column(String(36), nullable=False, index=True) # Track user sessions request_params = Column(JSONType, nullable=False) # Tool parameters execution_start = Column(Float, nullable=False, index=True) execution_end = Column(Float, nullable=True) success = Column(Boolean, nullable=False, default=True, index=True) error_message = Column(Text, nullable=True) findings_count = Column(Integer, nullable=False, default=0) validation_enabled = Column(Boolean, nullable=False, default=False) llm_enabled = Column(Boolean, nullable=False, default=False) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) __table_args__ = ( Index("idx_tool_session_time", "tool_name", "session_id", "execution_start"), Index("idx_success_time", "success", "execution_start"), ) def __repr__(self): return f"<MCPToolExecution(tool={self.tool_name}, success={self.success})>" class CLICommandExecution(Base): """Track all CLI command executions and performance.""" __tablename__ = "cli_command_executions" id = Column(Integer, primary_key=True, autoincrement=True) command_name = Column( String(100), nullable=False, index=True ) # 'scan', 'configure', 'status' subcommand = Column( String(100), nullable=True, index=True ) # file path or directory args = Column(JSONType, nullable=False) # Command line arguments execution_start = Column(Float, nullable=False, index=True) execution_end = Column(Float, nullable=True) exit_code = Column(Integer, nullable=False, default=0, index=True) stdout_lines = Column(Integer, nullable=False, default=0) stderr_lines = Column(Integer, nullable=False, default=0) findings_count = Column(Integer, nullable=False, default=0) validation_enabled = Column(Boolean, nullable=False, default=False) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) __table_args__ = ( Index("idx_command_exit_time", "command_name", "exit_code", "execution_start"), Index("idx_subcommand_time", "subcommand", "execution_start"), ) def __repr__(self): return f"<CLICommandExecution(command={self.command_name}, exit_code={self.exit_code})>" class CacheOperationMetric(Base): """Comprehensive cache operation tracking.""" __tablename__ = "cache_operations" id = Column(Integer, primary_key=True, autoincrement=True) operation_type = Column( String(20), nullable=False, index=True ) # 'hit', 'miss', 'set', 'evict', 'cleanup' cache_name = Column( String(50), nullable=False, index=True ) # 'scan_results', 'semgrep_cache', 'llm_analysis' key_hash = Column(String(64), nullable=False, index=True) key_metadata = Column(JSONType, nullable=True) # File path, language, etc. size_bytes = Column(Integer, nullable=True) ttl_seconds = Column(Integer, nullable=True) access_time_ms = Column(Float, nullable=True) # Time to retrieve/store eviction_reason = Column( String(50), nullable=True ) # 'ttl_expired', 'lru', 'manual' timestamp = Column(Float, nullable=False, index=True) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) __table_args__ = ( Index("idx_cache_operation_type", "cache_name", "operation_type"), Index("idx_cache_timestamp", "cache_name", "timestamp"), ) def __repr__(self): return ( f"<CacheOperationMetric(cache={self.cache_name}, op={self.operation_type})>" ) class ScanEngineExecution(Base): """Detailed scan engine execution tracking.""" __tablename__ = "scan_executions" id = Column(Integer, primary_key=True, autoincrement=True) scan_id = Column(String(36), unique=True, nullable=False, index=True) trigger_source = Column( String(20), nullable=False, index=True ) # 'mcp_tool', 'cli', 'api' scan_type = Column( String(20), nullable=False, index=True ) # 'file', 'directory', 'code', 'diff' target_path = Column(Text, nullable=False) file_count = Column(Integer, nullable=False, default=1) language_detected = Column(String(50), nullable=True, index=True) file_size_bytes = Column(Integer, nullable=True) # Timing breakdown total_duration_ms = Column(Float, nullable=True) semgrep_duration_ms = Column(Float, nullable=True) llm_duration_ms = Column(Float, nullable=True) validation_duration_ms = Column(Float, nullable=True) cache_lookup_ms = Column(Float, nullable=True) # Results threats_found = Column(Integer, nullable=False, default=0) threats_validated = Column(Integer, nullable=False, default=0) false_positives_filtered = Column(Integer, nullable=False, default=0) cache_hit = Column(Boolean, nullable=False, default=False, index=True) # Configuration semgrep_enabled = Column(Boolean, nullable=False, default=True) llm_enabled = Column(Boolean, nullable=False, default=False) validation_enabled = Column(Boolean, nullable=False, default=False) execution_start = Column(Float, nullable=False, index=True) execution_end = Column(Float, nullable=True) success = Column(Boolean, nullable=False, default=True, index=True) error_message = Column(Text, nullable=True) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) __table_args__ = ( Index("idx_scan_source_type", "trigger_source", "scan_type"), Index("idx_scan_language_time", "language_detected", "execution_start"), Index("idx_scan_success_time", "success", "execution_start"), ) def __repr__(self): return f"<ScanEngineExecution(scan_id={self.scan_id}, type={self.scan_type})>" class ThreatFinding(Base): """Individual threat findings with full context.""" __tablename__ = "threat_findings" id = Column(Integer, primary_key=True, autoincrement=True) scan_id = Column( String(36), ForeignKey("scan_executions.scan_id"), nullable=False, index=True ) finding_uuid = Column(String(36), unique=True, nullable=False, index=True) # Detection details scanner_source = Column(String(20), nullable=False, index=True) # 'semgrep', 'llm' rule_id = Column(String(100), nullable=True, index=True) category = Column( String(50), nullable=False, index=True ) # 'injection', 'xss', 'crypto', etc. severity = Column( String(20), nullable=False, index=True ) # 'low', 'medium', 'high', 'critical' confidence = Column(Float, nullable=True) # 0.0-1.0 for validation confidence # Location details file_path = Column(Text, nullable=False) line_start = Column(Integer, nullable=False) line_end = Column(Integer, nullable=False) column_start = Column(Integer, nullable=True) column_end = Column(Integer, nullable=True) code_snippet = Column(Text, nullable=True) # Finding details title = Column(String(200), nullable=False) description = Column(Text, nullable=True) remediation = Column(Text, nullable=True) references = Column(JSONType, nullable=True) # CWE, OWASP references # Validation and false positive tracking is_validated = Column(Boolean, nullable=False, default=False, index=True) is_false_positive = Column(Boolean, nullable=False, default=False, index=True) validation_reason = Column(Text, nullable=True) marked_by = Column(String(100), nullable=True) timestamp = Column(Float, nullable=False, index=True) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) # Relationship scan_execution = relationship("ScanEngineExecution", backref="findings") __table_args__ = ( Index("idx_finding_category_severity", "category", "severity"), Index("idx_finding_scanner_validation", "scanner_source", "is_validated"), Index("idx_finding_file_line", "file_path", "line_start"), ) def __repr__(self): return f"<ThreatFinding(uuid={self.finding_uuid}, category={self.category})>" class SystemHealth(Base): """System health and performance snapshots.""" __tablename__ = "system_health" id = Column(Integer, primary_key=True, autoincrement=True) timestamp = Column(Float, nullable=False, index=True) # System metrics cpu_percent = Column(Float, nullable=True) memory_percent = Column(Float, nullable=True) memory_used_mb = Column(Float, nullable=True) disk_usage_percent = Column(Float, nullable=True) # Database metrics db_size_mb = Column(Float, nullable=True) db_connections = Column(Integer, nullable=True) # Cache metrics cache_size_mb = Column(Float, nullable=True) cache_hit_rate_1h = Column(Float, nullable=True) cache_entries_count = Column(Integer, nullable=True) # Performance metrics avg_scan_duration_1h = Column(Float, nullable=True) scans_per_hour = Column(Float, nullable=True) error_rate_1h = Column(Float, nullable=True) created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) __table_args__ = (Index("idx_health_timestamp", "timestamp"),) def __repr__(self): return f"<SystemHealth(timestamp={self.timestamp})>" class LLMAnalysisSession(Base): """Persistent storage for LLM analysis sessions.""" __tablename__ = "sessions" id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(36), nullable=False, unique=True, index=True) # Session metadata state = Column(String(20), nullable=False, index=True) created_at = Column(Float, nullable=False, index=True) last_activity = Column(Float, nullable=False, index=True) # Session data (JSON serialized) session_data = Column(Text, nullable=False) # Optimization indexes __table_args__ = ( Index("idx_session_activity", "last_activity"), Index("idx_session_state", "state"), Index("idx_session_created", "created_at"), ) def __repr__(self): return f"<LLMAnalysisSession(session_id={self.session_id}, state={self.state})>" # === UNIFIED DATABASE === class AdversaryDatabase: """Consolidated SQLAlchemy database for all telemetry and metrics.""" def __init__( self, db_path: Path = Path("~/.local/share/adversary-mcp-server/cache/adversary.db"), ): self.db_path = db_path.expanduser() self.db_path.parent.mkdir(parents=True, exist_ok=True) # Create SQLAlchemy engine with optimizations self.engine = create_engine( f"sqlite:///{self.db_path}", echo=False, # Set to True for SQL debugging pool_pre_ping=True, connect_args={ "check_same_thread": False, "timeout": 30, "isolation_level": None, }, ) # Create session factory self.SessionLocal = sessionmaker( autocommit=False, autoflush=True, bind=self.engine ) # Create all tables Base.metadata.create_all(bind=self.engine) # Set secure database file permissions (owner read/write only) if self.db_path.exists(): import os os.chmod(self.db_path, 0o600) def get_session(self) -> Session: """Get a new database session.""" return self.SessionLocal() def close(self): """Close database connections.""" self.engine.dispose()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server