Adversary MCP Server

by brettbergin
benchmark_runner.py (10.5 kB)
"""Simple benchmark runner for performance testing.""" import asyncio import gc import os import tempfile import time from pathlib import Path import psutil from ..credentials import CredentialManager, get_credential_manager from ..logger import get_logger from ..scanner.llm_scanner import LLMScanner from .results import BenchmarkResult, BenchmarkSummary from .test_scenarios import TestScenarios logger = get_logger("benchmark_runner") class BenchmarkRunner: """Simple performance benchmark runner.""" def __init__(self, credential_manager: CredentialManager | None = None): """Initialize benchmark runner. Args: credential_manager: Optional credential manager for LLM access """ self.credential_manager = credential_manager or get_credential_manager() self.process = psutil.Process() logger.info("BenchmarkRunner initialized") def get_system_info(self) -> dict: """Get basic system information.""" try: return { "cpu_count": psutil.cpu_count(), "memory_total_gb": round(psutil.virtual_memory().total / (1024**3), 2), "python_version": f"{os.sys.version_info.major}.{os.sys.version_info.minor}.{os.sys.version_info.micro}", "platform": os.name, } except Exception as e: logger.warning(f"Failed to get system info: {e}") return {"error": str(e)} async def run_all_benchmarks(self) -> BenchmarkSummary: """Run all standard benchmarks.""" summary = BenchmarkSummary(system_info=self.get_system_info()) # Get all benchmark scenarios scenarios = TestScenarios.get_benchmark_scenarios() for scenario_name, scenario_config in scenarios.items(): logger.info(f"Running benchmark: {scenario_config['name']}") try: if scenario_name == "cache_test": # Special handling for cache test result = await self._run_cache_benchmark( scenario_name, scenario_config ) else: # Standard benchmark result = await self._run_standard_benchmark( scenario_name, scenario_config ) summary.add_result(result) logger.info( f"Completed: {result.name} - {result.duration_seconds:.2f}s" ) except Exception as e: logger.error(f"Benchmark {scenario_name} failed: {e}") error_result = BenchmarkResult( name=scenario_config["name"], duration_seconds=0.0, success=False, error_message=str(e), ) summary.add_result(error_result) return summary async def _run_standard_benchmark( self, scenario_name: str, scenario_config: dict ) -> BenchmarkResult: """Run a standard benchmark scenario.""" # Create temporary directory and test files with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) files = TestScenarios.create_scenario_files(scenario_name, temp_path) # Initialize scanner scanner = LLMScanner(self.credential_manager) # Measure memory before gc.collect() # Force garbage collection memory_start = self.process.memory_info().rss / (1024 * 1024) # MB # Run benchmark start_time = time.time() try: if len(files) == 1: # Single file analysis findings = await scanner.analyze_file(files[0], "python") total_findings = len(findings) else: # Directory analysis findings = await scanner.analyze_directory(temp_path) total_findings = len(findings) success = True error_message = None except Exception as e: success = False total_findings = 0 error_message = str(e) end_time = time.time() duration = end_time - start_time # Measure memory after gc.collect() memory_end = self.process.memory_info().rss / (1024 * 1024) # MB memory_peak = max(memory_start, memory_end) # Get cache stats if available cache_hits = 0 cache_misses = 0 if hasattr(scanner, "cache_manager") and scanner.cache_manager: # This is a simplified way to get cache stats # In a real implementation, we'd 
need proper cache metrics cache_hits = getattr(scanner.cache_manager, "_cache_hits", 0) cache_misses = getattr(scanner.cache_manager, "_cache_misses", 0) return BenchmarkResult( name=scenario_config["name"], duration_seconds=duration, success=success, files_processed=len(files) if success else 0, findings_count=total_findings, memory_peak_mb=memory_peak, cache_hits=cache_hits, cache_misses=cache_misses, error_message=error_message, metadata={ "scenario": scenario_name, "file_count": len(files), "expected_findings": scenario_config.get("expected_findings", 0), }, ) async def _run_cache_benchmark( self, scenario_name: str, scenario_config: dict ) -> BenchmarkResult: """Run cache performance benchmark with repeated scans.""" with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) files = TestScenarios.create_scenario_files(scenario_name, temp_path) scanner = LLMScanner(self.credential_manager) repeat_count = scenario_config.get("repeat_count", 3) gc.collect() memory_start = self.process.memory_info().rss / (1024 * 1024) start_time = time.time() total_findings = 0 success = True error_message = None try: # Run multiple times to test cache effectiveness for i in range(repeat_count): logger.debug(f"Cache test iteration {i+1}/{repeat_count}") findings = await scanner.analyze_directory(temp_path) total_findings = len(findings) # Keep last count # Small delay between iterations await asyncio.sleep(0.1) except Exception as e: success = False error_message = str(e) end_time = time.time() duration = end_time - start_time gc.collect() memory_end = self.process.memory_info().rss / (1024 * 1024) memory_peak = max(memory_start, memory_end) # Cache stats cache_hits = 0 cache_misses = 0 if hasattr(scanner, "cache_manager") and scanner.cache_manager: cache_hits = getattr(scanner.cache_manager, "_cache_hits", 0) cache_misses = getattr(scanner.cache_manager, "_cache_misses", 0) return BenchmarkResult( name=scenario_config["name"], duration_seconds=duration, success=success, files_processed=len(files) * repeat_count if success else 0, findings_count=total_findings, memory_peak_mb=memory_peak, cache_hits=cache_hits, cache_misses=cache_misses, error_message=error_message, metadata={ "scenario": scenario_name, "file_count": len(files), "repeat_count": repeat_count, "expected_findings": scenario_config.get("expected_findings", 0), }, ) async def run_single_benchmark(self, scenario_name: str) -> BenchmarkResult: """Run a single benchmark scenario.""" scenarios = TestScenarios.get_benchmark_scenarios() if scenario_name not in scenarios: raise ValueError(f"Unknown scenario: {scenario_name}") scenario_config = scenarios[scenario_name] if scenario_name == "cache_test": return await self._run_cache_benchmark(scenario_name, scenario_config) else: return await self._run_standard_benchmark(scenario_name, scenario_config) async def run_custom_benchmark( self, name: str, files: list[Path], description: str = "" ) -> BenchmarkResult: """Run a custom benchmark with provided files.""" scanner = LLMScanner(self.credential_manager) gc.collect() memory_start = self.process.memory_info().rss / (1024 * 1024) start_time = time.time() try: if len(files) == 1: findings = await scanner.analyze_file(files[0], "generic") else: # For multiple files, analyze the directory containing them parent_dir = files[0].parent findings = await scanner.analyze_directory(parent_dir) total_findings = len(findings) success = True error_message = None except Exception as e: success = False total_findings = 0 error_message = str(e) 
end_time = time.time() duration = end_time - start_time gc.collect() memory_end = self.process.memory_info().rss / (1024 * 1024) memory_peak = max(memory_start, memory_end) return BenchmarkResult( name=name, duration_seconds=duration, success=success, files_processed=len(files) if success else 0, findings_count=total_findings, memory_peak_mb=memory_peak, error_message=error_message, metadata={ "description": description, "file_count": len(files), "custom_benchmark": True, }, )
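A minimal usage sketch (not part of the module above): it drives the BenchmarkRunner API shown in the file. The import path is an assumption inferred from the relative imports, and the "cache_test" scenario name and the summary's system_info attribute are taken from the code above; any other project layout details may differ.

# Hypothetical driver script; the import path below is assumed, not confirmed by the source.
import asyncio

from adversary_mcp_server.benchmarks.benchmark_runner import BenchmarkRunner  # assumed path


async def main() -> None:
    runner = BenchmarkRunner()  # falls back to get_credential_manager() when no manager is passed

    # Run a single scenario and read fields populated on BenchmarkResult.
    result = await runner.run_single_benchmark("cache_test")
    print(f"{result.name}: {result.duration_seconds:.2f}s, "
          f"findings={result.findings_count}, success={result.success}")

    # Or run the full suite; the returned BenchmarkSummary carries the
    # system_info dict collected by get_system_info().
    summary = await runner.run_all_benchmarks()
    print(summary.system_info)


if __name__ == "__main__":
    asyncio.run(main())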
