migrate_db_phase6.py•7.95 kB
#!/usr/bin/env python3
"""
Database migration script for Phase 6: Orchestration Framework
Adds tables for agent templates, orchestration flows, and caching
"""
import sqlite3
import sys
from pathlib import Path
from datetime import datetime
def migrate_database(db_path: Path):
"""Add Phase 6 orchestration tables to existing database"""
if not db_path.exists():
print(f"Database not found at {db_path}")
return False
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check if migration already applied
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='agent_templates'
""")
if cursor.fetchone():
print("Phase 6 migration already applied")
return True
print("Applying Phase 6 migration...")
# 1. Agent Templates Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS agent_templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
type TEXT NOT NULL CHECK(type IN ('analyzer', 'generator', 'refactor', 'validator', 'orchestrator')),
description TEXT,
input_schema TEXT NOT NULL,
output_schema TEXT NOT NULL,
template_code TEXT NOT NULL,
dependencies TEXT,
priority INTEGER DEFAULT 100,
success_rate REAL DEFAULT 0.0,
avg_execution_ms INTEGER DEFAULT 0,
usage_count INTEGER DEFAULT 0,
embedding BLOB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 2. Orchestration Flows Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS orchestration_flows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
trigger_pattern TEXT,
agent_sequence TEXT NOT NULL,
parallel_groups TEXT,
cache_strategy TEXT CHECK(cache_strategy IN ('aggressive', 'conservative', 'none')),
success_count INTEGER DEFAULT 0,
failure_count INTEGER DEFAULT 0,
avg_duration_ms INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 3. Agent Execution Cache Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS agent_execution_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT NOT NULL,
input_hash TEXT NOT NULL,
output TEXT NOT NULL,
execution_ms INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
hit_count INTEGER DEFAULT 0,
UNIQUE(agent_name, input_hash)
)
""")
# 4. Collaboration Patterns Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS collaboration_patterns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pattern_name TEXT UNIQUE NOT NULL,
agent_roles TEXT NOT NULL,
communication_protocol TEXT CHECK(communication_protocol IN ('pipeline', 'broadcast', 'consensus')),
aggregation_strategy TEXT CHECK(aggregation_strategy IN ('merge', 'vote', 'chain', 'best')),
example_usage TEXT,
success_rate REAL DEFAULT 0.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 5. Code Patterns Table
cursor.execute("""
CREATE TABLE IF NOT EXISTS code_patterns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pattern_type TEXT NOT NULL,
language TEXT NOT NULL,
template TEXT NOT NULL,
parameters TEXT,
usage_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used TIMESTAMP,
embedding BLOB
)
""")
# Create indexes for performance
print("Creating indexes...")
# Agent templates indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_templates_name
ON agent_templates(name)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_templates_type
ON agent_templates(type)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_templates_usage
ON agent_templates(usage_count DESC)
""")
# Orchestration flows indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_orchestration_flows_name
ON orchestration_flows(name)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_orchestration_flows_success
ON orchestration_flows(success_count DESC)
""")
# Cache indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_cache_lookup
ON agent_execution_cache(agent_name, input_hash)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_cache_expires
ON agent_execution_cache(expires_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_agent_cache_hits
ON agent_execution_cache(hit_count DESC)
""")
# Code patterns indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_code_patterns_type_lang
ON code_patterns(pattern_type, language)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_code_patterns_usage
ON code_patterns(usage_count DESC)
""")
# Add migration record
cursor.execute("""
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
phase TEXT UNIQUE NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
INSERT OR IGNORE INTO migrations (phase) VALUES ('phase6_orchestration')
""")
conn.commit()
print("✅ Phase 6 migration completed successfully")
# Show table statistics
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table'
ORDER BY name
""")
tables = cursor.fetchall()
print(f"\nDatabase now contains {len(tables)} tables:")
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table[0]}")
count = cursor.fetchone()[0]
print(f" - {table[0]}: {count} rows")
return True
except Exception as e:
print(f"❌ Migration failed: {e}")
conn.rollback()
return False
finally:
conn.close()
def main():
"""Run migration on default database"""
# Default path
db_path = Path(".claude-symbols/search.db")
# Allow custom path
if len(sys.argv) > 1:
db_path = Path(sys.argv[1])
print(f"Migrating database: {db_path}")
if migrate_database(db_path):
print("\n✅ Migration successful!")
print("\nNext steps:")
print("1. Run scripts/init_agents.py to load initial agent templates")
print("2. Start using the orchestration framework")
else:
print("\n❌ Migration failed!")
sys.exit(1)
if __name__ == "__main__":
main()