selection.py
"""Expert selection engine with technology detection and scoring.""" import json import asyncio from pathlib import Path from typing import Dict, List, Optional, Tuple from cachetools import TTLCache import aiofiles from .models import ( Expert, TechnologyDetectionResult, ExpertScore, ExpertSelectionResult, TaskType ) from .registry import RegistryManager class SelectionEngine: """Engine for technology detection and expert selection.""" # Scoring weights WEIGHTS = { "technology_match": 0.35, "workflow_compatibility": 0.30, "performance_history": 0.25, "capability_assessment": 0.10 } def __init__(self, registry_manager: RegistryManager, cache_ttl: int = 300): self.registry_manager = registry_manager self.cache = TTLCache(maxsize=100, ttl=cache_ttl) async def detect_technologies( self, scan_paths: List[str], include_content: bool = False ) -> TechnologyDetectionResult: """Detect technologies from file paths and optionally content.""" cache_key = f"tech:{':'.join(scan_paths)}:{include_content}" if cache_key in self.cache: return self.cache[cache_key] technologies = set() frameworks = set() for scan_path in scan_paths: path = Path(scan_path) # Check package.json if path.name == "package.json" and path.exists(): await self._detect_from_package_json(path, technologies, frameworks) # Check requirements.txt elif path.name == "requirements.txt" and path.exists(): await self._detect_from_requirements(path, technologies, frameworks) # Check directory elif path.is_dir() and path.exists(): await self._detect_from_directory(path, technologies, frameworks) result = TechnologyDetectionResult( technologies=sorted(list(technologies)), frameworks=sorted(list(frameworks)), confidence=0.8 if technologies else 0.3 ) self.cache[cache_key] = result return result async def _detect_from_package_json( self, path: Path, technologies: set, frameworks: set ): """Detect from package.json file.""" try: async with aiofiles.open(path, 'r') as f: content = await f.read() data = json.loads(content) deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})} # AWS technologies if '@aws-amplify/backend' in deps: technologies.add('AWS Amplify Gen 2') frameworks.add('AWS Amplify') if '@cloudscape-design/components' in deps: technologies.add('AWS Cloudscape') frameworks.add('Cloudscape Design System') if '@aws-sdk/client-dynamodb' in deps: technologies.add('AWS DynamoDB') frameworks.add('AWS SDK') # Frontend frameworks if 'react' in deps: technologies.add('React') frameworks.add('React') if 'vue' in deps: technologies.add('Vue.js') frameworks.add('Vue') if '@angular/core' in deps: technologies.add('Angular') frameworks.add('Angular') # Backend frameworks if 'express' in deps: technologies.add('Express.js') frameworks.add('Express') if 'fastify' in deps: technologies.add('Fastify') frameworks.add('Fastify') except Exception: pass async def _detect_from_requirements( self, path: Path, technologies: set, frameworks: set ): """Detect from requirements.txt file.""" try: async with aiofiles.open(path, 'r') as f: content = await f.read() lines = content.lower().split('\n') # Python frameworks if any('django' in line for line in lines): technologies.add('Django') frameworks.add('Django') if any('flask' in line for line in lines): technologies.add('Flask') frameworks.add('Flask') if any('fastapi' in line for line in lines): technologies.add('FastAPI') frameworks.add('FastAPI') if any('boto3' in line for line in lines): technologies.add('AWS SDK Python') frameworks.add('Boto3') except Exception: pass async def 
_detect_from_directory( self, path: Path, technologies: set, frameworks: set ): """Detect from directory structure.""" try: entries = list(path.iterdir()) # Check file extensions extensions = {f.suffix for f in entries if f.is_file()} if '.tsx' in extensions or '.jsx' in extensions: technologies.add('React') if '.vue' in extensions: technologies.add('Vue.js') if '.py' in extensions: technologies.add('Python') if '.ts' in extensions or '.js' in extensions: technologies.add('TypeScript/JavaScript') # Check for specific files/folders names = {f.name for f in entries} if 'amplify' in names or 'amplify.yml' in names: technologies.add('AWS Amplify') if 'serverless.yml' in names: technologies.add('Serverless Framework') if 'docker-compose.yml' in names: technologies.add('Docker') except Exception: pass async def select_optimal_expert( self, task_description: str, technologies: Optional[List[str]] = None, task_type: Optional[TaskType] = None, strategy: str = "single" ) -> ExpertSelectionResult: """Select the best expert for a task.""" # Get all experts experts = await self.registry_manager.list_experts(include_metrics=True) # Score each expert scores = [] for expert in experts: score = await self._score_expert( expert, task_description, technologies or [], task_type or TaskType.GENERAL ) scores.append((expert, score)) # Sort by total score scores.sort(key=lambda x: x[1].total_score, reverse=True) if not scores: raise ValueError("No experts available") # Select based on strategy winner_expert, winner_score = scores[0] return ExpertSelectionResult( expert=winner_expert, score=winner_score, reasoning=self._generate_reasoning(winner_expert, winner_score), alternatives=[score for _, score in scores[1:4]], # Top 3 alternatives selection_strategy=strategy ) async def _score_expert( self, expert: Expert, task_description: str, technologies: List[str], task_type: TaskType ) -> ExpertScore: """Score an expert for a specific task.""" score = ExpertScore(expert_id=expert.id) # Technology match (35%) if technologies: tech_matches = sum( 1 for tech in technologies if any( tech.lower() in spec.technology.lower() for spec in expert.specializations ) ) score.technology_match = (tech_matches / len(technologies)) * self.WEIGHTS["technology_match"] else: score.technology_match = 0.5 * self.WEIGHTS["technology_match"] # Workflow compatibility (30%) workflow_score = expert.workflow_compatibility.get(task_type.value, 0.5) score.workflow_compatibility = workflow_score * self.WEIGHTS["workflow_compatibility"] # Performance history (25%) if expert.performance_metrics: if expert.performance_metrics.total_applications > 0: success_rate = ( expert.performance_metrics.successful_applications / expert.performance_metrics.total_applications ) score.performance_history = success_rate * self.WEIGHTS["performance_history"] else: score.performance_history = 0.5 * self.WEIGHTS["performance_history"] else: score.performance_history = 0.5 * self.WEIGHTS["performance_history"] # Capability assessment (10%) # For now, default high capability score.capability_assessment = 0.8 * self.WEIGHTS["capability_assessment"] # Calculate total score.total_score = ( score.technology_match + score.workflow_compatibility + score.performance_history + score.capability_assessment ) return score def _generate_reasoning(self, expert: Expert, score: ExpertScore) -> str: """Generate human-readable reasoning for selection.""" reasons = [ f"Selected {expert.name} (score: {score.total_score:.2f})", f"Technology match: 
{score.technology_match/self.WEIGHTS['technology_match']:.0%}", f"Workflow compatibility: {score.workflow_compatibility/self.WEIGHTS['workflow_compatibility']:.0%}" ] if expert.performance_metrics and expert.performance_metrics.total_applications > 0: success_rate = ( expert.performance_metrics.successful_applications / expert.performance_metrics.total_applications ) reasons.append(f"Success rate: {success_rate:.0%} from {expert.performance_metrics.total_applications} applications") return " | ".join(reasons) async def assess_capability( self, expert_id: str, task_description: str, constraints: Optional[Dict] = None ) -> float: """Assess expert capability for a specific task.""" # For now, return a high default score # In production, this could query an LLM or use more sophisticated analysis return 0.85 async def find_expert_combinations( self, requirements: List[str], team_size: int = 2 ) -> List[Tuple[List[Expert], float]]: """Find complementary expert combinations.""" experts = await self.registry_manager.list_experts() # Simple implementation: find experts that together cover all requirements combinations = [] # This is a simplified version - in production you'd want more sophisticated # combination algorithms for i, expert1 in enumerate(experts): for expert2 in experts[i+1:]: if team_size == 2: coverage = self._calculate_coverage([expert1, expert2], requirements) if coverage > 0.8: combinations.append(([expert1, expert2], coverage)) combinations.sort(key=lambda x: x[1], reverse=True) return combinations[:3] def _calculate_coverage(self, experts: List[Expert], requirements: List[str]) -> float: """Calculate how well a team of experts covers requirements.""" covered = set() for expert in experts: for spec in expert.specializations: for req in requirements: if req.lower() in spec.technology.lower(): covered.add(req) return len(covered) / len(requirements) if requirements else 0.0
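A minimal usage sketch of the engine above, combining technology detection with expert selection. The package import path and the no-argument RegistryManager construction are assumptions; neither is defined in this file, so adjust them to the actual registry module.

    # Usage sketch: detect technologies, then pick the best-scoring expert.
    # The package path and RegistryManager() call below are assumptions.
    import asyncio

    from expert_registry_mcp.registry import RegistryManager  # assumed module path
    from expert_registry_mcp.selection import SelectionEngine  # assumed module path


    async def main():
        registry = RegistryManager()  # assumed no-argument construction
        engine = SelectionEngine(registry, cache_ttl=300)

        # Scan a manifest and a source directory for technology hints.
        detected = await engine.detect_technologies(["package.json", "src/"])

        # Score all registered experts and return the winner plus alternatives.
        result = await engine.select_optimal_expert(
            task_description="Add a DynamoDB-backed API endpoint",
            technologies=detected.technologies,
        )
        print(result.reasoning)


    asyncio.run(main())

The selection result also carries up to three alternatives, so a caller that wants a fallback expert can read them from the same return value instead of re-scoring.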


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agentience/expert-registry-mcp'
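The same endpoint can be queried from Python with the standard library; a minimal sketch, assuming the endpoint returns JSON as the curl example suggests:

    # Sketch: fetch this server's entry from the MCP directory API.
    # Assumes a JSON response body; the exact schema is not documented here.
    import json
    from urllib.request import urlopen

    url = "https://glama.ai/api/mcp/v1/servers/agentience/expert-registry-mcp"
    with urlopen(url) as response:
        data = json.load(response)
    print(json.dumps(data, indent=2))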

If you have feedback or need assistance with the MCP directory API, please join our Discord server.