Skip to main content
Glama

Smart Code Search MCP Server

init_agents.py (16.2 kB)
#!/usr/bin/env python3
"""
Initialize Agent Templates in Database
Loads prebuilt agent templates and orchestration flows
"""

import sys
import json
from pathlib import Path
from datetime import datetime

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.core.db_wrapper import ThreadSafeDB
from src.core.agent_templates import AgentTemplate, AgentTemplateManager


# Source code of the ``review_aggregator`` agent.  It is stored as text and
# handed to AgentTemplate as ``template_code``.
#
# NOTE(review): the missing-tests penalty is ``max(30, 100 - coverage) / 2``,
# which is a floor of 15 points rather than a cap (compare the complexity
# penalty, which uses ``min``).  Left unchanged - confirm whether ``min`` was
# intended.
_REVIEW_AGGREGATOR_CODE = '''
def analyze(inputs: dict) -> dict:
    """Aggregate results from multiple agents into a unified review"""
    aggregated = {
        'summary': {},
        'issues': [],
        'suggestions': [],
        'metrics': {},
        'quality_score': 100
    }

    # Process each agent result
    for key, value in inputs.items():
        if not key.endswith('_result') or not isinstance(value, dict):
            continue

        # Strip only the trailing suffix; str.replace() would also remove
        # '_result' from the middle of an agent name
        agent_name = key[:-len('_result')]

        # Handle import analyzer results
        if 'unused' in value:
            unused_count = len(value.get('unused', []))
            if unused_count > 0:
                aggregated['issues'].append({
                    'type': 'unused_imports',
                    'severity': 'low',
                    'count': unused_count,
                    'items': value['unused'][:5]  # First 5 examples
                })
                aggregated['quality_score'] -= unused_count * 2

        # Handle complexity results
        if 'complexity' in value:
            complexity = value['complexity']
            if complexity > 10:
                aggregated['issues'].append({
                    'type': 'high_complexity',
                    'severity': 'high' if complexity > 20 else 'medium',
                    'value': complexity
                })
                aggregated['quality_score'] -= min(20, complexity - 10)

        # Handle test gap results
        if 'untested' in value:
            untested_count = len(value.get('untested', []))
            if untested_count > 0:
                coverage = value.get('coverage_percent', 0)
                aggregated['issues'].append({
                    'type': 'missing_tests',
                    'severity': 'high' if coverage < 50 else 'medium',
                    'count': untested_count,
                    'coverage': coverage
                })
                aggregated['quality_score'] -= max(30, 100 - coverage) / 2

        # Handle duplicate detection
        if 'duplicates' in value:
            dup_count = len(value.get('duplicates', []))
            if dup_count > 0:
                aggregated['issues'].append({
                    'type': 'code_duplication',
                    'severity': 'medium',
                    'count': dup_count,
                    'lines': value.get('total_duplicate_lines', 0)
                })
                aggregated['quality_score'] -= dup_count * 5

        # Collect metrics
        for metric_key in ['count', 'complexity', 'coverage_percent']:
            if metric_key in value:
                aggregated['metrics'][f"{agent_name}_{metric_key}"] = value[metric_key]

    # Ensure quality score doesn't go negative
    aggregated['quality_score'] = max(0, aggregated['quality_score'])

    # Generate summary
    issue_counts = {'high': 0, 'medium': 0, 'low': 0}
    for issue in aggregated['issues']:
        severity = issue.get('severity', 'low')
        issue_counts[severity] += 1

    aggregated['summary'] = {
        'quality_score': aggregated['quality_score'],
        'total_issues': len(aggregated['issues']),
        'high_severity': issue_counts['high'],
        'medium_severity': issue_counts['medium'],
        'low_severity': issue_counts['low'],
        'grade': 'A' if aggregated['quality_score'] >= 90 else
                 'B' if aggregated['quality_score'] >= 80 else
                 'C' if aggregated['quality_score'] >= 70 else
                 'D' if aggregated['quality_score'] >= 60 else 'F'
    }

    # Generate suggestions based on issues
    if issue_counts['high'] > 0:
        aggregated['suggestions'].append("Priority: Address high-severity issues first")

    if any(i['type'] == 'missing_tests' for i in aggregated['issues']):
        aggregated['suggestions'].append("Add tests for uncovered functions")

    if any(i['type'] == 'high_complexity' for i in aggregated['issues']):
        aggregated['suggestions'].append("Refactor complex functions into smaller units")

    if any(i['type'] == 'code_duplication' for i in aggregated['issues']):
        aggregated['suggestions'].append("Extract common code into reusable functions")

    return aggregated
'''


# Source code of the ``priority_ranker`` agent.
_PRIORITY_RANKER_CODE = '''
def analyze(inputs: dict) -> dict:
    """Rank issues by priority and impact"""
    issues = []

    # Collect all issues from inputs
    for key, value in inputs.items():
        if not isinstance(value, dict):
            continue

        # Extract issues from review aggregator.  Work on shallow copies so
        # adding 'priority_score' below never mutates the caller's (possibly
        # cached) result dicts.
        if 'issues' in value:
            issues.extend(
                dict(issue) for issue in value['issues']
                if isinstance(issue, dict)
            )

    severity_scores = {'high': 30, 'medium': 20, 'low': 10}

    # Score each issue
    for issue in issues:
        score = 0
        issue_type = issue.get('type')

        # Severity scoring
        score += severity_scores.get(issue.get('severity', 'low'), 0)

        # Type-specific scoring
        if issue_type == 'missing_tests':
            score += 25  # Tests are critical
        elif issue_type == 'high_complexity':
            score += 20  # Complexity affects maintainability
        elif issue_type == 'code_duplication':
            score += 15  # Duplication increases maintenance burden
        elif issue_type == 'unused_imports':
            score += 5  # Minor but easy to fix

        # Add count/value modifiers
        if 'count' in issue:
            score += min(10, issue['count'])
        if 'value' in issue and issue_type == 'high_complexity':
            score += min(15, issue['value'] - 10)

        issue['priority_score'] = score

    # Sort by priority
    issues.sort(key=lambda x: x['priority_score'], reverse=True)

    # Generate action plan
    action_plan = []
    for i, issue in enumerate(issues[:5], 1):
        # Same tolerant access as the scoring loop: an issue without
        # 'type'/'severity' must not crash the plan after being scored.
        issue_type = issue.get('type', 'unknown')
        action = {
            'order': i,
            'type': issue_type,
            'severity': issue.get('severity', 'low'),
            'action': ''
        }

        if issue_type == 'missing_tests':
            action['action'] = f"Write tests for {issue.get('count', 'uncovered')} functions"
        elif issue_type == 'high_complexity':
            action['action'] = f"Refactor function with complexity {issue.get('value', 'N/A')}"
        elif issue_type == 'code_duplication':
            action['action'] = f"Extract {issue.get('lines', 'duplicated')} lines into reusable function"
        elif issue_type == 'unused_imports':
            action['action'] = f"Remove {issue.get('count', 'unused')} unused imports"

        action_plan.append(action)

    return {
        'ranked_issues': issues,
        'action_plan': action_plan,
        'top_priority': issues[0] if issues else None,
        'total_issues': len(issues),
        'estimated_effort': sum(i['priority_score'] for i in issues)
    }
'''


# Source code of the ``debt_calculator`` agent.
_DEBT_CALCULATOR_CODE = '''
def analyze(inputs: dict) -> dict:
    """Calculate technical debt metrics and ROI"""
    debt_items = []
    total_debt_hours = 0

    # Process results from other agents
    for key, value in inputs.items():
        if not isinstance(value, dict):
            continue

        # Calculate debt from complexity
        if 'complexity' in value:
            complexity = value['complexity']
            if complexity > 10:
                hours = (complexity - 10) * 0.5  # 30 min per complexity point over 10
                debt_items.append({
                    'type': 'complexity',
                    'hours': hours,
                    'cost': hours * 100,  # $100/hour
                    'priority': 'high' if complexity > 20 else 'medium'
                })
                total_debt_hours += hours

        # Calculate debt from missing tests
        if 'untested' in value:
            untested_count = len(value.get('untested', []))
            if untested_count > 0:
                hours = untested_count * 0.25  # 15 min per test
                debt_items.append({
                    'type': 'missing_tests',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'high',
                    'count': untested_count
                })
                total_debt_hours += hours

        # Calculate debt from duplicates
        if 'duplicates' in value:
            dup_count = len(value.get('duplicates', []))
            if dup_count > 0:
                hours = dup_count * 0.5  # 30 min per duplicate to refactor
                debt_items.append({
                    'type': 'duplication',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'medium',
                    'count': dup_count
                })
                total_debt_hours += hours

        # Calculate debt from unused imports
        if 'unused' in value:
            unused_count = len(value.get('unused', []))
            if unused_count > 0:
                hours = unused_count * 0.05  # 3 min per import
                debt_items.append({
                    'type': 'unused_imports',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'low',
                    'count': unused_count
                })
                total_debt_hours += hours

    # Sort by ROI (priority/hours)
    for item in debt_items:
        priority_value = {'high': 3, 'medium': 2, 'low': 1}[item['priority']]
        item['roi'] = priority_value / max(0.1, item['hours'])

    debt_items.sort(key=lambda x: x['roi'], reverse=True)

    # Calculate summary metrics
    total_cost = sum(item['cost'] for item in debt_items)
    high_priority_hours = sum(item['hours'] for item in debt_items if item['priority'] == 'high')

    return {
        'debt_items': debt_items,
        'total_debt_hours': round(total_debt_hours, 1),
        'total_debt_cost': round(total_cost),
        'high_priority_hours': round(high_priority_hours, 1),
        'quick_wins': [item for item in debt_items if item['hours'] < 0.5][:3],
        'recommendation': 'Start with quick wins' if any(item['hours'] < 0.5 for item in debt_items) else
                         'Focus on high-priority items' if high_priority_hours > 0 else
                         'Minimal technical debt detected'
    }
'''


# Additional orchestration-specific agents registered on top of whatever
# AgentTemplateManager already holds.
_ADDITIONAL_AGENTS = (
    {
        'name': 'review_aggregator',
        'type': 'analyzer',
        'description': 'Aggregate results from multiple analysis agents',
        'template_code': _REVIEW_AGGREGATOR_CODE,
        'input_schema': {'type': 'object'},
        'output_schema': {'type': 'object'},
    },
    {
        'name': 'priority_ranker',
        'type': 'analyzer',
        'description': 'Rank and prioritize issues by impact',
        'template_code': _PRIORITY_RANKER_CODE,
        'input_schema': {'type': 'object'},
        'output_schema': {'type': 'object'},
    },
    {
        'name': 'debt_calculator',
        'type': 'analyzer',
        'description': 'Calculate technical debt and ROI for fixes',
        'template_code': _DEBT_CALCULATOR_CODE,
        'input_schema': {'type': 'object'},
        'output_schema': {'type': 'object'},
    },
)


# Orchestration flows written to the ``orchestration_flows`` table.  The
# sequence/group lists are JSON-encoded at insert time.
_ORCHESTRATION_FLOWS = (
    {
        'name': 'instant_review',
        'description': 'Multi-agent code review (920/1000 value)',
        'agent_sequence': ['review_aggregator', 'priority_ranker'],
        'parallel_groups': [
            ['import_analyzer', 'complexity_analyzer', 'function_extractor', 'test_gap_finder']
        ],
        'cache_strategy': 'aggressive',
    },
    {
        'name': 'debt_orchestrator',
        'description': 'Technical debt analysis (900/1000 value)',
        'agent_sequence': ['debt_calculator'],
        'parallel_groups': [
            ['complexity_analyzer', 'duplicate_detector', 'test_gap_finder']
        ],
        'cache_strategy': 'aggressive',
    },
    {
        'name': 'test_gap_analyzer',
        'description': 'Find untested code (880/1000 value)',
        'agent_sequence': ['function_extractor', 'test_gap_finder', 'test_generator'],
        'parallel_groups': [],
        'cache_strategy': 'conservative',
    },
    {
        'name': 'import_optimizer',
        'description': 'Optimize imports (650/1000 value)',
        'agent_sequence': ['import_analyzer'],
        'parallel_groups': [],
        'cache_strategy': 'aggressive',
    },
)


def _register_additional_agents(manager):
    """Register every agent in _ADDITIONAL_AGENTS that the manager lacks.

    Returns the number of templates newly registered.
    """
    registered = 0
    for agent_data in _ADDITIONAL_AGENTS:
        agent = AgentTemplate(
            name=agent_data['name'],
            agent_type=agent_data['type'],
            description=agent_data['description'],
            template_code=agent_data['template_code'],
            input_schema=agent_data['input_schema'],
            output_schema=agent_data['output_schema']
        )

        # Never overwrite a template that is already present
        if agent.name not in manager.templates:
            manager.register_template(agent)
            registered += 1
            print(f" Registered: {agent.name}")
    return registered


def _print_agents_by_type(manager):
    """Print the names of all known agents, grouped and sorted by type."""
    print("\nAvailable agents by type:")
    agents_by_type = {}
    for agent in manager.templates.values():
        agents_by_type.setdefault(agent.type, []).append(agent.name)

    for agent_type, names in sorted(agents_by_type.items()):
        print(f" {agent_type}: {', '.join(sorted(names))}")


def _register_flows(db):
    """Insert (or replace) every flow in _ORCHESTRATION_FLOWS, then commit."""
    with db.get_connection() as conn:
        for flow in _ORCHESTRATION_FLOWS:
            conn.execute("""
                INSERT OR REPLACE INTO orchestration_flows
                (name, description, agent_sequence, parallel_groups, cache_strategy, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                flow['name'],
                flow['description'],
                json.dumps(flow['agent_sequence']),
                json.dumps(flow['parallel_groups']),
                flow['cache_strategy'],
                datetime.now().isoformat()
            ))
        conn.commit()


def init_agents():
    """Initialize all agent templates and orchestration flows

    Looks for the database at ``.claude-symbols/search.db`` relative to the
    current working directory, registers the additional orchestration agents
    that are not already known to the manager, prints a summary, and writes
    the orchestration flows to the database.

    Returns:
        False if the database file does not exist, True otherwise.
    """
    print("Initializing agent templates...")

    # Connect to database
    db_path = Path(".claude-symbols/search.db")
    if not db_path.exists():
        print(f"Error: Database not found at {db_path}")
        print("Please run the indexing first to create the database")
        return False

    db = ThreadSafeDB(str(db_path))
    manager = AgentTemplateManager(db)

    # The agent templates are already loaded by AgentTemplateManager
    # in its __init__ method (load_prebuilt_templates)
    print(f"Loaded {len(manager.templates)} agent templates")

    # Register additional agents
    registered = _register_additional_agents(manager)
    print(f"\nRegistered {registered} additional agents")
    print(f"Total agents available: {len(manager.templates)}")

    # List all available agents
    _print_agents_by_type(manager)

    # Initialize orchestration flows
    print("\nInitializing orchestration flows...")
    _register_flows(db)
    print(f"Registered {len(_ORCHESTRATION_FLOWS)} orchestration flows")

    print("\n✅ Agent initialization complete!")
    print("\nNext steps:")
    print("1. Update src/server.py to add orchestrated tools")
    print("2. Test with: python3 -c \"from src.core.orchestrator import OrchestrationEngine; ...\"")

    return True


if __name__ == "__main__":
    success = init_agents()
    sys.exit(0 if success else 1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.