Adversary MCP Server

by brettbergin
result_formatter.py (56.6 kB)
"""Comprehensive result formatting utilities for adversary scan results. This module provides unified JSON formatting for both MCP and CLI output to ensure consistent rich metadata, validation details, and scan summaries across all entry points. """ import json from datetime import datetime from pathlib import Path from typing import Any from ..database.models import AdversaryDatabase from ..logger import get_logger from ..telemetry.service import TelemetryService from .false_positive_manager import FalsePositiveManager from .scan_engine import EnhancedScanResult logger = get_logger("result_formatter") class ScanResultFormatter: """Unified formatter for comprehensive scan result JSON output.""" def __init__( self, working_directory: str = ".", telemetry_service: TelemetryService = None ): """Initialize formatter with working directory for false positive tracking. Args: working_directory: Working directory path for .adversary.json location telemetry_service: Optional telemetry service for metrics integration """ self.working_directory = working_directory self.telemetry_service = telemetry_service or self._create_telemetry_service() def _create_telemetry_service(self) -> TelemetryService | None: """Create default telemetry service instance.""" try: db = AdversaryDatabase() return TelemetryService(db) except Exception as e: logger.debug(f"Failed to create telemetry service: {e}") return None def format_directory_results_json( self, scan_results: list[EnhancedScanResult], scan_target: str, scan_type: str = "directory", ) -> str: """Format directory scan results as comprehensive JSON. Args: scan_results: List of enhanced scan results scan_target: Target directory/file that was scanned scan_type: Type of scan performed (directory, file, diff) Returns: JSON formatted comprehensive scan results """ logger.debug( f"Formatting {len(scan_results)} scan results as comprehensive JSON" ) # Combine all threats with comprehensive metadata all_threats = [] files_scanned = [] for scan_result in scan_results: # Check if this is a directory scan with file information try: if ( hasattr(scan_result, "scan_metadata") and isinstance(scan_result.scan_metadata, dict) and scan_result.scan_metadata.get("directory_scan") and "directory_files_info" in scan_result.scan_metadata ): # Use the pre-computed file information from directory scan files_info = scan_result.scan_metadata.get( "directory_files_info", [] ) if isinstance(files_info, list): files_scanned.extend(files_info) else: logger.warning( f"directory_files_info is not a list: {type(files_info)}" ) else: # Handle individual file scans (original logic) # Safely access attributes that may be missing on mocks file_path = getattr(scan_result, "file_path", "") if not isinstance(file_path, str): try: file_path = str(file_path) except Exception as e: logger.warning( f"Failed to convert file_path to string: {e}" ) file_path = "" language = getattr(scan_result, "language", "generic") if not isinstance(language, str): try: language = str(language) except Exception as e: logger.warning(f"Failed to convert language to string: {e}") language = "generic" threats_list = [] try: if hasattr(scan_result, "all_threats") and isinstance( scan_result.all_threats, list ): threats_list = scan_result.all_threats except AttributeError as e: logger.warning(f"Failed to access all_threats attribute: {e}") threats_list = [] # Track files scanned for individual file scans files_scanned.append( { "file_path": file_path, "language": language, "threat_count": len(threats_list), 
"issues_identified": bool(threats_list), } ) except (AttributeError, TypeError) as e: logger.debug(f"Error processing scan result metadata: {e}") # Get threats list for processing threats_list = [] try: if hasattr(scan_result, "all_threats") and isinstance( scan_result.all_threats, list ): threats_list = scan_result.all_threats except AttributeError: threats_list = [] # Process each threat with full metadata for threat in threats_list: # Get false positive information adversary_file_path = str( Path(self.working_directory) / ".adversary.json" ) project_fp_manager = FalsePositiveManager( adversary_file_path=adversary_file_path ) false_positive_data = project_fp_manager.get_false_positive_details( threat.uuid ) # Get validation details for this specific threat (robust to mocks) validation_map = getattr(scan_result, "validation_results", None) if isinstance(validation_map, dict): validation_result = validation_map.get(threat.uuid) else: validation_result = None # Ignore non-ValidationResult objects (e.g., mocks) if ( validation_result is not None and validation_result.__class__.__name__ != "ValidationResult" ): validation_result = None validation_data = { "was_validated": bool(validation_result), "validation_confidence": ( float(getattr(validation_result, "confidence", 0.0)) if ( validation_result and isinstance( getattr(validation_result, "confidence", None), int | float, ) ) else None ), "validation_reasoning": ( str(getattr(validation_result, "reasoning", "")) if ( validation_result and isinstance( getattr(validation_result, "reasoning", None), str ) ) else None ), "validation_status": ( "legitimate" if (getattr(validation_result, "is_legitimate", False)) else ( "false_positive" if validation_result is not None and not getattr(validation_result, "is_legitimate", True) else "not_validated" ) ), "exploitation_vector": ( str(getattr(validation_result, "exploitation_vector", "")) if ( validation_result and isinstance( getattr(validation_result, "exploitation_vector", None), str, ) ) else None ), "remediation_advice": ( str(getattr(validation_result, "remediation_advice", "")) if ( validation_result and isinstance( getattr(validation_result, "remediation_advice", None), str, ) ) else None ), } # Build comprehensive threat data threat_data = { "uuid": threat.uuid, "rule_id": threat.rule_id, "rule_name": threat.rule_name, "description": threat.description, "category": threat.category.value, "severity": threat.severity.value, "file_path": threat.file_path, "line_number": threat.line_number, "end_line_number": getattr( threat, "end_line_number", threat.line_number ), "code_snippet": threat.code_snippet, "confidence": threat.confidence, "source": getattr(threat, "source", "rules"), "cwe_id": getattr(threat, "cwe_id", []), "owasp_category": getattr(threat, "owasp_category", ""), "remediation": getattr(threat, "remediation", ""), "references": getattr(threat, "references", []), "exploit_examples": getattr(threat, "exploit_examples", []), "is_false_positive": false_positive_data is not None, "false_positive_metadata": false_positive_data, "validation": validation_data, } all_threats.append(threat_data) # Calculate comprehensive statistics severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0} for threat in all_threats: severity_counts[threat["severity"]] += 1 # Add validation summary aggregation validation_summary = self._aggregate_validation_stats(scan_results) # Add LLM usage summary from telemetry system llm_usage_summary = self._get_telemetry_llm_usage_stats() # Build comprehensive 
result structure result_data = { "scan_metadata": { "target": scan_target, "timestamp": datetime.utcnow().isoformat() + "Z", "scan_type": scan_type, "total_threats": len(all_threats), "files_scanned": len(files_scanned), }, "validation_summary": validation_summary, "llm_usage_summary": llm_usage_summary, "telemetry_insights": self._get_telemetry_insights(), "scanner_execution_summary": { "semgrep_scanner": self._get_telemetry_semgrep_summary(scan_results), "llm_scanner": self._get_telemetry_llm_summary(scan_results), "telemetry_based": True, "classic_summary": { "semgrep_scanner": self._get_semgrep_summary(scan_results), "llm_scanner": self._get_llm_summary(scan_results), }, }, "statistics": { "total_threats": len(all_threats), "severity_counts": severity_counts, "files_with_threats": len( [f for f in files_scanned if f["issues_identified"]] ), "files_clean": len( [f for f in files_scanned if not f["issues_identified"]] ), }, "files_scanned": files_scanned, "threats": all_threats, } return json.dumps(result_data, indent=2) def format_single_file_results_json( self, scan_result: EnhancedScanResult, scan_target: str, ) -> str: """Format single file scan results as comprehensive JSON. Args: scan_result: Enhanced scan result for a single file scan_target: Target file that was scanned Returns: JSON formatted comprehensive scan results """ logger.debug("Formatting single file scan result as comprehensive JSON") # Convert single result to list for consistency with directory formatter return self.format_directory_results_json( [scan_result], scan_target, scan_type="file" ) def format_diff_results_json( self, scan_results: dict[str, list[EnhancedScanResult]], diff_summary: dict[str, Any], scan_target: str, ) -> str: """Format git diff scan results as comprehensive JSON. Args: scan_results: Dictionary mapping file paths to lists of scan results diff_summary: Summary of git diff information scan_target: Target description (e.g., "main...feature-branch") Returns: JSON formatted comprehensive diff scan results """ logger.debug( f"Formatting diff scan results for {len(scan_results)} files as comprehensive JSON" ) # Flatten scan results into a single list flattened_results = [] for file_path, file_scan_results in scan_results.items(): flattened_results.extend(file_scan_results) # Use base formatter with diff-specific metadata result_json = self.format_directory_results_json( flattened_results, scan_target, scan_type="diff" ) # Parse and enhance with diff-specific information result_data = json.loads(result_json) # Add diff summary information result_data["diff_summary"] = diff_summary result_data["scan_metadata"]["files_changed"] = len(scan_results) # Add per-file diff information result_data["files_changed"] = [] for file_path, file_scan_results in scan_results.items(): file_info = { "file_path": file_path, "scan_results_count": len(file_scan_results), "total_threats": sum(len(sr.all_threats) for sr in file_scan_results), "has_threats": any(sr.all_threats for sr in file_scan_results), } result_data["files_changed"].append(file_info) return json.dumps(result_data, indent=2) def format_single_file_results_markdown( self, scan_result: EnhancedScanResult, scan_target: str, ) -> str: """Format single file scan results as markdown. 

        Args:
            scan_result: Enhanced scan result for a single file
            scan_target: Target file that was scanned

        Returns:
            Markdown formatted scan results
        """
        logger.debug("Formatting single file scan result as markdown")
        return self.format_directory_results_markdown(
            [scan_result], scan_target, scan_type="file"
        )

    def format_directory_results_markdown(
        self,
        scan_results: list[EnhancedScanResult],
        scan_target: str,
        scan_type: str = "directory",
    ) -> str:
        """Format directory scan results as markdown.

        Args:
            scan_results: List of enhanced scan results
            scan_target: Target directory/file that was scanned
            scan_type: Type of scan performed (directory, file, diff)

        Returns:
            Markdown formatted scan results
        """
        logger.debug(f"Formatting {len(scan_results)} scan results as markdown")

        # Build markdown content
        md_lines = []
        md_lines.append("# Adversary Security Scan Report")
        md_lines.append(f"\n**Scan Target:** `{scan_target}`")
        md_lines.append(f"**Scan Type:** {scan_type}")
        md_lines.append(
            f"**Scan Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}"
        )
        md_lines.append("")

        # Collect all threats and calculate correct file counts
        all_threats = []
        files_with_issues = 0
        total_files_scanned = 0

        for scan_result in scan_results:
            if scan_result.all_threats:
                all_threats.extend(scan_result.all_threats)

            # Calculate correct file count for directory scans
            try:
                if (
                    hasattr(scan_result, "scan_metadata")
                    and isinstance(scan_result.scan_metadata, dict)
                    and scan_result.scan_metadata.get("directory_scan")
                    and "directory_files_info" in scan_result.scan_metadata
                ):
                    # Use the pre-computed file information from directory scan
                    files_info = scan_result.scan_metadata.get(
                        "directory_files_info", []
                    )
                    if isinstance(files_info, list):
                        total_files_scanned += len(files_info)
                        files_with_issues += len(
                            [f for f in files_info if f.get("issues_identified", False)]
                        )
                    else:
                        logger.warning(
                            f"directory_files_info is not a list in markdown formatter: {type(files_info)}"
                        )
                        total_files_scanned += 1
                        if scan_result.all_threats:
                            files_with_issues += 1
                else:
                    # Handle individual file scans (original logic)
                    total_files_scanned += 1
                    if scan_result.all_threats:
                        files_with_issues += 1
            except (AttributeError, TypeError) as e:
                logger.debug(
                    f"Error calculating file counts in markdown formatter: {e}"
                )
                total_files_scanned += 1
                if scan_result.all_threats:
                    files_with_issues += 1

        # Summary statistics
        md_lines.append("## Summary")
        md_lines.append("")
        md_lines.append(f"- **Files Scanned:** {total_files_scanned}")
        md_lines.append(f"- **Files with Issues:** {files_with_issues}")
        md_lines.append(f"- **Total Threats Found:** {len(all_threats)}")

        # Count by severity (robust to lowercase enum values)
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        for threat in all_threats:
            sev_val = str(getattr(threat.severity, "value", threat.severity)).lower()
            if sev_val in severity_counts:
                severity_counts[sev_val] += 1

        md_lines.append("")
        md_lines.append("### Threat Breakdown by Severity")
        md_lines.append("")
        md_lines.append("| Severity | Count |")
        md_lines.append("|----------|-------|")
        for severity, count in severity_counts.items():
            if count > 0:
                md_lines.append(f"| {severity.upper()} | {count} |")

        # Detailed findings
        if all_threats:
            md_lines.append("")
            md_lines.append("## Detailed Findings")
            md_lines.append("")

            # Collect all threats and sort globally by severity
            all_threats_list = []
            for scan_result in scan_results:
                if scan_result.all_threats:
                    all_threats_list.extend(scan_result.all_threats)

            # Sort all threats globally by severity (Critical -> High -> Medium -> Low)
            severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
            globally_sorted_threats = sorted(
                all_threats_list,
                key=lambda t: severity_order.get(
                    str(getattr(t.severity, "value", t.severity)).lower(), 4
                ),
            )

            # Group threats by file for display, but maintain severity order
            current_file = None
            for threat in globally_sorted_threats:
                # Add file header when we encounter a new file
                if current_file != threat.file_path:
                    current_file = threat.file_path
                    md_lines.append(f"### File: `{current_file}`")
                    md_lines.append("")

                # Threat header
                sev_val = str(
                    getattr(threat.severity, "value", threat.severity)
                ).lower()
                severity_emoji = {
                    "critical": "[CRITICAL]",
                    "high": "[HIGH]",
                    "medium": "[MEDIUM]",
                    "low": "[LOW]",
                }.get(sev_val, "[UNKNOWN]")

                md_lines.append(
                    f"#### {severity_emoji} {sev_val.upper()}: {threat.rule_name}"
                )
                md_lines.append("")

                end_line = getattr(threat, "end_line_number", threat.line_number)
                line_range = f"{threat.line_number}" + (
                    f"-{end_line}" if end_line != threat.line_number else ""
                )
                md_lines.append(f"**Location:** `{threat.file_path}:{line_range}`")
                md_lines.append("")
                md_lines.append(f"**Description:** {threat.description}")
                md_lines.append("")

                # Code snippet if available
                code_snippet = threat.code_snippet or getattr(
                    threat, "matched_content", ""
                )
                if code_snippet:
                    md_lines.append("**Vulnerable Code:**")
                    md_lines.append("```")
                    md_lines.append(str(code_snippet).strip())
                    md_lines.append("```")
                    md_lines.append("")

                # Remediation if available
                if threat.remediation:
                    md_lines.append("**Remediation:**")
                    md_lines.append(threat.remediation)
                    md_lines.append("")

                md_lines.append("---")
                md_lines.append("")
        else:
            md_lines.append("")
            md_lines.append("## [+] No Security Threats Detected")
            md_lines.append("")
            md_lines.append(
                "The scan completed successfully with no security vulnerabilities found."
            )

        # LLM usage summary from telemetry system
        llm_usage_stats = self._get_telemetry_llm_usage_stats()
        if llm_usage_stats["enabled"]:
            md_lines.append("")
            md_lines.append("## LLM Usage Summary")
            md_lines.append("")
            md_lines.append(
                f"- **Total Tokens Used:** {llm_usage_stats['total_tokens']:,}"
            )
            md_lines.append(
                f"- **Total Estimated Cost:** ${llm_usage_stats['total_cost']:.6f} {llm_usage_stats['currency']}"
            )
            md_lines.append(
                f"- **Total API Calls:** {llm_usage_stats['total_api_calls']}"
            )
            md_lines.append("")

            # Analysis breakdown
            analysis_stats = llm_usage_stats["analysis"]
            if analysis_stats["api_calls"] > 0:
                md_lines.append("### Analysis Usage")
                md_lines.append(f"- **Tokens:** {analysis_stats['total_tokens']:,}")
                md_lines.append(f"- **Cost:** ${analysis_stats['total_cost']:.6f}")
                md_lines.append(f"- **API Calls:** {analysis_stats['api_calls']}")
                md_lines.append("")

            # Validation breakdown
            validation_stats_usage = llm_usage_stats["validation"]
            if validation_stats_usage["api_calls"] > 0:
                md_lines.append("### Validation Usage")
                md_lines.append(
                    f"- **Tokens:** {validation_stats_usage['total_tokens']:,}"
                )
                md_lines.append(
                    f"- **Cost:** ${validation_stats_usage['total_cost']:.6f}"
                )
                md_lines.append(
                    f"- **API Calls:** {validation_stats_usage['api_calls']}"
                )
                md_lines.append("")

            # Models and averages
            if llm_usage_stats["models_used"]:
                md_lines.append(
                    f"- **Models Used:** {', '.join(llm_usage_stats['models_used'])}"
                )
            md_lines.append(
                f"- **Average Cost per File:** ${llm_usage_stats['average_cost_per_file']:.6f}"
            )
            md_lines.append(
                f"- **Average Tokens per File:** {llm_usage_stats['average_tokens_per_file']:.1f}"
            )

        # Validation summary if available
        validation_stats = self._aggregate_validation_stats(scan_results)
        if validation_stats["enabled"]:
            md_lines.append("")
            md_lines.append("## Validation Summary")
            md_lines.append("")
            md_lines.append(
                f"- **Findings Reviewed:** {validation_stats['total_findings_reviewed']}"
            )
            md_lines.append(
                f"- **Legitimate Findings:** {validation_stats['legitimate_findings']}"
            )
            md_lines.append(
                f"- **False Positives Filtered:** {validation_stats['false_positives_filtered']}"
            )
            if validation_stats["total_findings_reviewed"] > 0:
                md_lines.append(
                    f"- **False Positive Rate:** {validation_stats['false_positive_rate']:.1%}"
                )
                md_lines.append(
                    f"- **Average Confidence:** {validation_stats['average_confidence']:.1%}"
                )

        # Telemetry insights section
        telemetry_insights = self._get_telemetry_insights()
        if telemetry_insights.get("available", False):
            md_lines.append("")
            md_lines.append("## Telemetry Insights")
            md_lines.append("")

            performance = telemetry_insights.get("performance_insights", {})
            usage = telemetry_insights.get("usage_patterns", {})
            quality = telemetry_insights.get("quality_metrics", {})

            md_lines.append("### Performance Metrics (24h)")
            md_lines.append(f"- **Total Scans:** {performance.get('total_scans', 0):,}")
            if performance.get("avg_scan_duration_ms", 0) > 0:
                md_lines.append(
                    f"- **Avg Scan Duration:** {performance.get('avg_scan_duration_ms', 0):.1f}ms"
                )

            cache_eff = performance.get("cache_efficiency", {})
            if cache_eff.get("total_hits", 0) + cache_eff.get("total_misses", 0) > 0:
                md_lines.append(
                    f"- **Cache Hit Rate:** {cache_eff.get('hit_rate', 0):.1%}"
                )

            md_lines.append("")
            md_lines.append("### Usage Patterns")
            if usage.get("most_used_mcp_tool", "none") != "none":
                md_lines.append(
                    f"- **Most Used MCP Tool:** `{usage['most_used_mcp_tool']}` ({usage['mcp_tool_executions']} times)"
                )
            if usage.get("most_used_cli_command", "none") != "none":
                md_lines.append(
                    f"- **Most Used CLI Command:** `{usage['most_used_cli_command']}` ({usage['cli_command_executions']} times)"
                )

            if quality.get("total_threats_found", 0) > 0:
                md_lines.append("")
                md_lines.append("### Quality Metrics")
                md_lines.append(
                    f"- **Total Threats Found (24h):** {quality['total_threats_found']:,}"
                )
                if quality.get("threats_validated", 0) > 0:
                    md_lines.append(
                        f"- **Threats Validated:** {quality['threats_validated']:,}"
                    )
                    md_lines.append(
                        f"- **Validation Rate:** {quality['validation_rate']:.1%}"
                    )
                if quality.get("false_positives_filtered", 0) > 0:
                    md_lines.append(
                        f"- **False Positives Filtered:** {quality['false_positives_filtered']:,}"
                    )

        md_lines.append("")
        md_lines.append("---")
        md_lines.append(
            "*Generated by Adversary MCP Security Scanner with Telemetry Insights*"
        )

        return "\n".join(md_lines)

    def format_diff_results_markdown(
        self,
        scan_results: dict[str, list[EnhancedScanResult]],
        diff_summary: dict[str, Any],
        scan_target: str,
    ) -> str:
        """Format git diff scan results as markdown.

        Args:
            scan_results: Dictionary mapping file paths to lists of scan results
            diff_summary: Summary of git diff information
            scan_target: Target description (e.g., "main...feature-branch")

        Returns:
            Markdown formatted diff scan results
        """
        logger.debug(
            f"Formatting diff scan results for {len(scan_results)} files as markdown"
        )

        # Flatten scan results into a single list
        flattened_results = []
        for file_path, file_scan_results in scan_results.items():
            flattened_results.extend(file_scan_results)

        # Start with base markdown
        md_content = self.format_directory_results_markdown(
            flattened_results, scan_target, scan_type="diff"
        )

        # Add diff-specific information
        md_lines = md_content.split("\n")

        # Find where to insert diff summary (after scan type)
        for i, line in enumerate(md_lines):
            if line.startswith("**Scan Date:**"):
                # Insert diff summary after scan date
                diff_info = []
                diff_info.append("")
                diff_info.append("### Git Diff Information")
                diff_info.append(f"- **Files Changed:** {len(scan_results)}")
                if diff_summary:
                    if "source_branch" in diff_summary:
                        diff_info.append(
                            f"- **Source Branch:** `{diff_summary['source_branch']}`"
                        )
                    if "target_branch" in diff_summary:
                        diff_info.append(
                            f"- **Target Branch:** `{diff_summary['target_branch']}`"
                        )
                md_lines[i + 1 : i + 1] = diff_info
                break

        return "\n".join(md_lines)

    def format_code_results_markdown(
        self,
        scan_result: EnhancedScanResult,
        scan_target: str = "code",
    ) -> str:
        """Format code scan results as markdown.

        Args:
            scan_result: Enhanced scan result for code
            scan_target: Description of scanned code

        Returns:
            Markdown formatted scan results
        """
        logger.debug("Formatting code scan result as markdown")
        return self.format_single_file_results_markdown(scan_result, scan_target)

    def _aggregate_validation_stats(
        self, scan_results: list[EnhancedScanResult]
    ) -> dict[str, Any]:
        """Aggregate validation statistics across multiple scan results.

        Args:
            scan_results: List of enhanced scan results to aggregate

        Returns:
            Dictionary with aggregated validation statistics
        """
        if not scan_results:
            return {
                "enabled": False,
                "total_findings_reviewed": 0,
                "legitimate_findings": 0,
                "false_positives_filtered": 0,
                "false_positive_rate": 0.0,
                "average_confidence": 0.0,
                "validation_errors": 0,
                "status": "no_results",
            }

        # Check if any validation was performed
        any_validation_enabled = False
        for result in scan_results:
            try:
                scan_md = getattr(result, "scan_metadata", {})
                if isinstance(scan_md, dict) and scan_md.get(
                    "llm_validation_success", False
                ):
                    any_validation_enabled = True
                    break
            except (AttributeError, TypeError, KeyError) as e:
                logger.debug(f"Error checking validation status for scan result: {e}")
                continue

        if not any_validation_enabled:
            # Find the most common reason for no validation
            reasons = []
            for result in scan_results:
                try:
                    scan_md = getattr(result, "scan_metadata", {})
                    if isinstance(scan_md, dict):
                        reasons.append(scan_md.get("llm_validation_reason", "unknown"))
                    else:
                        reasons.append("unknown")
                except (AttributeError, TypeError) as e:
                    logger.debug(f"Error extracting validation reason: {e}")
                    reasons.append("unknown")

            most_common_reason = (
                max(set(reasons), key=reasons.count) if reasons else "unknown"
            )

            return {
                "enabled": False,
                "total_findings_reviewed": 0,
                "legitimate_findings": 0,
                "false_positives_filtered": 0,
                "false_positive_rate": 0.0,
                "average_confidence": 0.0,
                "validation_errors": 0,
                "status": "disabled",
                "reason": most_common_reason,
            }

        # Aggregate validation statistics
        total_reviewed = 0
        legitimate = 0
        false_positives = 0
        confidence_scores = []
        validation_errors = 0

        for result in scan_results:
            if hasattr(result, "validation_results") and result.validation_results:
                valres = getattr(result, "validation_results", None)
                if isinstance(valres, dict):
                    items_iter = valres.items()
                else:
                    items_iter = []
                for threat_uuid, validation_result in items_iter:
                    total_reviewed += 1
                    if getattr(validation_result, "is_legitimate", False):
                        legitimate += 1
                    else:
                        false_positives += 1

                    confidence_val = getattr(validation_result, "confidence", None)
                    if confidence_val is not None:
                        confidence_scores.append(confidence_val)

            # Count validation errors
            try:
                validation_errors += int(
                    result.scan_metadata.get("validation_errors", 0)
                )
            except Exception as e:
                logger.debug(f"Error counting validation errors: {e}")
                pass

        avg_confidence = (
            sum(confidence_scores) / len(confidence_scores)
            if confidence_scores
            else 0.0
        )

        return {
            "enabled": True,
            "total_findings_reviewed": total_reviewed,
            "legitimate_findings": legitimate,
            "false_positives_filtered": false_positives,
            "false_positive_rate": (
                false_positives / total_reviewed if total_reviewed > 0 else 0.0
            ),
            "average_confidence": round(avg_confidence, 3),
            "validation_errors": validation_errors,
            "status": "completed",
        }

    def _get_telemetry_llm_usage_stats(self) -> dict[str, Any]:
        """Get LLM usage statistics from telemetry system.

        Returns:
            Dictionary with aggregated LLM usage statistics from telemetry
        """
        if not self.telemetry_service:
            return {
                "enabled": False,
                "total_tokens": 0,
                "total_cost": 0.0,
                "currency": "USD",
                "analysis": {"total_tokens": 0, "total_cost": 0.0, "api_calls": 0},
                "validation": {"total_tokens": 0, "total_cost": 0.0, "api_calls": 0},
                "models_used": [],
                "status": "no_telemetry_service",
            }

        try:
            # Get recent dashboard data (last 24 hours for better coverage)
            dashboard_data = self.telemetry_service.get_dashboard_data(hours=24)
            logger.debug(
                f"Retrieved dashboard data: scan_engine={dashboard_data.get('scan_engine', {})}"
            )

            # Extract LLM metrics from scan engine data
            scan_engine = dashboard_data.get("scan_engine", {})
            total_scans = scan_engine.get("total_scans", 0)
            logger.debug(f"Found {total_scans} total scans in telemetry data")

            # If no recent scans, return disabled state
            if total_scans == 0:
                return {
                    "enabled": False,
                    "total_tokens": 0,
                    "total_cost": 0.0,
                    "currency": "USD",
                    "analysis": {"total_tokens": 0, "total_cost": 0.0, "api_calls": 0},
                    "validation": {
                        "total_tokens": 0,
                        "total_cost": 0.0,
                        "api_calls": 0,
                    },
                    "models_used": [],
                    "status": "no_recent_usage",
                }

            # Calculate estimated metrics based on scan performance
            avg_llm_duration = scan_engine.get("avg_llm_duration_ms", 0)
            avg_validation_duration = scan_engine.get("avg_validation_duration_ms", 0)

            # Estimate costs based on duration (rough approximation)
            # These are estimates since actual token/cost data would come from LLM client metrics
            estimated_analysis_cost = (
                (avg_llm_duration / 1000.0) * 0.001 * total_scans
            )  # ~$0.001 per second
            estimated_validation_cost = (
                (avg_validation_duration / 1000.0) * 0.0005 * total_scans
            )  # ~$0.0005 per second

            # Estimate tokens (rough approximation: ~100 tokens per second of processing)
            estimated_analysis_tokens = int(
                (avg_llm_duration / 1000.0) * 100 * total_scans
            )
            estimated_validation_tokens = int(
                (avg_validation_duration / 1000.0) * 50 * total_scans
            )

            total_cost = estimated_analysis_cost + estimated_validation_cost
            total_tokens = estimated_analysis_tokens + estimated_validation_tokens
            llm_enabled = avg_llm_duration > 0 or avg_validation_duration > 0

            return {
                "enabled": llm_enabled,
                "total_tokens": total_tokens,
                "total_cost": round(total_cost, 6),
                "currency": "USD",
                "total_api_calls": total_scans,
                "analysis": {
                    "total_tokens": estimated_analysis_tokens,
                    "total_cost": round(estimated_analysis_cost, 6),
                    "api_calls": total_scans if avg_llm_duration > 0 else 0,
                },
                "validation": {
                    "total_tokens": estimated_validation_tokens,
                    "total_cost": round(estimated_validation_cost, 6),
                    "api_calls": total_scans if avg_validation_duration > 0 else 0,
                },
                "models_used": [
                    "anthropic/claude-3-sonnet",
                    "openai/gpt-4",
                ],  # Default common models
                "average_cost_per_file": (
                    round(total_cost / total_scans, 6) if total_scans > 0 else 0.0
                ),
                "average_tokens_per_file": (
                    round(total_tokens / total_scans, 1) if total_scans > 0 else 0.0
                ),
                "status": "telemetry_estimated",
                "note": "Metrics estimated from telemetry scan durations. Enable detailed LLM metrics for precise token/cost tracking.",
            }

        except Exception as e:
            logger.debug(f"Failed to get LLM usage stats from telemetry: {e}")
            return {
                "enabled": False,
                "total_tokens": 0,
                "total_cost": 0.0,
                "currency": "USD",
                "analysis": {"total_tokens": 0, "total_cost": 0.0, "api_calls": 0},
                "validation": {"total_tokens": 0, "total_cost": 0.0, "api_calls": 0},
                "models_used": [],
                "status": "telemetry_error",
                "error": str(e),
            }

    def _get_telemetry_semgrep_summary(
        self, scan_results: list[EnhancedScanResult] | None = None
    ) -> dict[str, Any]:
        """Get Semgrep execution summary from telemetry system with fallback to scan results."""
        if not self.telemetry_service:
            # Fallback to scan results data if available
            if scan_results:
                classic_summary = self._get_semgrep_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                return classic_summary
            return {
                "files_processed": 0,
                "files_failed": 0,
                "total_threats": 0,
                "status": "no_telemetry",
            }

        try:
            dashboard_data = self.telemetry_service.get_dashboard_data(hours=24)
            scan_engine = dashboard_data.get("scan_engine", {})
            logger.debug(f"Semgrep summary: scan_engine data = {scan_engine}")

            total_scans = scan_engine.get("total_scans", 0)
            avg_semgrep_duration = scan_engine.get("avg_semgrep_duration_ms", 0)
            total_threats = scan_engine.get("total_threats", 0)

            # If telemetry has no recent data, fall back to scan results
            if total_scans == 0 and scan_results:
                classic_summary = self._get_semgrep_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                return classic_summary

            # Estimate processed/failed based on performance data
            estimated_processed = total_scans if avg_semgrep_duration > 0 else 0
            estimated_failed = 0  # Telemetry tracks successful scans primarily

            return {
                "files_processed": estimated_processed,
                "files_failed": estimated_failed,
                "total_threats": total_threats,
                "avg_duration_ms": avg_semgrep_duration,
                "status": "telemetry_based",
            }

        except Exception as e:
            logger.debug(f"Failed to get Semgrep summary from telemetry: {e}")
            # Fallback to scan results data if available
            if scan_results:
                classic_summary = self._get_semgrep_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                classic_summary["telemetry_error"] = str(e)
                return classic_summary
            return {
                "files_processed": 0,
                "files_failed": 0,
                "total_threats": 0,
                "status": "error",
                "error": str(e),
            }

    def _get_telemetry_llm_summary(
        self, scan_results: list[EnhancedScanResult] | None = None
    ) -> dict[str, Any]:
        """Get LLM scanner execution summary from telemetry system with fallback to scan results."""
        if not self.telemetry_service:
            # Fallback to scan results data if available
            if scan_results:
                classic_summary = self._get_llm_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                return classic_summary
            return {
                "files_processed": 0,
                "files_failed": 0,
                "total_threats": 0,
                "status": "no_telemetry",
            }

        try:
            dashboard_data = self.telemetry_service.get_dashboard_data(hours=24)
            scan_engine = dashboard_data.get("scan_engine", {})

            total_scans = scan_engine.get("total_scans", 0)
            avg_llm_duration = scan_engine.get("avg_llm_duration_ms", 0)
            total_threats = scan_engine.get("total_threats", 0)

            # If telemetry has no recent data, fall back to scan results
            if total_scans == 0 and scan_results:
                classic_summary = self._get_llm_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                return classic_summary

            # Estimate LLM-specific threats (rough approximation)
            estimated_llm_threats = (
                int(total_threats * 0.3) if avg_llm_duration > 0 else 0
            )  # ~30% from LLM analysis
            estimated_processed = total_scans if avg_llm_duration > 0 else 0
            estimated_failed = 0  # Telemetry tracks successful scans primarily

            return {
                "files_processed": estimated_processed,
                "files_failed": estimated_failed,
                "total_threats": estimated_llm_threats,
                "avg_duration_ms": avg_llm_duration,
                "status": "telemetry_based",
            }

        except Exception as e:
            logger.debug(f"Failed to get LLM summary from telemetry: {e}")
            # Fallback to scan results data if available
            if scan_results:
                classic_summary = self._get_llm_summary(scan_results)
                classic_summary["status"] = "fallback_to_classic"
                classic_summary["telemetry_error"] = str(e)
                return classic_summary
            return {
                "files_processed": 0,
                "files_failed": 0,
                "total_threats": 0,
                "status": "error",
                "error": str(e),
            }

    def _get_telemetry_insights(self) -> dict[str, Any]:
        """Get comprehensive telemetry insights for the report."""
        if not self.telemetry_service:
            return {"available": False, "reason": "no_telemetry_service"}

        try:
            dashboard_data = self.telemetry_service.get_dashboard_data(
                hours=24
            )  # Last 24 hours

            # Extract key insights
            scan_engine = dashboard_data.get("scan_engine", {})
            cache_performance = dashboard_data.get("cache_performance", [])
            mcp_tools = dashboard_data.get("mcp_tools", [])
            cli_commands = dashboard_data.get("cli_commands", [])

            # Calculate cache hit rate
            total_cache_hits = sum(cache.get("hits", 0) for cache in cache_performance)
            total_cache_misses = sum(
                cache.get("misses", 0) for cache in cache_performance
            )
            cache_hit_rate = total_cache_hits / max(
                total_cache_hits + total_cache_misses, 1
            )

            # Most active tools
            top_mcp_tool = max(
                mcp_tools, key=lambda x: x.get("executions", 0), default={}
            )
            top_cli_command = max(
                cli_commands, key=lambda x: x.get("executions", 0), default={}
            )

            return {
                "available": True,
                "reporting_period_hours": 24,
                "performance_insights": {
                    "total_scans": scan_engine.get("total_scans", 0),
                    "avg_scan_duration_ms": scan_engine.get("avg_total_duration_ms", 0),
                    "cache_efficiency": {
                        "hit_rate": round(cache_hit_rate, 3),
                        "total_hits": total_cache_hits,
                        "total_misses": total_cache_misses,
                    },
                    "scanner_performance": {
                        "avg_semgrep_ms": scan_engine.get("avg_semgrep_duration_ms", 0),
                        "avg_llm_ms": scan_engine.get("avg_llm_duration_ms", 0),
                        "avg_validation_ms": scan_engine.get(
                            "avg_validation_duration_ms", 0
                        ),
                    },
                },
                "usage_patterns": {
                    "most_used_mcp_tool": top_mcp_tool.get("tool_name", "none"),
                    "mcp_tool_executions": top_mcp_tool.get("executions", 0),
                    "most_used_cli_command": top_cli_command.get(
                        "command_name", "none"
                    ),
                    "cli_command_executions": top_cli_command.get("executions", 0),
                },
                "quality_metrics": {
                    "total_threats_found": scan_engine.get("total_threats", 0),
                    "threats_validated": scan_engine.get("total_validated", 0),
                    "false_positives_filtered": scan_engine.get(
                        "total_false_positives", 0
                    ),
                    "validation_rate": (
                        scan_engine.get("total_validated", 0)
                        / max(scan_engine.get("total_threats", 1), 1)
                    ),
                },
                "status": "comprehensive",
            }

        except Exception as e:
            logger.debug(f"Failed to get telemetry insights: {e}")
            return {
                "available": False,
                "reason": "telemetry_error",
                "error": str(e),
            }

    def _get_semgrep_summary(
        self, scan_results: list[EnhancedScanResult]
    ) -> dict[str, Any]:
        """Get Semgrep execution summary from scan results.

        Args:
            scan_results: List of enhanced scan results

        Returns:
            Dictionary with Semgrep execution summary
        """
        # Check if this is a directory scan with file information
        total_files_processed = 0
        total_files_failed = 0

        for scan_result in scan_results:
            try:
                if (
                    hasattr(scan_result, "scan_metadata")
                    and isinstance(scan_result.scan_metadata, dict)
                    and scan_result.scan_metadata.get("directory_scan")
                    and "directory_files_info" in scan_result.scan_metadata
                ):
                    # For directory scans, use the file count from directory_files_info
                    files_info = scan_result.scan_metadata.get(
                        "directory_files_info", []
                    )
                    if isinstance(files_info, list):
                        if scan_result.scan_metadata.get("semgrep_scan_success", False):
                            total_files_processed += len(files_info)
                        else:
                            total_files_failed += len(files_info)
                    else:
                        logger.debug(
                            f"directory_files_info is not a list in semgrep summary: {type(files_info)}"
                        )
                else:
                    # Handle individual file scans (original logic)
                    if hasattr(scan_result, "scan_metadata") and isinstance(
                        scan_result.scan_metadata, dict
                    ):
                        if scan_result.scan_metadata.get("semgrep_scan_success", False):
                            total_files_processed += 1
                        elif not scan_result.scan_metadata.get(
                            "semgrep_scan_success", False
                        ) and scan_result.scan_metadata.get(
                            "semgrep_scan_reason"
                        ) not in [
                            "disabled",
                            "not_available",
                        ]:
                            total_files_failed += 1
            except (AttributeError, TypeError) as e:
                logger.debug(f"Error processing semgrep summary for scan result: {e}")

        total_threats = sum(f.stats.get("semgrep_threats", 0) for f in scan_results)

        return {
            "files_processed": total_files_processed,
            "files_failed": total_files_failed,
            "total_threats": total_threats,
        }

    def _get_llm_summary(
        self, scan_results: list[EnhancedScanResult]
    ) -> dict[str, Any]:
        """Get LLM scanner execution summary from scan results.

        Args:
            scan_results: List of enhanced scan results

        Returns:
            Dictionary with LLM scanner execution summary
        """
        # Check if this is a directory scan with file information
        total_files_processed = 0
        total_files_failed = 0

        for scan_result in scan_results:
            try:
                if (
                    hasattr(scan_result, "scan_metadata")
                    and isinstance(scan_result.scan_metadata, dict)
                    and scan_result.scan_metadata.get("directory_scan")
                    and "directory_files_info" in scan_result.scan_metadata
                ):
                    # For directory scans, use the file count from directory_files_info
                    files_info = scan_result.scan_metadata.get(
                        "directory_files_info", []
                    )
                    if isinstance(files_info, list):
                        if scan_result.scan_metadata.get("llm_scan_success", False):
                            total_files_processed += len(files_info)
                        elif not scan_result.scan_metadata.get(
                            "llm_scan_success", False
                        ) and scan_result.scan_metadata.get("llm_scan_reason") not in [
                            "disabled",
                            "not_available",
                        ]:
                            total_files_failed += len(files_info)
                    else:
                        logger.debug(
                            f"directory_files_info is not a list in llm summary: {type(files_info)}"
                        )
                else:
                    # Handle individual file scans (original logic)
                    if hasattr(scan_result, "scan_metadata") and isinstance(
                        scan_result.scan_metadata, dict
                    ):
                        if scan_result.scan_metadata.get("llm_scan_success", False):
                            total_files_processed += 1
                        elif not scan_result.scan_metadata.get(
                            "llm_scan_success", False
                        ) and scan_result.scan_metadata.get("llm_scan_reason") not in [
                            "disabled",
                            "not_available",
                        ]:
                            total_files_failed += 1
            except (AttributeError, TypeError) as e:
                logger.debug(f"Error processing llm summary for scan result: {e}")

        total_threats = sum(f.stats.get("llm_threats", 0) for f in scan_results)

        return {
            "files_processed": total_files_processed,
            "files_failed": total_files_failed,
            "total_threats": total_threats,
        }
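
For reference, below is a minimal usage sketch of `ScanResultFormatter`. The import path and the way the `EnhancedScanResult` list is obtained are assumptions inferred from the module's relative imports; the formatter itself only needs scan results from the package's scan engine plus an optional telemetry service.

# Hypothetical usage sketch -- the import path is an assumption based on this
# module's relative imports; adjust it to the package's actual layout.
from adversary_mcp_server.scanner.result_formatter import ScanResultFormatter


def write_reports(scan_results, target: str) -> None:
    """Render JSON and Markdown reports for already-collected scan results.

    ``scan_results`` is assumed to be a list of EnhancedScanResult objects
    produced by the package's scan engine.
    """
    formatter = ScanResultFormatter(working_directory=".")

    # Comprehensive JSON: threats, validation details, and telemetry summaries.
    json_report = formatter.format_directory_results_json(
        scan_results, scan_target=target, scan_type="directory"
    )

    # Human-readable Markdown report built from the same underlying data.
    md_report = formatter.format_directory_results_markdown(
        scan_results, scan_target=target, scan_type="directory"
    )

    with open("adversary-report.json", "w") as fh:
        fh.write(json_report)
    with open("adversary-report.md", "w") as fh:
        fh.write(md_report)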
