Skip to main content
Glama

Grants Search MCP Server

""" Adaptive Testing Agent Orchestrator This module implements the main orchestration system for the adaptive testing agent that continuously evolves with the codebase. It coordinates all testing agents, monitors code changes, and manages the testing lifecycle. """ import asyncio import json import logging import time from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from dataclasses import dataclass, asdict from enum import Enum import hashlib import ast from testing.risk.risk_analyzer import RiskAnalyzer, RiskLevel from testing.compliance.checker import ComplianceChecker from testing.audit.trail_manager import AuditTrailManager from testing.generators.test_generator import TestCaseGenerator logger = logging.getLogger(__name__) class TestingPhase(Enum): """Testing phases in the adaptive pipeline.""" DISCOVERY = "discovery" GENERATION = "generation" EXECUTION = "execution" ANALYSIS = "analysis" OPTIMIZATION = "optimization" @dataclass class CodeChangeEvent: """Represents a code change event that triggers adaptive testing.""" file_path: str change_type: str # added, modified, deleted timestamp: datetime file_hash: str complexity_score: float affected_modules: List[str] test_requirements: List[str] @dataclass class TestGenerationRequest: """Request for test case generation.""" source_file: str test_category: str priority: int complexity_metrics: Dict[str, float] dependencies: List[str] business_context: str @dataclass class AdaptiveTestSession: """Represents a complete adaptive testing session.""" session_id: str start_time: datetime end_time: Optional[datetime] phase: TestingPhase code_changes: List[CodeChangeEvent] generated_tests: List[str] execution_results: Dict[str, Any] risk_scores: Dict[str, float] compliance_status: Dict[str, bool] class AdaptiveTestingOrchestrator: """ Main orchestrator for the adaptive testing agent system. Coordinates continuous monitoring, test generation, risk assessment, and compliance checking in response to code changes. """ def __init__(self, project_root: Path, config: Dict[str, Any]): """Initialize the orchestrator.""" self.project_root = project_root self.config = config # Initialize components self.risk_analyzer = RiskAnalyzer(config.get("risk_config", {})) self.compliance_checker = ComplianceChecker(config.get("compliance_config", {})) self.audit_manager = AuditTrailManager(project_root / "testing" / "audit") self.test_generator = TestCaseGenerator(project_root, config.get("generation_config", {})) # Session management self.current_session: Optional[AdaptiveTestSession] = None self.session_history: List[AdaptiveTestSession] = [] # File monitoring self.monitored_files: Dict[str, str] = {} # path -> hash self.file_dependencies: Dict[str, Set[str]] = {} self.test_cache: Dict[str, datetime] = {} # Performance tracking self.performance_metrics = { "test_generation_time": [], "execution_time": [], "risk_analysis_time": [], "compliance_check_time": [] } logger.info("Initialized Adaptive Testing Orchestrator") async def start_continuous_monitoring(self) -> None: """Start continuous code monitoring and adaptive testing.""" logger.info("Starting continuous monitoring for adaptive testing") # Initial code scan await self._perform_initial_scan() # Start monitoring loop while True: try: await self._monitoring_cycle() await asyncio.sleep(self.config.get("monitoring_interval", 30)) except Exception as e: logger.error(f"Error in monitoring cycle: {e}") await asyncio.sleep(60) # Wait longer on error async def _perform_initial_scan(self) -> None: """Perform initial scan of the codebase.""" logger.info("Performing initial codebase scan") # Scan source files src_files = list(self.project_root.glob("src/**/*.py")) + list(self.project_root.glob("src/**/*.ts")) for file_path in src_files: await self._analyze_file(file_path, is_initial=True) # Build dependency graph await self._build_dependency_graph() logger.info(f"Initial scan complete: {len(src_files)} files analyzed") async def _monitoring_cycle(self) -> None: """Execute one monitoring cycle.""" start_time = time.time() # Detect changes changes = await self._detect_code_changes() if changes: # Start new testing session session = await self._start_testing_session(changes) try: # Execute adaptive testing pipeline await self._execute_testing_pipeline(session) # Complete session session.end_time = datetime.now() session.phase = TestingPhase.ANALYSIS self.session_history.append(session) # Audit logging await self.audit_manager.log_session(session) except Exception as e: logger.error(f"Error in testing pipeline: {e}") session.phase = TestingPhase.ANALYSIS await self.audit_manager.log_error(str(e), session.session_id) finally: self.current_session = None cycle_time = time.time() - start_time logger.debug(f"Monitoring cycle completed in {cycle_time:.2f}s") async def _detect_code_changes(self) -> List[CodeChangeEvent]: """Detect changes in monitored files.""" changes = [] # Check existing files for modifications for file_path_str, old_hash in self.monitored_files.items(): file_path = Path(file_path_str) if file_path.exists(): new_hash = await self._calculate_file_hash(file_path) if new_hash != old_hash: change = await self._create_change_event(file_path, "modified") changes.append(change) self.monitored_files[file_path_str] = new_hash else: # File was deleted change = CodeChangeEvent( file_path=file_path_str, change_type="deleted", timestamp=datetime.now(), file_hash="", complexity_score=0.0, affected_modules=[], test_requirements=["cleanup"] ) changes.append(change) del self.monitored_files[file_path_str] # Check for new files src_files = list(self.project_root.glob("src/**/*.py")) + list(self.project_root.glob("src/**/*.ts")) for file_path in src_files: file_path_str = str(file_path) if file_path_str not in self.monitored_files: change = await self._create_change_event(file_path, "added") changes.append(change) self.monitored_files[file_path_str] = await self._calculate_file_hash(file_path) return changes async def _create_change_event(self, file_path: Path, change_type: str) -> CodeChangeEvent: """Create a code change event from a file change.""" file_hash = await self._calculate_file_hash(file_path) complexity_score = await self._calculate_complexity(file_path) affected_modules = await self._get_affected_modules(file_path) test_requirements = await self._determine_test_requirements(file_path, change_type) return CodeChangeEvent( file_path=str(file_path), change_type=change_type, timestamp=datetime.now(), file_hash=file_hash, complexity_score=complexity_score, affected_modules=affected_modules, test_requirements=test_requirements ) async def _start_testing_session(self, changes: List[CodeChangeEvent]) -> AdaptiveTestSession: """Start a new adaptive testing session.""" session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" session = AdaptiveTestSession( session_id=session_id, start_time=datetime.now(), end_time=None, phase=TestingPhase.DISCOVERY, code_changes=changes, generated_tests=[], execution_results={}, risk_scores={}, compliance_status={} ) self.current_session = session logger.info(f"Started testing session {session_id} with {len(changes)} changes") return session async def _execute_testing_pipeline(self, session: AdaptiveTestSession) -> None: """Execute the complete adaptive testing pipeline.""" # Phase 1: Discovery and Risk Analysis session.phase = TestingPhase.DISCOVERY await self._discovery_phase(session) # Phase 2: Test Generation session.phase = TestingPhase.GENERATION await self._generation_phase(session) # Phase 3: Test Execution session.phase = TestingPhase.EXECUTION await self._execution_phase(session) # Phase 4: Analysis and Optimization session.phase = TestingPhase.OPTIMIZATION await self._optimization_phase(session) async def _discovery_phase(self, session: AdaptiveTestSession) -> None: """Execute discovery phase: risk analysis and compliance checking.""" logger.info(f"Executing discovery phase for session {session.session_id}") start_time = time.time() for change in session.code_changes: # Risk analysis risk_score = await self.risk_analyzer.analyze_change(change) session.risk_scores[change.file_path] = risk_score.overall_score # Compliance checking compliance_results = await self.compliance_checker.check_file(Path(change.file_path)) session.compliance_status[change.file_path] = compliance_results.is_compliant # Log critical issues if risk_score.level in [RiskLevel.HIGH, RiskLevel.CRITICAL]: await self.audit_manager.log_high_risk_change(change, risk_score) discovery_time = time.time() - start_time self.performance_metrics["risk_analysis_time"].append(discovery_time) logger.info(f"Discovery phase completed in {discovery_time:.2f}s") async def _generation_phase(self, session: AdaptiveTestSession) -> None: """Execute test generation phase.""" logger.info(f"Executing generation phase for session {session.session_id}") start_time = time.time() # Generate test requests based on changes and risk scores test_requests = [] for change in session.code_changes: risk_score = session.risk_scores.get(change.file_path, 0.5) priority = self._calculate_test_priority(risk_score, change.complexity_score) for requirement in change.test_requirements: request = TestGenerationRequest( source_file=change.file_path, test_category=requirement, priority=priority, complexity_metrics={"complexity": change.complexity_score, "risk": risk_score}, dependencies=list(change.affected_modules), business_context=self._get_business_context(change.file_path) ) test_requests.append(request) # Generate tests for request in sorted(test_requests, key=lambda x: x.priority, reverse=True): try: test_files = await self.test_generator.generate_tests(request) session.generated_tests.extend(test_files) except Exception as e: logger.error(f"Error generating tests for {request.source_file}: {e}") generation_time = time.time() - start_time self.performance_metrics["test_generation_time"].append(generation_time) logger.info(f"Generated {len(session.generated_tests)} test files in {generation_time:.2f}s") async def _execution_phase(self, session: AdaptiveTestSession) -> None: """Execute generated tests.""" logger.info(f"Executing tests for session {session.session_id}") start_time = time.time() if not session.generated_tests: logger.warning("No tests generated for execution") return # Execute tests using pytest import subprocess try: # Run tests with parallel execution cmd = [ "python", "-m", "pytest", "-v", "--tb=short", "--junit-xml=test-results/adaptive-session.xml", "--cov=src", "--cov-report=json:test-results/coverage-adaptive.json", "-n", "auto", # parallel execution "--durations=10" ] + session.generated_tests result = subprocess.run( cmd, cwd=self.project_root, capture_output=True, text=True, timeout=600 # 10 minute timeout ) session.execution_results = { "return_code": result.returncode, "stdout": result.stdout, "stderr": result.stderr, "test_count": len(session.generated_tests) } except subprocess.TimeoutExpired: session.execution_results = { "return_code": -1, "error": "Test execution timeout", "test_count": len(session.generated_tests) } execution_time = time.time() - start_time self.performance_metrics["execution_time"].append(execution_time) logger.info(f"Test execution completed in {execution_time:.2f}s") async def _optimization_phase(self, session: AdaptiveTestSession) -> None: """Execute optimization phase: analyze results and improve.""" logger.info(f"Executing optimization phase for session {session.session_id}") # Analyze test results await self._analyze_test_results(session) # Update test cache and dependencies await self._update_test_metadata(session) # Generate recommendations recommendations = await self._generate_recommendations(session) # Store optimization data optimization_data = { "session_id": session.session_id, "recommendations": recommendations, "performance_metrics": self._get_session_metrics(session), "improvement_opportunities": await self._identify_improvements(session) } await self.audit_manager.log_optimization_results(optimization_data) logger.info("Optimization phase completed") async def _calculate_file_hash(self, file_path: Path) -> str: """Calculate hash of file contents.""" try: content = file_path.read_text(encoding='utf-8') return hashlib.md5(content.encode()).hexdigest() except Exception as e: logger.error(f"Error calculating hash for {file_path}: {e}") return "" async def _calculate_complexity(self, file_path: Path) -> float: """Calculate complexity score for a file.""" try: if file_path.suffix == ".py": return await self._calculate_python_complexity(file_path) elif file_path.suffix == ".ts": return await self._calculate_typescript_complexity(file_path) else: return 1.0 # Default complexity except Exception as e: logger.error(f"Error calculating complexity for {file_path}: {e}") return 1.0 async def _calculate_python_complexity(self, file_path: Path) -> float: """Calculate complexity for Python files using AST analysis.""" try: content = file_path.read_text() tree = ast.parse(content) complexity_score = 1.0 # Base complexity # Count various complexity factors for node in ast.walk(tree): if isinstance(node, (ast.If, ast.While, ast.For, ast.Try, ast.With)): complexity_score += 0.2 elif isinstance(node, ast.FunctionDef): complexity_score += 0.3 elif isinstance(node, ast.ClassDef): complexity_score += 0.5 elif isinstance(node, ast.AsyncFunctionDef): complexity_score += 0.4 return min(complexity_score, 10.0) # Cap at 10.0 except Exception as e: logger.error(f"Error analyzing Python complexity: {e}") return 1.0 async def _calculate_typescript_complexity(self, file_path: Path) -> float: """Calculate complexity for TypeScript files (simplified).""" try: content = file_path.read_text() complexity_score = 1.0 # Simple heuristics for TypeScript complexity_score += content.count("function") * 0.3 complexity_score += content.count("class") * 0.5 complexity_score += content.count("if") * 0.2 complexity_score += content.count("for") * 0.2 complexity_score += content.count("while") * 0.2 complexity_score += content.count("try") * 0.3 complexity_score += content.count("async") * 0.2 return min(complexity_score, 10.0) except Exception as e: logger.error(f"Error analyzing TypeScript complexity: {e}") return 1.0 def _calculate_test_priority(self, risk_score: float, complexity_score: float) -> int: """Calculate test generation priority based on risk and complexity.""" # Priority scale: 1 (lowest) to 10 (highest) base_priority = 5 # Risk factor (0-5 points) risk_points = min(risk_score * 5, 5) # Complexity factor (0-3 points) complexity_points = min(complexity_score / 10 * 3, 3) # Business criticality (0-2 points) business_points = 2 # Default high priority for grants domain total_priority = base_priority + risk_points + complexity_points + business_points return min(int(total_priority), 10) def _get_business_context(self, file_path: str) -> str: """Determine business context for a file.""" if "grants" in file_path.lower(): return "grants_processing" elif "api" in file_path.lower(): return "api_integration" elif "financial" in file_path.lower(): return "financial_calculations" elif "search" in file_path.lower(): return "search_functionality" else: return "general" async def _get_affected_modules(self, file_path: Path) -> List[str]: """Get modules that might be affected by changes to this file.""" # This would use the dependency graph built during initialization file_str = str(file_path) return list(self.file_dependencies.get(file_str, set())) async def _determine_test_requirements(self, file_path: Path, change_type: str) -> List[str]: """Determine what types of tests are needed.""" requirements = [] if change_type == "added": requirements = ["unit", "integration"] elif change_type == "modified": requirements = ["unit", "regression"] elif change_type == "deleted": requirements = ["cleanup"] # Add domain-specific requirements file_str = str(file_path).lower() if "api" in file_str: requirements.append("contract") if "financial" in file_str or "grant" in file_str: requirements.append("compliance") if "performance" in file_str or "analytics" in file_str: requirements.append("performance") return requirements async def _build_dependency_graph(self) -> None: """Build dependency graph for the codebase.""" logger.info("Building dependency graph") # This is a simplified implementation # In practice, you'd use AST analysis to build a proper dependency graph src_files = list(self.project_root.glob("src/**/*.py")) for file_path in src_files: try: content = file_path.read_text() file_str = str(file_path) dependencies = set() # Simple import analysis for line in content.split('\n'): line = line.strip() if line.startswith(('import ', 'from ')): # Extract import information if 'mcp_server' in line: dependencies.add(line.split()[1]) self.file_dependencies[file_str] = dependencies except Exception as e: logger.error(f"Error analyzing dependencies for {file_path}: {e}") async def _analyze_file(self, file_path: Path, is_initial: bool = False) -> None: """Analyze a single file.""" file_hash = await self._calculate_file_hash(file_path) self.monitored_files[str(file_path)] = file_hash if not is_initial: logger.debug(f"Analyzed file: {file_path}") async def _analyze_test_results(self, session: AdaptiveTestSession) -> None: """Analyze test execution results.""" if "return_code" not in session.execution_results: return # Parse JUnit XML if available junit_file = self.project_root / "test-results" / "adaptive-session.xml" if junit_file.exists(): # Parse test results (simplified) session.execution_results["junit_parsed"] = True async def _update_test_metadata(self, session: AdaptiveTestSession) -> None: """Update test cache and metadata.""" for test_file in session.generated_tests: self.test_cache[test_file] = session.start_time async def _generate_recommendations(self, session: AdaptiveTestSession) -> List[str]: """Generate recommendations for improvement.""" recommendations = [] # Analyze execution results if session.execution_results.get("return_code", 0) != 0: recommendations.append("Review failed tests and fix implementation issues") # Analyze risk scores high_risk_files = [ path for path, score in session.risk_scores.items() if score > 0.7 ] if high_risk_files: recommendations.append(f"Focus on high-risk files: {', '.join(high_risk_files[:3])}") # Analyze compliance non_compliant_files = [ path for path, compliant in session.compliance_status.items() if not compliant ] if non_compliant_files: recommendations.append(f"Address compliance issues in: {', '.join(non_compliant_files[:3])}") return recommendations def _get_session_metrics(self, session: AdaptiveTestSession) -> Dict[str, Any]: """Get performance metrics for the session.""" return { "session_duration": (session.end_time - session.start_time).total_seconds() if session.end_time else 0, "changes_processed": len(session.code_changes), "tests_generated": len(session.generated_tests), "avg_risk_score": sum(session.risk_scores.values()) / len(session.risk_scores) if session.risk_scores else 0, "compliance_rate": sum(session.compliance_status.values()) / len(session.compliance_status) if session.compliance_status else 1.0 } async def _identify_improvements(self, session: AdaptiveTestSession) -> List[str]: """Identify opportunities for improvement.""" improvements = [] # Performance improvements if self.performance_metrics["test_generation_time"]: avg_gen_time = sum(self.performance_metrics["test_generation_time"]) / len(self.performance_metrics["test_generation_time"]) if avg_gen_time > 30: # More than 30 seconds improvements.append("Optimize test generation performance") # Coverage improvements if len(session.generated_tests) == 0: improvements.append("Improve test generation coverage") return improvements async def get_status(self) -> Dict[str, Any]: """Get current status of the orchestrator.""" return { "monitoring_active": self.current_session is not None, "monitored_files": len(self.monitored_files), "sessions_completed": len(self.session_history), "current_session": self.current_session.session_id if self.current_session else None, "performance_metrics": { key: { "count": len(values), "avg": sum(values) / len(values) if values else 0, "max": max(values) if values else 0 } for key, values in self.performance_metrics.items() } } # Configuration factory def create_orchestrator_config() -> Dict[str, Any]: """Create default configuration for the orchestrator.""" return { "monitoring_interval": 30, # seconds "risk_config": { "security_weight": 0.3, "complexity_weight": 0.2, "business_impact_weight": 0.5 }, "compliance_config": { "data_privacy_rules": ["PII", "financial_data"], "api_security_rules": ["authentication", "rate_limiting"], "grants_specific_rules": ["calculation_accuracy", "audit_trail"] }, "generation_config": { "max_tests_per_file": 10, "test_timeout": 30, "parallel_generation": True } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Tar-ive/grants-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server