#!/usr/bin/env python3
"""
CodeBase Optimizer MCP Server
============================
Model Context Protocol server that exposes codebase analysis and optimization functions.
This server makes the CodeBase Optimizer Engine available as MCP tools in Claude Code.
Author: AI Assistant
License: MIT
"""
import asyncio
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Put this file's own directory at the front of sys.path so that
# `codebase_optimizer_engine` can be imported from there.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
    from codebase_optimizer_engine import CodebaseAnalyzer, AnalysisResult
except ImportError as import_error:
    # Without the engine nothing in this module can work, so log and exit.
    logging.error("Failed to import CodebaseAnalyzer: %s", import_error)
    sys.exit(1)
# MCP imports
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
import mcp.types as types
# Setup logging
# The root logger is configured at INFO level; `logger` is the named logger
# that the tool-call handler in this module reports errors through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("codebase-optimizer-mcp")
# Create the MCP server
# The tool-listing and tool-call handlers below register themselves on this
# instance through its decorators, and main() runs it.
server = Server("codebase-optimizer")
def format_analysis_result(result: AnalysisResult, title: str) -> str:
    """Render a single analysis result as a Markdown section.

    Reads ``success``, ``errors``, ``confidence``, ``warnings``,
    ``recommendations`` and ``data`` from *result*.

    Args:
        result: The analysis outcome to render.
        title: Text used for the level-2 heading.

    Returns:
        For a failed result, the heading followed by a bullet list of errors.
        For a successful one, the heading with the confidence percentage,
        then optional warnings, recommendations and key findings sections.
    """
    if not result.success:
        error_bullets = "\n".join(f"- {error}" for error in result.errors)
        return f"## {title}\n❌ **Analysis Failed**\n\nErrors:\n" + error_bullets

    sections = [
        f"## {title}\n✅ **Analysis Successful** (Confidence: {result.confidence:.1%})\n\n"
    ]

    if result.warnings:
        sections.append("### ⚠️ Warnings\n")
        sections.append("\n".join(f"- {warning}" for warning in result.warnings))
        sections.append("\n\n")

    if result.recommendations:
        sections.append("### 💡 Recommendations\n")
        sections.append("\n".join(f"- {rec}" for rec in result.recommendations))
        sections.append("\n\n")

    # Key data points: scalars are printed as-is, short lists and small dicts
    # are summarised by size, and every other value is left out.
    if result.data:
        sections.append("### 📊 Key Findings\n")
        for key, value in result.data.items():
            if isinstance(value, (str, int, float, bool)):
                shown = f"{value}"
            elif isinstance(value, list) and len(value) <= 5:
                shown = f"{len(value)} items"
            elif isinstance(value, dict) and len(value) <= 10:
                shown = f"{len(value)} entries"
            else:
                continue
            sections.append(f"- **{key.replace('_', ' ').title()}**: {shown}\n")

    return "".join(sections)
def format_comprehensive_report(report: Dict[str, Any]) -> str:
    """Render the improvement-report dictionary as a Markdown document.

    Every key is optional; missing values fall back to the defaults used in
    the lookups below.

    Args:
        report: Mapping that may contain ``project_path``, ``analysis_date``,
            ``health_score``, ``metrics``, ``analysis_results``,
            ``prioritized_recommendations``, ``action_plan`` and
            ``improvement_potential``.

    Returns:
        Markdown text with header, metrics, per-analysis summary, up to ten
        prioritized recommendations, the action plan (first three items per
        phase) and the improvement-potential list.
    """
    priority_icons = {"high": "🔴", "medium": "🟡", "low": "🟢"}
    metrics = report.get('metrics', {})

    parts = [
        "# 🔍 Comprehensive CodeBase Analysis Report\n\n",
        # Header information
        f"**Project Path**: `{report.get('project_path', 'Unknown')}`\n",
        f"**Analysis Date**: {report.get('analysis_date', 'Unknown')}\n",
        f"**Health Score**: {report.get('health_score', 0)}/100\n\n",
        # Project metrics
        "## 📊 Project Metrics\n\n",
        f"- **Total Files**: {metrics.get('total_files', 0):,}\n",
        f"- **Lines of Code**: {metrics.get('lines_of_code', 0):,}\n",
        f"- **Duplicated Lines**: {metrics.get('duplicated_lines', 0):,}\n",
        f"- **Duplication Percentage**: {metrics.get('duplication_percentage', 0):.1f}%\n",
        f"- **Organization Score**: {metrics.get('organization_score', 0):.1f}/1.0\n",
        f"- **Security Score**: {metrics.get('security_score', 0):.1f}/1.0\n\n",
        # Analysis results summary
        "## 🔍 Analysis Results Summary\n\n",
    ]

    for analysis_name, summary in report.get('analysis_results', {}).items():
        marker = "✅" if summary.get('success', False) else "❌"
        confidence = summary.get('confidence', 0)
        rec_count = summary.get('recommendations_count', 0)
        label = analysis_name.replace('_', ' ').title()
        parts.append(
            f"- **{label}**: {marker} (Confidence: {confidence:.1%}, {rec_count} recommendations)\n"
        )

    # Only the first ten recommendations are listed.
    parts.append("\n## 🎯 Prioritized Recommendations\n\n")
    top_recommendations = report.get('prioritized_recommendations', [])[:10]
    for rank, rec in enumerate(top_recommendations, 1):
        icon = priority_icons.get(rec.get('priority', 'medium'), "⚪")
        parts.append(
            f"{rank}. {icon} **[{rec.get('priority', 'medium').upper()}]** {rec.get('recommendation', '')}\n"
        )
        parts.append(
            f" - Impact: {rec.get('impact', 'medium')} | Effort: {rec.get('effort', 'medium')}\n\n"
        )

    # Each phase shows its first three items, then a count of the remainder.
    parts.append("## 📋 Action Plan\n\n")
    for phase in report.get('action_plan', []):
        parts.append(
            f"### {phase.get('phase', 'Unknown Phase')} ({phase.get('estimated_time', 'Unknown')})\n"
        )
        parts.append(f"{phase.get('description', '')}\n\n")
        phase_items = phase.get('items', [])
        parts.extend(f"- {item.get('recommendation', '')}\n" for item in phase_items[:3])
        hidden_count = len(phase_items) - 3
        if hidden_count > 0:
            parts.append(f"- ... and {hidden_count} more items\n")
        parts.append("\n")

    parts.append("## 🚀 Improvement Potential\n\n")
    parts.extend(
        f"- **{area.replace('_', ' ').title()}**: {potential}\n"
        for area, potential in report.get('improvement_potential', {}).items()
    )

    return "".join(parts)
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
    """Return the six tool definitions this server advertises.

    Every tool takes a required string ``path``; ``detect_code_duplicates``
    additionally accepts ``languages`` and ``run_complete_project_analysis``
    additionally accepts ``depth``.
    """

    def project_schema(**extra_properties: Any) -> Dict[str, Any]:
        """Build an object schema with the required ``path`` plus any extras."""
        properties = {
            "path": {
                "type": "string",
                "description": "Path to the project directory to analyze",
            },
        }
        properties.update(extra_properties)
        return {
            "type": "object",
            "properties": properties,
            "required": ["path"],
        }

    languages_property = {
        "type": "array",
        "items": {"type": "string"},
        "description": "List of languages to analyze (e.g., ['python', 'javascript', 'typescript'])",
        "default": [],
    }
    depth_property = {
        "type": "string",
        "enum": ["quick", "standard", "deep"],
        "description": "Analysis depth level",
        "default": "standard",
    }

    return [
        Tool(
            name="analyze_project_structure",
            description="Analyze project structure, organization, and architecture type",
            inputSchema=project_schema(),
        ),
        Tool(
            name="detect_code_duplicates",
            description="Detect code duplications across the project in multiple languages",
            inputSchema=project_schema(languages=languages_property),
        ),
        Tool(
            name="validate_microservices_architecture",
            description="Validate microservices architecture patterns and detect issues",
            inputSchema=project_schema(),
        ),
        Tool(
            name="optimize_configurations",
            description="Analyze and optimize configuration management patterns",
            inputSchema=project_schema(),
        ),
        Tool(
            name="generate_improvement_report",
            description="Generate comprehensive improvement report with actionable recommendations",
            inputSchema=project_schema(),
        ),
        Tool(
            name="run_complete_project_analysis",
            description="🎯 MASTER FUNCTION: Run complete project analysis with all checks in optimal order",
            inputSchema=project_schema(depth=depth_property),
        ),
    ]
def _text_response(text: str) -> List[types.TextContent]:
    """Wrap *text* in the single-element TextContent list every tool call returns."""
    return [types.TextContent(type="text", text=text)]


def _run_all_analyses(analyzer: CodebaseAnalyzer) -> Dict[str, Any]:
    """Run the four individual analyses in order, keyed as the report generator receives them."""
    return {
        'structure': analyzer.analyze_project_structure(),
        'duplicates': analyzer.detect_code_duplicates(),
        'microservices': analyzer.validate_microservices_architecture(),
        'configuration': analyzer.optimize_configurations(),
    }


def _format_complete_analysis(analyzer: CodebaseAnalyzer, project_path: str, depth: str) -> str:
    """Run every analysis phase and build the Markdown summary for run_complete_project_analysis."""
    # The header previously interpolated the return value of init_database()
    # under "Started"; a real timestamp (local time with UTC offset) is shown instead.
    started = datetime.now().astimezone().isoformat(timespec="seconds")
    # NOTE(review): the call is kept only in case its side effect is relied on —
    # confirm whether it can be dropped.
    analyzer.pattern_db.init_database()

    results = _run_all_analyses(analyzer)
    report = analyzer.generate_improvement_report(results)

    # (phase heading, key in `results`, title passed to format_analysis_result)
    phases = (
        ("Project Structure & Architecture Detection", 'structure', "Structure Analysis"),
        ("Code Duplicate Detection", 'duplicates', "Duplicate Detection"),
        ("Microservices Architecture Validation", 'microservices', "Microservices Validation"),
        ("Configuration Optimization", 'configuration', "Configuration Analysis"),
    )
    priority_icons = {"high": "🔴", "medium": "🟡", "low": "🟢"}

    # `depth` is only echoed in the header; it is not passed to the analyzer here.
    parts = [
        "# 🚀 Complete CodeBase Analysis\n\n",
        f"**Project**: `{project_path}`\n",
        f"**Analysis Depth**: {depth}\n",
        f"**Started**: {started}\n\n",
    ]
    for number, (heading, key, title) in enumerate(phases, 1):
        parts.append(f"## Phase {number}: {heading}\n")
        parts.append(format_analysis_result(results[key], title))
        parts.append("\n---\n\n")

    recommendations = report.get('prioritized_recommendations', [])
    parts.extend([
        "## Phase 5: Comprehensive Analysis Report\n",
        "### 🎯 Analysis Complete!\n\n",
        f"**Health Score**: {report.get('health_score', 0)}/100\n",
        f"**Total Recommendations**: {len(recommendations)}\n",
        f"**Action Plan Phases**: {len(report.get('action_plan', []))}\n\n",
        "### 🔥 Top 5 Immediate Actions\n\n",
    ])
    for rank, rec in enumerate(recommendations[:5], 1):
        icon = priority_icons.get(rec.get('priority', 'medium'), "⚪")
        parts.append(f"{rank}. {icon} {rec.get('recommendation', '')}\n")
    parts.append("\n💡 **Use `generate_improvement_report` for the complete detailed report**\n")
    return "".join(parts)


@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
    """Dispatch an MCP tool call to the matching analyzer routine.

    Args:
        name: Tool name as advertised by handle_list_tools.
        arguments: Tool arguments; ``path`` is required, ``languages`` and
            ``depth`` are read for the tools that accept them.

    Returns:
        A one-element list holding the Markdown result, or an ``Error: ...``
        message for a missing or non-existent path, an unknown tool name, or
        any exception raised while handling the call (which is also logged
        with its traceback).
    """
    try:
        project_path = arguments.get("path")
        if not project_path:
            return _text_response("Error: Project path is required")
        if not os.path.exists(project_path):
            return _text_response(f"Error: Project path '{project_path}' does not exist")

        # NOTE(review): the analyzer calls below are synchronous and run
        # directly inside this coroutine.
        analyzer = CodebaseAnalyzer(project_path)

        if name == "analyze_project_structure":
            result = analyzer.analyze_project_structure()
            return _text_response(format_analysis_result(result, "Project Structure Analysis"))

        if name == "detect_code_duplicates":
            # A missing or empty language list is passed to the analyzer as None.
            languages = arguments.get("languages") or None
            result = analyzer.detect_code_duplicates(languages)
            return _text_response(format_analysis_result(result, "Code Duplication Analysis"))

        if name == "validate_microservices_architecture":
            result = analyzer.validate_microservices_architecture()
            return _text_response(
                format_analysis_result(result, "Microservices Architecture Validation")
            )

        if name == "optimize_configurations":
            result = analyzer.optimize_configurations()
            return _text_response(
                format_analysis_result(result, "Configuration Optimization Analysis")
            )

        if name == "generate_improvement_report":
            report = analyzer.generate_improvement_report(_run_all_analyses(analyzer))
            return _text_response(format_comprehensive_report(report))

        if name == "run_complete_project_analysis":
            depth = arguments.get("depth", "standard")
            return _text_response(_format_complete_analysis(analyzer, project_path, depth))

        return _text_response(f"Error: Unknown tool '{name}'")
    except Exception as e:
        # Top-level boundary: report the failure to the client instead of
        # raising, but keep the traceback in the log.
        logger.exception("Error in tool '%s': %s", name, e)
        return _text_response(f"Error: {e}")
async def main():
    """Run the MCP server over the stdio transport until the session ends."""
    # Imported inside the function, as in the original layout of this script.
    from mcp.server.stdio import stdio_server

    async with stdio_server() as (read_stream, write_stream):
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="codebase-optimizer",
            server_version="1.0.0",
            capabilities=capabilities,
        )
        await server.run(read_stream, write_stream, init_options)
if __name__ == "__main__":
    # Check that the MCP library can be imported and give an install hint if not.
    # NOTE(review): this module already imports from `mcp` at import time, so a
    # missing package would normally raise before this guard is reached —
    # confirm whether this check is still needed.
    try:
        import mcp
        import mcp.server
        import mcp.types
    except ImportError:
        # The server talks to its client over stdio, so diagnostics go to
        # stderr rather than stdout.
        print(
            "Error: MCP library not found. Please install with: pip install mcp",
            file=sys.stderr,
        )
        sys.exit(1)
    asyncio.run(main())