Skip to main content
Glama

Adversary MCP Server

by brettbergin
repository.py26.4 kB
"""Comprehensive repository for all telemetry data with rich query methods.""" import time from typing import Any from sqlalchemy import and_, case, func from sqlalchemy.orm import Session from ..database.models import ( CacheOperationMetric, CLICommandExecution, MCPToolExecution, ScanEngineExecution, SystemHealth, ThreatFinding, ) class ComprehensiveTelemetryRepository: """Comprehensive repository for all telemetry data with rich query methods.""" def __init__(self, session: Session): self.session = session self._auto_commit = True # Default to auto-commit for backward compatibility def _maybe_commit(self): """Commit the session if auto-commit is enabled.""" if self._auto_commit: self.session.commit() # === MCP TOOL TRACKING === def track_mcp_tool_execution( self, tool_name: str, session_id: str, request_params: dict, **kwargs ) -> MCPToolExecution: """Track MCP tool execution start.""" execution = MCPToolExecution( tool_name=tool_name, session_id=session_id, request_params=request_params, execution_start=time.time(), validation_enabled=kwargs.get("validation_enabled", False), llm_enabled=kwargs.get("llm_enabled", False), ) self.session.add(execution) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(execution) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_execution = MCPToolExecution( tool_name=execution.tool_name, session_id=execution.session_id, request_params=execution.request_params, execution_start=execution.execution_start, validation_enabled=execution.validation_enabled, llm_enabled=execution.llm_enabled, execution_end=execution.execution_end, success=execution.success, findings_count=execution.findings_count, error_message=execution.error_message, ) detached_execution.id = execution.id detached_execution.created_at = execution.created_at return detached_execution def complete_mcp_tool_execution( self, execution_id: int, success: bool = True, findings_count: int = 0, error_message: str = None, ): """Complete MCP tool execution tracking.""" execution = ( self.session.query(MCPToolExecution).filter_by(id=execution_id).first() ) if execution: execution.execution_end = time.time() execution.success = success execution.findings_count = findings_count execution.error_message = error_message self._maybe_commit() # === CLI COMMAND TRACKING === def track_cli_command_execution( self, command_name: str, args: dict, subcommand: str = None, **kwargs ) -> CLICommandExecution: """Track CLI command execution.""" execution = CLICommandExecution( command_name=command_name, subcommand=subcommand, args=args, execution_start=time.time(), validation_enabled=kwargs.get("validation_enabled", False), ) self.session.add(execution) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(execution) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_execution = CLICommandExecution( command_name=execution.command_name, subcommand=execution.subcommand, args=execution.args, execution_start=execution.execution_start, validation_enabled=execution.validation_enabled, execution_end=execution.execution_end, exit_code=execution.exit_code, stdout_lines=execution.stdout_lines, stderr_lines=execution.stderr_lines, findings_count=execution.findings_count, ) detached_execution.id = execution.id detached_execution.created_at = execution.created_at return detached_execution def complete_cli_command_execution( self, execution_id: int, exit_code: int = 0, stdout_lines: int = 0, stderr_lines: int = 0, findings_count: int = 0, ): """Complete CLI command execution tracking.""" execution = ( self.session.query(CLICommandExecution).filter_by(id=execution_id).first() ) if execution: execution.execution_end = time.time() execution.exit_code = exit_code execution.stdout_lines = stdout_lines execution.stderr_lines = stderr_lines execution.findings_count = findings_count self._maybe_commit() # === CACHE OPERATIONS === def track_cache_operation( self, operation_type: str, cache_name: str, key_hash: str, key_metadata: dict = None, size_bytes: int = None, access_time_ms: float = None, **kwargs, ) -> CacheOperationMetric: """Track cache operation.""" metric = CacheOperationMetric( operation_type=operation_type, cache_name=cache_name, key_hash=key_hash, key_metadata=key_metadata, size_bytes=size_bytes, access_time_ms=access_time_ms, ttl_seconds=kwargs.get("ttl_seconds"), eviction_reason=kwargs.get("eviction_reason"), timestamp=time.time(), ) self.session.add(metric) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(metric) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_metric = CacheOperationMetric( operation_type=metric.operation_type, cache_name=metric.cache_name, key_hash=metric.key_hash, key_metadata=metric.key_metadata, size_bytes=metric.size_bytes, access_time_ms=metric.access_time_ms, ttl_seconds=metric.ttl_seconds, eviction_reason=metric.eviction_reason, timestamp=metric.timestamp, ) detached_metric.id = metric.id detached_metric.created_at = metric.created_at return detached_metric # === SCAN ENGINE TRACKING === def track_scan_execution( self, scan_id: str, trigger_source: str, scan_type: str, target_path: str, **kwargs, ) -> ScanEngineExecution: """Track scan engine execution start.""" execution = ScanEngineExecution( scan_id=scan_id, trigger_source=trigger_source, scan_type=scan_type, target_path=target_path, file_count=kwargs.get("file_count", 1), language_detected=kwargs.get("language_detected"), file_size_bytes=kwargs.get("file_size_bytes"), semgrep_enabled=kwargs.get("semgrep_enabled", True), llm_enabled=kwargs.get("llm_enabled", False), validation_enabled=kwargs.get("validation_enabled", False), execution_start=time.time(), ) self.session.add(execution) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(execution) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_execution = ScanEngineExecution( scan_id=execution.scan_id, trigger_source=execution.trigger_source, scan_type=execution.scan_type, target_path=execution.target_path, file_count=execution.file_count, language_detected=execution.language_detected, file_size_bytes=execution.file_size_bytes, semgrep_enabled=execution.semgrep_enabled, llm_enabled=execution.llm_enabled, validation_enabled=execution.validation_enabled, execution_start=execution.execution_start, execution_end=execution.execution_end, total_duration_ms=execution.total_duration_ms, semgrep_duration_ms=execution.semgrep_duration_ms, llm_duration_ms=execution.llm_duration_ms, validation_duration_ms=execution.validation_duration_ms, cache_lookup_ms=execution.cache_lookup_ms, threats_found=execution.threats_found, threats_validated=execution.threats_validated, false_positives_filtered=execution.false_positives_filtered, cache_hit=execution.cache_hit, success=execution.success, error_message=execution.error_message, ) detached_execution.id = execution.id detached_execution.created_at = execution.created_at return detached_execution def complete_scan_execution(self, scan_id: str, success: bool = True, **kwargs): """Complete scan execution with results.""" execution = ( self.session.query(ScanEngineExecution).filter_by(scan_id=scan_id).first() ) if execution: execution.execution_end = time.time() execution.total_duration_ms = kwargs.get("total_duration_ms") execution.semgrep_duration_ms = kwargs.get("semgrep_duration_ms") execution.llm_duration_ms = kwargs.get("llm_duration_ms") execution.validation_duration_ms = kwargs.get("validation_duration_ms") execution.cache_lookup_ms = kwargs.get("cache_lookup_ms") execution.threats_found = kwargs.get("threats_found", 0) execution.threats_validated = kwargs.get("threats_validated", 0) execution.false_positives_filtered = kwargs.get( "false_positives_filtered", 0 ) execution.cache_hit = kwargs.get("cache_hit", False) execution.success = success execution.error_message = kwargs.get("error_message") self._maybe_commit() # === THREAT FINDINGS === def record_threat_finding( self, scan_id: str, finding_uuid: str, scanner_source: str, category: str, severity: str, file_path: str, line_start: int, line_end: int, title: str, **kwargs, ) -> ThreatFinding: """Record a threat finding.""" finding = ThreatFinding( scan_id=scan_id, finding_uuid=finding_uuid, scanner_source=scanner_source, rule_id=kwargs.get("rule_id"), category=category, severity=severity, confidence=kwargs.get("confidence"), file_path=file_path, line_start=line_start, line_end=line_end, column_start=kwargs.get("column_start"), column_end=kwargs.get("column_end"), code_snippet=kwargs.get("code_snippet"), title=title, description=kwargs.get("description"), remediation=kwargs.get("remediation"), references=kwargs.get("references"), is_validated=kwargs.get("is_validated", False), is_false_positive=kwargs.get("is_false_positive", False), validation_reason=kwargs.get("validation_reason"), marked_by=kwargs.get("marked_by"), timestamp=time.time(), ) self.session.add(finding) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(finding) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_finding = ThreatFinding( scan_id=finding.scan_id, finding_uuid=finding.finding_uuid, scanner_source=finding.scanner_source, rule_id=finding.rule_id, category=finding.category, severity=finding.severity, confidence=finding.confidence, file_path=finding.file_path, line_start=finding.line_start, line_end=finding.line_end, column_start=finding.column_start, column_end=finding.column_end, code_snippet=finding.code_snippet, title=finding.title, description=finding.description, remediation=finding.remediation, references=finding.references, is_validated=finding.is_validated, is_false_positive=finding.is_false_positive, validation_reason=finding.validation_reason, marked_by=finding.marked_by, timestamp=finding.timestamp, ) detached_finding.id = finding.id detached_finding.created_at = finding.created_at return detached_finding def update_threat_finding_validation( self, finding_uuid: str, is_validated: bool, validation_confidence: float | None, validation_reasoning: str | None, is_false_positive: bool | None = None, validation_reason: str | None = None, exploitation_vector: str | None = None, ): """Update threat finding with validation results.""" finding = ( self.session.query(ThreatFinding) .filter_by(finding_uuid=finding_uuid) .first() ) if finding: finding.is_validated = is_validated finding.validation_confidence = validation_confidence finding.validation_reasoning = validation_reasoning if is_false_positive is not None: finding.is_false_positive = is_false_positive if validation_reason is not None: finding.validation_reason = validation_reason # Note: exploitation_vector could be added to ThreatFinding model if needed self._maybe_commit() return finding return None # === SYSTEM HEALTH === def record_system_health_snapshot(self, **metrics) -> SystemHealth: """Record system health metrics snapshot.""" health = SystemHealth( timestamp=time.time(), cpu_percent=metrics.get("cpu_percent"), memory_percent=metrics.get("memory_percent"), memory_used_mb=metrics.get("memory_used_mb"), disk_usage_percent=metrics.get("disk_usage_percent"), db_size_mb=metrics.get("db_size_mb"), db_connections=metrics.get("db_connections"), cache_size_mb=metrics.get("cache_size_mb"), cache_hit_rate_1h=metrics.get("cache_hit_rate_1h"), cache_entries_count=metrics.get("cache_entries_count"), avg_scan_duration_1h=metrics.get("avg_scan_duration_1h"), scans_per_hour=metrics.get("scans_per_hour"), error_rate_1h=metrics.get("error_rate_1h"), ) self.session.add(health) self.session.flush() # Ensure object is in DB before refresh self.session.refresh(health) self._maybe_commit() # Create a new detached object with the same data to avoid DetachedInstanceError detached_health = SystemHealth( timestamp=health.timestamp, cpu_percent=health.cpu_percent, memory_percent=health.memory_percent, memory_used_mb=health.memory_used_mb, disk_usage_percent=health.disk_usage_percent, db_size_mb=health.db_size_mb, db_connections=health.db_connections, cache_size_mb=health.cache_size_mb, cache_hit_rate_1h=health.cache_hit_rate_1h, cache_entries_count=health.cache_entries_count, avg_scan_duration_1h=health.avg_scan_duration_1h, scans_per_hour=health.scans_per_hour, error_rate_1h=health.error_rate_1h, ) detached_health.id = health.id detached_health.created_at = health.created_at return detached_health # === COMPREHENSIVE ANALYTICS QUERIES === def get_dashboard_data(self, hours: int = 24) -> dict[str, Any]: """Get comprehensive dashboard data.""" since = time.time() - (hours * 3600) # MCP Tool Statistics mcp_stats = ( self.session.query( MCPToolExecution.tool_name, func.count(MCPToolExecution.id).label("executions"), func.avg( case( ( MCPToolExecution.execution_end.isnot(None), ( MCPToolExecution.execution_end - MCPToolExecution.execution_start ) * 1000, ), else_=None, ) ).label("avg_duration_ms"), func.sum(MCPToolExecution.findings_count).label("total_findings"), func.sum(case((MCPToolExecution.success, 1), else_=0)).label( "successes" ), ) .filter(MCPToolExecution.execution_start > since) .group_by(MCPToolExecution.tool_name) .all() ) # CLI Command Statistics cli_stats = ( self.session.query( CLICommandExecution.command_name, func.count(CLICommandExecution.id).label("executions"), func.avg( case( ( CLICommandExecution.execution_end.isnot(None), ( CLICommandExecution.execution_end - CLICommandExecution.execution_start ) * 1000, ), else_=None, ) ).label("avg_duration_ms"), func.sum(CLICommandExecution.findings_count).label("total_findings"), func.sum(case((CLICommandExecution.exit_code == 0, 1), else_=0)).label( "successes" ), ) .filter(CLICommandExecution.execution_start > since) .group_by(CLICommandExecution.command_name) .all() ) # Cache Performance cache_stats = ( self.session.query( CacheOperationMetric.cache_name, func.sum( case((CacheOperationMetric.operation_type == "hit", 1), else_=0) ).label("hits"), func.sum( case((CacheOperationMetric.operation_type == "miss", 1), else_=0) ).label("misses"), func.avg(CacheOperationMetric.access_time_ms).label( "avg_access_time_ms" ), func.sum(CacheOperationMetric.size_bytes).label("total_size_bytes"), ) .filter(CacheOperationMetric.timestamp > since) .group_by(CacheOperationMetric.cache_name) .all() ) # Scan Engine Performance scan_stats = ( self.session.query( func.count(ScanEngineExecution.id).label("total_scans"), func.avg(ScanEngineExecution.total_duration_ms).label( "avg_total_duration" ), func.avg(ScanEngineExecution.semgrep_duration_ms).label( "avg_semgrep_duration_ms" ), func.avg(ScanEngineExecution.llm_duration_ms).label( "avg_llm_duration_ms" ), func.avg(ScanEngineExecution.validation_duration_ms).label( "avg_validation_duration_ms" ), func.sum(ScanEngineExecution.threats_found).label("total_threats"), func.sum(ScanEngineExecution.threats_validated).label( "total_validated" ), func.sum(ScanEngineExecution.false_positives_filtered).label( "total_false_positives" ), func.sum(case((ScanEngineExecution.cache_hit, 1), else_=0)).label( "cache_hits" ), ) .filter(ScanEngineExecution.execution_start > since) .first() ) # Threat Findings by Category threat_categories = ( self.session.query( ThreatFinding.category, ThreatFinding.severity, func.count(ThreatFinding.id).label("count"), func.avg(ThreatFinding.confidence).label("avg_confidence"), ) .filter(ThreatFinding.timestamp > since) .group_by(ThreatFinding.category, ThreatFinding.severity) .order_by(func.count(ThreatFinding.id).desc()) .all() ) # Language Performance language_performance = ( self.session.query( ScanEngineExecution.language_detected, func.count(ScanEngineExecution.id).label("scans"), func.avg(ScanEngineExecution.total_duration_ms).label("avg_duration"), func.sum(ScanEngineExecution.threats_found).label("threats_found"), ) .filter( and_( ScanEngineExecution.execution_start > since, ScanEngineExecution.language_detected.isnot(None), ) ) .group_by(ScanEngineExecution.language_detected) .order_by(func.count(ScanEngineExecution.id).desc()) .all() ) # Get individual threat findings for detailed dashboard table threat_findings = ( self.session.query(ThreatFinding) .filter(ThreatFinding.timestamp > since) .order_by(ThreatFinding.timestamp.desc()) .limit(100) # Limit to prevent overwhelming the dashboard .all() ) return { "mcp_tools": [ { "tool_name": stat.tool_name, "executions": stat.executions, "avg_duration_ms": float(stat.avg_duration_ms or 0), "total_findings": stat.total_findings or 0, "success_rate": (stat.successes or 0) / max(stat.executions, 1), } for stat in mcp_stats ], "cli_commands": [ { "command_name": stat.command_name, "executions": stat.executions, "avg_duration_ms": float(stat.avg_duration_ms or 0), "total_findings": stat.total_findings or 0, "success_rate": (stat.successes or 0) / max(stat.executions, 1), } for stat in cli_stats ], "cache_performance": [ { "cache_name": stat.cache_name, "hits": stat.hits or 0, "misses": stat.misses or 0, "hit_rate": (stat.hits or 0) / max((stat.hits or 0) + (stat.misses or 0), 1), "avg_access_time_ms": float(stat.avg_access_time_ms or 0), "total_size_mb": (stat.total_size_bytes or 0) / 1024 / 1024, } for stat in cache_stats ], "scan_engine": { "total_scans": scan_stats.total_scans or 0, "avg_total_duration_ms": float(scan_stats.avg_total_duration or 0), "avg_semgrep_duration_ms": float( scan_stats.avg_semgrep_duration_ms or 0 ), "avg_llm_duration_ms": float(scan_stats.avg_llm_duration_ms or 0), "avg_validation_duration_ms": float( scan_stats.avg_validation_duration_ms or 0 ), "total_threats_found": scan_stats.total_threats or 0, "total_threats_validated": scan_stats.total_validated or 0, "false_positives_filtered": scan_stats.total_false_positives or 0, "cache_hit_rate": (scan_stats.cache_hits or 0) / max(scan_stats.total_scans or 1, 1), }, "threat_categories": [ { "category": stat.category, "severity": stat.severity, "count": stat.count, "avg_confidence": float(stat.avg_confidence or 0), } for stat in threat_categories ], "language_performance": [ { "language": stat.language_detected, "scans": stat.scans, "avg_duration_ms": float(stat.avg_duration or 0), "threats_found": stat.threats_found or 0, } for stat in language_performance ], "threat_findings": [ { "finding_uuid": finding.finding_uuid, "title": finding.title, "category": finding.category, "severity": finding.severity, "scanner_source": finding.scanner_source, "file_path": finding.file_path, "line_start": finding.line_start, "confidence": finding.confidence or 0, "is_validated": finding.is_validated, "is_false_positive": finding.is_false_positive, "timestamp": finding.timestamp, "scan_id": finding.scan_id, } for finding in threat_findings ], }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server