# init_agents.py
#!/usr/bin/env python3
"""
Initialize Agent Templates in Database
Loads prebuilt agent templates and orchestration flows
"""
import sys
import json
from pathlib import Path
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.db_wrapper import ThreadSafeDB
from src.core.agent_templates import AgentTemplateManager
def init_agents():
    """Initialize all agent templates and orchestration flows.

    Connects to the symbol database produced by the indexer, registers the
    additional orchestration-specific agents on top of the prebuilt
    templates, and upserts the orchestration flow definitions.

    Returns:
        bool: True on success, False if the database does not exist yet.
    """
    print("Initializing agent templates...")

    # Connect to database; bail out early if indexing hasn't been run.
    db_path = Path(".claude-symbols/search.db")
    if not db_path.exists():
        print(f"Error: Database not found at {db_path}")
        print("Please run the indexing first to create the database")
        return False

    db = ThreadSafeDB(str(db_path))
    manager = AgentTemplateManager(db)

    # The prebuilt agent templates are already loaded by AgentTemplateManager
    # in its __init__ method (load_prebuilt_templates).
    print(f"Loaded {len(manager.templates)} agent templates")

    registered = _register_additional_agents(manager)
    print(f"\nRegistered {registered} additional agents")
    print(f"Total agents available: {len(manager.templates)}")

    _print_agents_by_type(manager)

    # Initialize orchestration flows
    print("\nInitializing orchestration flows...")
    flows = _orchestration_flows()
    _store_flows(db, flows)
    print(f"Registered {len(flows)} orchestration flows")

    print("\n✅ Agent initialization complete!")
    print("\nNext steps:")
    print("1. Update src/server.py to add orchestrated tools")
    print("2. Test with: python3 -c \"from src.core.orchestrator import OrchestrationEngine; ...\"")
    return True


def _register_additional_agents(manager):
    """Register the orchestration-specific agents; return how many were new.

    Agents already present in ``manager.templates`` are skipped so that
    re-running this script is idempotent.
    """
    # Hoisted out of the loop: the original re-ran this import on every
    # iteration, which is loop-invariant work.
    from src.core.agent_templates import AgentTemplate

    registered = 0
    for agent_data in _additional_agent_specs():
        agent = AgentTemplate(
            name=agent_data['name'],
            agent_type=agent_data['type'],
            description=agent_data['description'],
            template_code=agent_data['template_code'],
            input_schema=agent_data['input_schema'],
            output_schema=agent_data['output_schema']
        )
        if agent.name not in manager.templates:
            manager.register_template(agent)
            registered += 1
            print(f" Registered: {agent.name}")
    return registered


def _print_agents_by_type(manager):
    """Print every registered agent grouped and alphabetized by agent type."""
    print("\nAvailable agents by type:")
    agents_by_type = {}
    for agent in manager.templates.values():
        agents_by_type.setdefault(agent.type, []).append(agent.name)
    for agent_type, names in sorted(agents_by_type.items()):
        print(f" {agent_type}: {', '.join(sorted(names))}")


def _store_flows(db, flows):
    """Upsert orchestration *flows* into the orchestration_flows table."""
    with db.get_connection() as conn:
        for flow in flows:
            conn.execute("""
                INSERT OR REPLACE INTO orchestration_flows
                (name, description, agent_sequence, parallel_groups, cache_strategy, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                flow['name'],
                flow['description'],
                flow['agent_sequence'],
                flow['parallel_groups'],
                flow['cache_strategy'],
                datetime.now().isoformat()
            ))
        # Single commit after all rows — flows are inserted atomically.
        conn.commit()


def _orchestration_flows():
    """Return the orchestration flow rows to persist (JSON-encoded fields)."""
    return [
        {
            'name': 'instant_review',
            'description': 'Multi-agent code review (920/1000 value)',
            'agent_sequence': json.dumps(['review_aggregator', 'priority_ranker']),
            'parallel_groups': json.dumps([
                ['import_analyzer', 'complexity_analyzer', 'function_extractor', 'test_gap_finder']
            ]),
            'cache_strategy': 'aggressive'
        },
        {
            'name': 'debt_orchestrator',
            'description': 'Technical debt analysis (900/1000 value)',
            'agent_sequence': json.dumps(['debt_calculator']),
            'parallel_groups': json.dumps([
                ['complexity_analyzer', 'duplicate_detector', 'test_gap_finder']
            ]),
            'cache_strategy': 'aggressive'
        },
        {
            'name': 'test_gap_analyzer',
            'description': 'Find untested code (880/1000 value)',
            'agent_sequence': json.dumps(['function_extractor', 'test_gap_finder', 'test_generator']),
            'parallel_groups': json.dumps([]),
            'cache_strategy': 'conservative'
        },
        {
            'name': 'import_optimizer',
            'description': 'Optimize imports (650/1000 value)',
            'agent_sequence': json.dumps(['import_analyzer']),
            'parallel_groups': json.dumps([]),
            'cache_strategy': 'aggressive'
        }
    ]


def _additional_agent_specs():
    """Return specs (one dict per agent) for the orchestration-specific agents.

    Each spec carries the Python source of the agent's ``analyze`` function
    as a string in ``template_code``.
    """
    return [
        {
            'name': 'review_aggregator',
            'type': 'analyzer',
            'description': 'Aggregate results from multiple analysis agents',
            'template_code': '''
def analyze(inputs: dict) -> dict:
    """Aggregate results from multiple agents into a unified review"""
    aggregated = {
        'summary': {},
        'issues': [],
        'suggestions': [],
        'metrics': {},
        'quality_score': 100
    }

    # Process each agent result
    for key, value in inputs.items():
        if not key.endswith('_result') or not isinstance(value, dict):
            continue
        agent_name = key.replace('_result', '')

        # Handle import analyzer results
        if 'unused' in value:
            unused_count = len(value.get('unused', []))
            if unused_count > 0:
                aggregated['issues'].append({
                    'type': 'unused_imports',
                    'severity': 'low',
                    'count': unused_count,
                    'items': value['unused'][:5]  # First 5 examples
                })
                aggregated['quality_score'] -= unused_count * 2

        # Handle complexity results
        if 'complexity' in value:
            complexity = value['complexity']
            if complexity > 10:
                aggregated['issues'].append({
                    'type': 'high_complexity',
                    'severity': 'high' if complexity > 20 else 'medium',
                    'value': complexity
                })
                aggregated['quality_score'] -= min(20, complexity - 10)

        # Handle test gap results
        if 'untested' in value:
            untested_count = len(value.get('untested', []))
            if untested_count > 0:
                coverage = value.get('coverage_percent', 0)
                aggregated['issues'].append({
                    'type': 'missing_tests',
                    'severity': 'high' if coverage < 50 else 'medium',
                    'count': untested_count,
                    'coverage': coverage
                })
                # Penalty proportional to missing coverage, capped at 30
                # (mirrors the min(20, ...) cap used for complexity above;
                # the original max(...) made the penalty uncapped-at-50 and
                # never below 15 even at high coverage).
                aggregated['quality_score'] -= min(30, (100 - coverage) / 2)

        # Handle duplicate detection
        if 'duplicates' in value:
            dup_count = len(value.get('duplicates', []))
            if dup_count > 0:
                aggregated['issues'].append({
                    'type': 'code_duplication',
                    'severity': 'medium',
                    'count': dup_count,
                    'lines': value.get('total_duplicate_lines', 0)
                })
                aggregated['quality_score'] -= dup_count * 5

        # Collect metrics
        for metric_key in ['count', 'complexity', 'coverage_percent']:
            if metric_key in value:
                aggregated['metrics'][f"{agent_name}_{metric_key}"] = value[metric_key]

    # Ensure quality score doesn't go negative
    aggregated['quality_score'] = max(0, aggregated['quality_score'])

    # Generate summary
    issue_counts = {'high': 0, 'medium': 0, 'low': 0}
    for issue in aggregated['issues']:
        severity = issue.get('severity', 'low')
        issue_counts[severity] += 1

    aggregated['summary'] = {
        'quality_score': aggregated['quality_score'],
        'total_issues': len(aggregated['issues']),
        'high_severity': issue_counts['high'],
        'medium_severity': issue_counts['medium'],
        'low_severity': issue_counts['low'],
        'grade': 'A' if aggregated['quality_score'] >= 90 else
                 'B' if aggregated['quality_score'] >= 80 else
                 'C' if aggregated['quality_score'] >= 70 else
                 'D' if aggregated['quality_score'] >= 60 else 'F'
    }

    # Generate suggestions based on issues
    if issue_counts['high'] > 0:
        aggregated['suggestions'].append("Priority: Address high-severity issues first")
    if any(i['type'] == 'missing_tests' for i in aggregated['issues']):
        aggregated['suggestions'].append("Add tests for uncovered functions")
    if any(i['type'] == 'high_complexity' for i in aggregated['issues']):
        aggregated['suggestions'].append("Refactor complex functions into smaller units")
    if any(i['type'] == 'code_duplication' for i in aggregated['issues']):
        aggregated['suggestions'].append("Extract common code into reusable functions")

    return aggregated
''',
            'input_schema': {'type': 'object'},
            'output_schema': {'type': 'object'}
        },
        {
            'name': 'priority_ranker',
            'type': 'analyzer',
            'description': 'Rank and prioritize issues by impact',
            'template_code': '''
def analyze(inputs: dict) -> dict:
    """Rank issues by priority and impact"""
    issues = []

    # Collect all issues from inputs
    for key, value in inputs.items():
        if not isinstance(value, dict):
            continue
        # Extract issues from review aggregator
        if 'issues' in value:
            issues.extend(value['issues'])

    # Score each issue
    for issue in issues:
        score = 0

        # Severity scoring
        severity_scores = {'high': 30, 'medium': 20, 'low': 10}
        score += severity_scores.get(issue.get('severity', 'low'), 0)

        # Type-specific scoring
        if issue['type'] == 'missing_tests':
            score += 25  # Tests are critical
        elif issue['type'] == 'high_complexity':
            score += 20  # Complexity affects maintainability
        elif issue['type'] == 'code_duplication':
            score += 15  # Duplication increases maintenance burden
        elif issue['type'] == 'unused_imports':
            score += 5  # Minor but easy to fix

        # Add count/value modifiers
        if 'count' in issue:
            score += min(10, issue['count'])
        if 'value' in issue and issue['type'] == 'high_complexity':
            score += min(15, issue['value'] - 10)

        issue['priority_score'] = score

    # Sort by priority
    issues.sort(key=lambda x: x['priority_score'], reverse=True)

    # Generate action plan (top 5 issues only)
    action_plan = []
    for i, issue in enumerate(issues[:5], 1):
        action = {
            'order': i,
            'type': issue['type'],
            'severity': issue['severity'],
            'action': ''
        }
        if issue['type'] == 'missing_tests':
            action['action'] = f"Write tests for {issue.get('count', 'uncovered')} functions"
        elif issue['type'] == 'high_complexity':
            action['action'] = f"Refactor function with complexity {issue.get('value', 'N/A')}"
        elif issue['type'] == 'code_duplication':
            action['action'] = f"Extract {issue.get('lines', 'duplicated')} lines into reusable function"
        elif issue['type'] == 'unused_imports':
            action['action'] = f"Remove {issue.get('count', 'unused')} unused imports"
        action_plan.append(action)

    return {
        'ranked_issues': issues,
        'action_plan': action_plan,
        'top_priority': issues[0] if issues else None,
        'total_issues': len(issues),
        'estimated_effort': sum(i['priority_score'] for i in issues)
    }
''',
            'input_schema': {'type': 'object'},
            'output_schema': {'type': 'object'}
        },
        {
            'name': 'debt_calculator',
            'type': 'analyzer',
            'description': 'Calculate technical debt and ROI for fixes',
            'template_code': '''
def analyze(inputs: dict) -> dict:
    """Calculate technical debt metrics and ROI"""
    debt_items = []
    total_debt_hours = 0

    # Process results from other agents
    for key, value in inputs.items():
        if not isinstance(value, dict):
            continue

        # Calculate debt from complexity
        if 'complexity' in value:
            complexity = value['complexity']
            if complexity > 10:
                hours = (complexity - 10) * 0.5  # 30 min per complexity point over 10
                debt_items.append({
                    'type': 'complexity',
                    'hours': hours,
                    'cost': hours * 100,  # $100/hour
                    'priority': 'high' if complexity > 20 else 'medium'
                })
                total_debt_hours += hours

        # Calculate debt from missing tests
        if 'untested' in value:
            untested_count = len(value.get('untested', []))
            if untested_count > 0:
                hours = untested_count * 0.25  # 15 min per test
                debt_items.append({
                    'type': 'missing_tests',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'high',
                    'count': untested_count
                })
                total_debt_hours += hours

        # Calculate debt from duplicates
        if 'duplicates' in value:
            dup_count = len(value.get('duplicates', []))
            if dup_count > 0:
                hours = dup_count * 0.5  # 30 min per duplicate to refactor
                debt_items.append({
                    'type': 'duplication',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'medium',
                    'count': dup_count
                })
                total_debt_hours += hours

        # Calculate debt from unused imports
        if 'unused' in value:
            unused_count = len(value.get('unused', []))
            if unused_count > 0:
                hours = unused_count * 0.05  # 3 min per import
                debt_items.append({
                    'type': 'unused_imports',
                    'hours': hours,
                    'cost': hours * 100,
                    'priority': 'low',
                    'count': unused_count
                })
                total_debt_hours += hours

    # Sort by ROI (priority/hours); max() guards against division by ~0
    for item in debt_items:
        priority_value = {'high': 3, 'medium': 2, 'low': 1}[item['priority']]
        item['roi'] = priority_value / max(0.1, item['hours'])
    debt_items.sort(key=lambda x: x['roi'], reverse=True)

    # Calculate summary metrics
    total_cost = sum(item['cost'] for item in debt_items)
    high_priority_hours = sum(item['hours'] for item in debt_items if item['priority'] == 'high')

    return {
        'debt_items': debt_items,
        'total_debt_hours': round(total_debt_hours, 1),
        'total_debt_cost': round(total_cost),
        'high_priority_hours': round(high_priority_hours, 1),
        'quick_wins': [item for item in debt_items if item['hours'] < 0.5][:3],
        'recommendation': 'Start with quick wins' if any(item['hours'] < 0.5 for item in debt_items) else
                          'Focus on high-priority items' if high_priority_hours > 0 else
                          'Minimal technical debt detected'
    }
''',
            'input_schema': {'type': 'object'},
            'output_schema': {'type': 'object'}
        }
    ]
if __name__ == "__main__":
    # Exit 0 on success, 1 on failure so shell callers can branch on it.
    sys.exit(0 if init_agents() else 1)