"""
Migration utility for upgrading from legacy LLMDatabaseRouter to new architecture
Provides automated migration tools and compatibility checks
"""
import os
import shutil
import json
import logging
import time
from typing import Dict, List, Any, Optional
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
import ast
import re
logger = logging.getLogger(__name__)
@dataclass
class MigrationResult:
    """Result of a migration operation.

    Produced by MigrationUtility.migrate_project(); aggregates the outcome
    of all per-file migrations in a single run.
    """
    success: bool  # True only when no errors were recorded
    files_processed: int  # number of files migrated (or inspected on a dry run)
    changes_made: int  # total automatic replacements applied across all files
    warnings: List[str]  # advisories that require manual follow-up
    errors: List[str]  # per-file or run-level failures
    backup_location: Optional[str] = None  # path to the pre-migration backup, if one was made
class CodeAnalyzer:
    """Analyzes code for legacy patterns and suggests migrations"""

    def __init__(self):
        # Regexes that detect legacy LLMDatabaseRouter usage, grouped by kind.
        self.legacy_patterns = {
            # Old imports
            'old_imports': [
                r'from\s+llmDatabaseRouter\s+import\s+LLMDatabaseRouter',
                r'import\s+llmDatabaseRouter'
            ],
            # Old method calls
            'old_methods': [
                r'\.answer_question\(',
                r'\.safe_run_sql\(',
                r'\.semantic_rows\(',
                r'\.get_schema_summary\(',
                r'\._get_all_table_names\(',
                r'\._clean_markdown_output\(',
                r'\._check_vector_extension\('
            ],
            # Old class usage
            'old_classes': [
                r'LLMDatabaseRouter\('
            ]
        }
        # Literal replacements and method-name mappings for the new architecture.
        self.modern_replacements = {
            # New imports
            'new_imports': {
                'from llmDatabaseRouter import LLMDatabaseRouter':
                    'from services.smart_search_service import SmartSearchService',
                'import llmDatabaseRouter':
                    'from services.smart_search_service import SmartSearchService'
            },
            # New method mappings
            'method_mappings': {
                'answer_question': 'search',
                'safe_run_sql': 'execute_query',  # PostgresRepository
                'semantic_rows': 'search',  # SemanticService
                'get_schema_summary': 'get_schema_info',  # SchemaService
                '_get_all_table_names': 'get_all_table_names',  # PostgresRepository
                '_clean_markdown_output': 'clean_markdown',  # SynthesisService
                '_check_vector_extension': 'has_vector_extension'  # VectorRepository
            }
        }

    def analyze_file(self, file_path: Path) -> Dict[str, Any]:
        """Analyze a single file for legacy patterns

        Returns a report dict with the raw regex hits per category, canned
        migration suggestions, and a 0-10 complexity score. On any read or
        scan failure an 'error' entry is returned with a fixed score of 10.
        """
        try:
            content = file_path.read_text(encoding='utf-8')
            report: Dict[str, Any] = {
                'file_path': str(file_path),
                'legacy_imports': [],
                'legacy_methods': [],
                'legacy_classes': [],
                'migration_suggestions': [],
                'complexity_score': 0
            }
            # Collect hits for each pattern category into its report bucket.
            for pattern_key, report_key in (
                ('old_imports', 'legacy_imports'),
                ('old_methods', 'legacy_methods'),
                ('old_classes', 'legacy_classes'),
            ):
                for regex in self.legacy_patterns[pattern_key]:
                    report[report_key].extend(re.findall(regex, content))
            report['migration_suggestions'] = self._generate_suggestions(report)
            report['complexity_score'] = self._calculate_complexity(report)
            return report
        except Exception as e:
            logger.error(f"Failed to analyze file {file_path}: {e}")
            return {
                'file_path': str(file_path),
                'error': str(e),
                'complexity_score': 10  # High complexity for error cases
            }

    def _generate_suggestions(self, analysis: Dict[str, Any]) -> List[str]:
        """Generate migration suggestions based on analysis

        Each finding category maps to one canned advice text; advice is
        emitted only for categories that produced at least one hit.
        """
        advice = (
            ('legacy_imports',
             "Replace legacy imports with new architecture components:\n"
             "- Import SmartSearchService for intelligent search\n"
             "- Import individual services for specific operations\n"
             "- See docs/MIGRATION.md for detailed import mapping"),
            ('legacy_methods',
             "Update method calls to new architecture:\n"
             "- answer_question() → SmartSearchService.search()\n"
             "- safe_run_sql() → PostgresRepository.execute_query()\n"
             "- semantic_rows() → SemanticService.search()\n"
             "- See method mapping guide for complete list"),
            ('legacy_classes',
             "Migrate from LLMDatabaseRouter to new architecture:\n"
             "1. Use SmartSearchService for intelligent search\n"
             "2. Use individual services for specific operations\n"
             "3. Use create_modern_router() helper for compatibility\n"
             "4. See examples/ directory for migration patterns"),
        )
        return [text for key, text in advice if analysis[key]]

    def _calculate_complexity(self, analysis: Dict[str, Any]) -> int:
        """Calculate migration complexity score (0-10)

        Weighted count of findings (imports x2, methods x1, classes x3),
        capped at 10.
        """
        weights = (('legacy_imports', 2), ('legacy_methods', 1), ('legacy_classes', 3))
        score = sum(len(analysis[key]) * weight for key, weight in weights)
        return min(score, 10)
class MigrationUtility:
    """Main migration utility for upgrading to new architecture

    Orchestrates the end-to-end workflow: project-wide analysis (via
    CodeAnalyzer), timestamped backups, automatic import rewriting, and
    markdown report generation.
    """

    def __init__(self, project_root: str):
        # Root of the project tree to analyze / migrate.
        self.project_root = Path(project_root)
        self.analyzer = CodeAnalyzer()
        # Backups live inside the project tree; this directory name is
        # excluded from analysis and from backup copies to avoid recursion.
        self.backup_dir = self.project_root / 'migration_backup'

    def analyze_project(self, include_patterns: Optional[List[str]] = None) -> Dict[str, Any]:
        """Analyze entire project for migration needs

        Args:
            include_patterns: Glob patterns of files to scan; defaults to ['*.py'].

        Returns:
            Summary dict with per-file analyses plus aggregate counts;
            'files_needing_migration' lists only files with legacy hits.
        """
        if include_patterns is None:
            include_patterns = ['*.py']
        files_to_analyze = []
        for pattern in include_patterns:
            files_to_analyze.extend(self.project_root.glob(f"**/{pattern}"))
        analysis_results = []
        total_complexity = 0
        for file_path in files_to_analyze:
            # Skip certain directories
            # NOTE(review): substring match against the whole path, so any
            # path merely containing e.g. 'docs' or 'env' (like 'mydocs.py')
            # is also skipped — confirm this is intended.
            if any(skip in str(file_path) for skip in [
                '__pycache__', '.git', 'venv', 'env', 'node_modules',
                'migration_backup', 'docs'
            ]):
                continue
            result = self.analyzer.analyze_file(file_path)
            analysis_results.append(result)
            # Unreadable files contribute the analyzer's fixed error score of 10.
            total_complexity += result.get('complexity_score', 0)
        # Generate summary
        files_with_legacy = [r for r in analysis_results if (
            r.get('legacy_imports') or
            r.get('legacy_methods') or
            r.get('legacy_classes')
        )]
        summary = {
            'total_files_analyzed': len(analysis_results),
            'files_with_legacy_code': len(files_with_legacy),
            'total_complexity_score': total_complexity,
            'average_complexity': total_complexity / len(analysis_results) if analysis_results else 0,
            'files_needing_migration': files_with_legacy,
            'detailed_analysis': analysis_results
        }
        return summary

    def create_backup(self) -> str:
        """Create backup of project before migration

        Copies every .py file (except those under migration_backup/) into a
        timestamped directory beneath self.backup_dir, preserving the
        relative directory layout.

        Returns:
            The created backup directory path as a string.

        Raises:
            Exception: re-raised after logging if any copy step fails.
        """
        try:
            backup_path = self.backup_dir / f"backup_{int(time.time())}"
            backup_path.mkdir(parents=True, exist_ok=True)
            # Copy Python files
            for py_file in self.project_root.glob("**/*.py"):
                # Substring check keeps prior backups out of the new backup.
                if 'migration_backup' not in str(py_file):
                    relative_path = py_file.relative_to(self.project_root)
                    backup_file = backup_path / relative_path
                    backup_file.parent.mkdir(parents=True, exist_ok=True)
                    # copy2 also preserves file metadata (timestamps, mode).
                    shutil.copy2(py_file, backup_file)
            logger.info(f"Backup created at {backup_path}")
            return str(backup_path)
        except Exception as e:
            logger.error(f"Failed to create backup: {e}")
            raise

    def migrate_file(self, file_path: Path, dry_run: bool = True) -> Dict[str, Any]:
        """Migrate a single file to new architecture

        Only import statements are rewritten automatically; method calls and
        class instantiations yield advisory warnings because their
        replacements are context-sensitive.

        Args:
            file_path: File to migrate.
            dry_run: When True (default), compute changes without writing.

        Returns:
            Dict with 'changes_made', 'warnings' and (when changed)
            'modified_content'; on failure an 'error' key is set instead.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                original_content = f.read()
            modified_content = original_content
            changes_made = 0
            warnings = []
            # Replace imports
            # NOTE: str.replace swaps every occurrence, but changes_made is
            # incremented once per distinct pattern, not per occurrence.
            for old_import, new_import in self.analyzer.modern_replacements['new_imports'].items():
                if old_import in modified_content:
                    modified_content = modified_content.replace(old_import, new_import)
                    changes_made += 1
                    warnings.append(f"Updated import: {old_import} → {new_import}")
            # Add compatibility wrapper suggestion
            if 'LLMDatabaseRouter(' in original_content:
                warnings.append(
                    "Consider using create_modern_router() helper for easier migration:\n"
                    "router = create_modern_router(engine, config)"
                )
            # Method call suggestions (not automatically replaced due to context sensitivity)
            for old_method, new_method in self.analyzer.modern_replacements['method_mappings'].items():
                if f'.{old_method}(' in original_content:
                    warnings.append(
                        f"Method '{old_method}' should be updated to use new architecture. "
                        f"Consider '{new_method}' in appropriate service class."
                    )
            result = {
                'file_path': str(file_path),
                'changes_made': changes_made,
                'warnings': warnings,
                'modified_content': modified_content if changes_made > 0 else None
            }
            # Write changes if not dry run
            if not dry_run and changes_made > 0:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(modified_content)
                logger.info(f"Migrated file: {file_path}")
            return result
        except Exception as e:
            logger.error(f"Failed to migrate file {file_path}: {e}")
            return {
                'file_path': str(file_path),
                'error': str(e),
                'changes_made': 0,
                'warnings': []
            }

    def migrate_project(self, dry_run: bool = True, create_backup: bool = True) -> MigrationResult:
        """Migrate entire project to new architecture

        Args:
            dry_run: When True (default), nothing is written to disk.
            create_backup: When True, snapshot the project before writing
                (backups are skipped on dry runs).

        Returns:
            MigrationResult aggregating processed files, change counts,
            warnings, and errors; success is True only when no errors occurred.
        """
        warnings = []
        errors = []
        backup_location = None
        try:
            # Analyze project first
            analysis = self.analyze_project()
            files_to_migrate = analysis['files_needing_migration']
            if not files_to_migrate:
                # Nothing legacy found: short-circuit with a clean result.
                return MigrationResult(
                    success=True,
                    files_processed=0,
                    changes_made=0,
                    warnings=["No legacy code patterns found. Project appears to be already migrated."],
                    errors=[]
                )
            # Create backup if requested
            if create_backup and not dry_run:
                backup_location = self.create_backup()
            # Migrate files
            total_changes = 0
            files_processed = 0
            for file_info in files_to_migrate:
                file_path = Path(file_info['file_path'])
                if not file_path.exists():
                    # File vanished between analysis and migration.
                    errors.append(f"File not found: {file_path}")
                    continue
                result = self.migrate_file(file_path, dry_run)
                if 'error' in result:
                    errors.append(f"Migration error in {file_path}: {result['error']}")
                else:
                    files_processed += 1
                    total_changes += result['changes_made']
                    warnings.extend(result['warnings'])
            # Add general migration warnings
            # NOTE(review): this boilerplate says "Migration completed" and
            # mentions migration_backup/ even on dry runs without a backup.
            warnings.extend([
                "Migration completed with automatic changes only.",
                "Manual review and testing required for:",
                "- Method call context and parameter changes",
                "- Error handling adaptations",
                "- Configuration updates",
                "- Testing and validation",
                "",
                "See docs/MIGRATION.md for detailed migration guide.",
                "Review migration_backup/ for original files."
            ])
            return MigrationResult(
                success=len(errors) == 0,
                files_processed=files_processed,
                changes_made=total_changes,
                warnings=warnings,
                errors=errors,
                backup_location=backup_location
            )
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            # Preserve whatever warnings/backup info accumulated before the failure.
            return MigrationResult(
                success=False,
                files_processed=0,
                changes_made=0,
                warnings=warnings,
                errors=[f"Migration failed: {str(e)}"],
                backup_location=backup_location
            )

    def generate_migration_report(self, output_file: Optional[str] = None) -> str:
        """Generate comprehensive migration report

        Builds a markdown report: summary, the ten most complex files as a
        priority list, per-file details, and next-step guidance.

        Args:
            output_file: When given, also write the report to this path.

        Returns:
            The full report text.
        """
        analysis = self.analyze_project()
        report_lines = [
            "# MCP Architecture Migration Report",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
            "## Summary",
            f"- **Total files analyzed:** {analysis['total_files_analyzed']}",
            f"- **Files with legacy code:** {analysis['files_with_legacy_code']}",
            f"- **Total complexity score:** {analysis['total_complexity_score']}",
            f"- **Average complexity:** {analysis['average_complexity']:.2f}",
            "",
            "## Migration Priority",
        ]
        # Sort files by complexity
        files_by_complexity = sorted(
            analysis['files_needing_migration'],
            key=lambda x: x.get('complexity_score', 0),
            reverse=True
        )
        for file_info in files_by_complexity[:10]:  # Top 10 most complex
            complexity = file_info.get('complexity_score', 0)
            file_path = file_info['file_path']
            # Map the 0-10 score onto three priority buckets.
            if complexity >= 7:
                priority = "🔴 HIGH"
            elif complexity >= 4:
                priority = "🟡 MEDIUM"
            else:
                priority = "🟢 LOW"
            report_lines.append(f"- {priority} `{file_path}` (complexity: {complexity})")
        report_lines.extend([
            "",
            "## Detailed Analysis",
            ""
        ])
        for file_info in files_by_complexity:
            file_path = file_info['file_path']
            suggestions = file_info.get('migration_suggestions', [])
            report_lines.extend([
                f"### `{file_path}`",
                f"**Complexity Score:** {file_info.get('complexity_score', 0)}",
                ""
            ])
            if suggestions:
                report_lines.append("**Migration Suggestions:**")
                for suggestion in suggestions:
                    report_lines.append(f"- {suggestion}")
                report_lines.append("")
        report_lines.extend([
            "## Next Steps",
            "",
            "1. **Review this report** and prioritize high-complexity files",
            "2. **Create backup** before making changes",
            "3. **Run migration utility** with dry-run first",
            "4. **Manual review** of automatically migrated code",
            "5. **Update configuration** to use new architecture",
            "6. **Test thoroughly** after migration",
            "7. **Update documentation** and team knowledge",
            "",
            "## Migration Commands",
            "",
            "```bash",
            "# Analyze project",
            "python -m migration.migration_utility analyze /path/to/project",
            "",
            "# Dry run migration",
            "python -m migration.migration_utility migrate /path/to/project --dry-run",
            "",
            "# Perform migration",
            "python -m migration.migration_utility migrate /path/to/project",
            "```",
            "",
            "For detailed guidance, see: docs/MIGRATION.md"
        ])
        report_content = "\n".join(report_lines)
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_content)
            logger.info(f"Migration report saved to {output_file}")
        return report_content
def main():
    """Command-line interface for migration utility

    Parses the command line, configures logging, and dispatches to
    MigrationUtility for the 'analyze', 'migrate', or 'report' command.
    """
    import argparse  # CLI-only dependency, kept local to the entry point
    # Fix: the previous local `import time` / `from datetime import datetime`
    # were redundant — `time` is imported at module level and `datetime`
    # was never used in this function.

    parser = argparse.ArgumentParser(description="MCP Architecture Migration Utility")
    parser.add_argument("command", choices=["analyze", "migrate", "report"],
                        help="Migration command to run")
    parser.add_argument("project_path", help="Path to project directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Perform dry run without making changes")
    parser.add_argument("--no-backup", action="store_true",
                        help="Skip creating backup (not recommended)")
    parser.add_argument("--output", help="Output file for reports")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    utility = MigrationUtility(args.project_path)

    if args.command == "analyze":
        print("Analyzing project for legacy patterns...")
        analysis = utility.analyze_project()
        print("\n📊 Analysis Results:")
        print(f" Files analyzed: {analysis['total_files_analyzed']}")
        print(f" Files with legacy code: {analysis['files_with_legacy_code']}")
        print(f" Total complexity: {analysis['total_complexity_score']}")
        if analysis['files_with_legacy_code'] > 0:
            print(f"\n🔄 Migration recommended for {analysis['files_with_legacy_code']} files")
        else:
            print("\n✅ No legacy patterns found - project appears migrated")
    elif args.command == "migrate":
        dry_run = args.dry_run
        create_backup = not args.no_backup
        if dry_run:
            print("🔍 Performing dry run migration analysis...")
        else:
            print("🚀 Performing project migration...")
            if create_backup:
                print(" Creating backup first...")
        result = utility.migrate_project(dry_run=dry_run, create_backup=create_backup)
        if result.success:
            print("\n✅ Migration completed successfully!")
            print(f" Files processed: {result.files_processed}")
            print(f" Changes made: {result.changes_made}")
            if result.backup_location:
                print(f" Backup created: {result.backup_location}")
        else:
            print(f"\n❌ Migration failed with {len(result.errors)} errors")
            for error in result.errors:
                print(f" Error: {error}")
        if result.warnings:
            print(f"\n⚠️ Warnings ({len(result.warnings)}):")
            for warning in result.warnings[:5]:  # Show first 5 warnings
                print(f" {warning}")
            if len(result.warnings) > 5:
                print(f" ... and {len(result.warnings) - 5} more warnings")
    elif args.command == "report":
        print("📋 Generating migration report...")
        # Default report name is timestamped so repeated runs don't clobber.
        output_file = args.output or f"migration_report_{int(time.time())}.md"
        utility.generate_migration_report(output_file)
        print(f" Report saved to: {output_file}")
# Script entry point: delegate to the CLI handler.
if __name__ == "__main__":
    main()