#!/usr/bin/env python3
"""
Enhanced MCP vs Native Analysis Framework
Comprehensive tracking of retrieval methods, token usage, and edit behavior patterns.
"""
import json
import time
import sys
import re
import statistics
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass, field, asdict
import logging
import uuid
from enum import Enum
# Add project root to path before importing project modules
sys.path.insert(0, str(Path(__file__).parent.parent))
from mcp_server.core.path_utils import PathUtils
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class RetrievalMethod(Enum):
"""Enumeration of retrieval methods"""
SEMANTIC = "semantic"
SQL_FTS = "sql_fts" # fts_code table
SQL_BM25 = "sql_bm25" # bm25_content table
HYBRID = "hybrid" # combination of semantic + SQL
NATIVE_GREP = "native_grep"
NATIVE_READ = "native_read"
NATIVE_GLOB = "native_glob"
class EditType(Enum):
"""Enumeration of edit types"""
TARGETED_EDIT = "targeted_edit" # Edit tool with specific lines
MULTI_EDIT = "multi_edit" # MultiEdit tool
FULL_REWRITE = "full_rewrite" # Write tool
APPEND_ONLY = "append_only" # Adding to end of file
ANALYSIS_ONLY = "analysis_only" # Analysis only, no edits
NO_EDIT = "no_edit" # Search only, no modifications
@dataclass
class TestScenario:
"""Test scenario for enhanced MCP analysis"""
scenario_id: str
name: str
description: str
queries: List[str]
expected_retrieval_method: Optional[RetrievalMethod] = None
expected_edit_type: Optional[EditType] = None
complexity_level: str = "medium" # low, medium, high
requires_context: bool = True
expected_files_modified: int = 1
priority: int = 5
expected_response_time_ms: float = 1000.0
@dataclass
class CacheTokenMetrics:
"""Detailed cache token usage"""
cache_read_input_tokens: int = 0
cache_creation_input_tokens: int = 0
cache_hit_rate: float = 0.0
cache_efficiency_ratio: float = 0.0 # cache_read / total_input
@dataclass
class RetrievalMethodMetrics:
"""Detailed retrieval method analysis"""
method_type: RetrievalMethod
    schema_used: str = ""  # "fts_code", "bm25_content", "symbols", etc.
collection_used: Optional[str] = None # Qdrant collection name
response_time_ms: float = 0.0
results_count: int = 0
metadata_quality_score: float = 0.0 # 0-1 based on snippet/line number availability
line_numbers_available: bool = False
snippets_provided: bool = False
usage_hints_generated: bool = False
def calculate_quality_score(self) -> float:
"""Calculate metadata quality score"""
score = 0.0
if self.line_numbers_available:
score += 0.4
if self.snippets_provided:
score += 0.3
if self.usage_hints_generated:
score += 0.3
self.metadata_quality_score = score
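        # Example: line numbers (0.4) plus snippets (0.3) without usage
        # hints yields a score of 0.7.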
return score
@dataclass
class EditBehaviorMetrics:
"""Analysis of edit behavior based on retrieval"""
search_to_edit_time_ms: float = 0.0
context_reads_before_edit: int = 0
context_lines_read: int = 0
edit_type: EditType = EditType.NO_EDIT
lines_changed: int = 0
total_file_lines: int = 0
edit_precision_ratio: float = 0.0 # lines_changed / total_file_lines
tokens_per_line_changed: float = 0.0
used_offset_limit: bool = False # Whether Read used offset/limit
read_entire_file: bool = False
def calculate_precision_ratio(self) -> float:
"""Calculate edit precision ratio"""
if self.total_file_lines > 0:
self.edit_precision_ratio = self.lines_changed / self.total_file_lines
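        # Example: changing 5 lines of a 200-line file gives 5 / 200 = 0.025;
        # lower ratios indicate more surgical edits.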
return self.edit_precision_ratio
@dataclass
class GranularTokenBreakdown:
"""Enhanced token breakdown with detailed categorization"""
interaction_id: str
timestamp: datetime
# Input tokens - detailed breakdown
user_prompt_tokens: int = 0
context_history_tokens: int = 0
tool_response_tokens: int = 0
file_content_tokens: int = 0
mcp_metadata_tokens: int = 0 # Tokens from MCP response metadata
# Cache token breakdown
cache_metrics: CacheTokenMetrics = field(default_factory=CacheTokenMetrics)
# Output tokens - detailed breakdown
reasoning_tokens: int = 0
tool_invocation_tokens: int = 0
code_generation_tokens: int = 0
explanation_tokens: int = 0
diff_generation_tokens: int = 0 # Tokens for creating targeted diffs
full_rewrite_tokens: int = 0 # Tokens for full file rewrites
error_handling_tokens: int = 0
# Efficiency metrics
total_input_tokens: int = 0
total_output_tokens: int = 0
tokens_per_result: float = 0.0
generation_efficiency: float = 0.0 # output / input ratio
def calculate_totals(self) -> None:
"""Calculate total token counts and efficiency metrics"""
self.total_input_tokens = (
self.user_prompt_tokens + self.context_history_tokens +
self.tool_response_tokens + self.file_content_tokens +
self.mcp_metadata_tokens + self.cache_metrics.cache_read_input_tokens +
self.cache_metrics.cache_creation_input_tokens
)
self.total_output_tokens = (
self.reasoning_tokens + self.tool_invocation_tokens +
self.code_generation_tokens + self.explanation_tokens +
self.diff_generation_tokens + self.full_rewrite_tokens +
self.error_handling_tokens
)
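        # Example: 3000 total input tokens and 600 output tokens give a
        # generation_efficiency of 600 / 3000 = 0.2.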
if self.total_input_tokens > 0:
self.generation_efficiency = self.total_output_tokens / self.total_input_tokens
@dataclass
class EnhancedQueryMetrics:
"""Comprehensive metrics for a single query execution"""
query_id: str
query_text: str
approach: str # 'mcp' or 'native'
start_time: datetime
end_time: Optional[datetime] = None
success: bool = False
# Enhanced token tracking
token_breakdown: GranularTokenBreakdown = field(default_factory=lambda: GranularTokenBreakdown("", datetime.now()))
# Retrieval method analysis
retrieval_metrics: RetrievalMethodMetrics = field(default_factory=lambda: RetrievalMethodMetrics(RetrievalMethod.NATIVE_GREP))
# Edit behavior analysis
edit_metrics: EditBehaviorMetrics = field(default_factory=EditBehaviorMetrics)
# Tool usage tracking
tools_used: List[str] = field(default_factory=list)
mcp_tools_used: List[str] = field(default_factory=list)
native_tools_used: List[str] = field(default_factory=list)
tool_sequence: List[Tuple[str, float]] = field(default_factory=list) # (tool_name, timestamp)
# Performance metrics
response_time_ms: float = 0.0
accuracy_score: Optional[float] = None
@property
def duration(self) -> float:
if self.end_time:
return (self.end_time - self.start_time).total_seconds() * 1000
return 0.0
class EnhancedMCPAnalyzer:
"""Enhanced analyzer for MCP vs Native comparison with detailed method tracking"""
def __init__(self, workspace_path: Path, session_id: str):
"""Initialize the enhanced analyzer"""
self.workspace_path = workspace_path
self.session_id = session_id
self.query_metrics: List[EnhancedQueryMetrics] = []
# Create analysis directory
self.analysis_dir = Path(f"mcp_analysis_{session_id}")
self.analysis_dir.mkdir(exist_ok=True)
# MCP server log monitoring
self.mcp_log_file = None
self.setup_mcp_monitoring()
def setup_mcp_monitoring(self):
"""Setup MCP server log monitoring for method detection"""
# Look for MCP server logs
        potential_log_paths = [
            str(PathUtils.get_temp_path() / "mcp_server.log"),
            f"{self.workspace_path}/.mcp/logs/server.log",
            f"{self.workspace_path}/mcp_server.log"
        ]
for log_path in potential_log_paths:
if Path(log_path).exists():
self.mcp_log_file = log_path
logger.info(f"Found MCP log file: {log_path}")
break
if not self.mcp_log_file:
logger.warning("No MCP log file found - method detection will be limited")
def estimate_token_count(self, text: str) -> int:
"""Estimate token count using improved heuristics"""
if not text:
return 0
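        # Heuristic: code averages roughly 3 characters per token and natural
        # language roughly 4 (a common tokenizer rule of thumb). Example: 120
        # characters of code estimate to 120 // 3 = 40 tokens, 120 characters
        # of prose to 120 // 4 = 30.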
        if self._is_code_content(text):
            return max(1, len(text) // 3)
        return max(1, len(text) // 4)
def _is_code_content(self, text: str) -> bool:
"""Detect if text content is code vs natural language"""
code_indicators = ['{', '}', '()', '=>', 'function', 'class', 'def ', 'import ', '#include']
return any(indicator in text for indicator in code_indicators)
    def detect_retrieval_method(self, transcript_content: str, mcp_response: Optional[Dict[str, Any]] = None) -> RetrievalMethodMetrics:
"""Detect which retrieval method was used"""
method_metrics = RetrievalMethodMetrics(RetrievalMethod.NATIVE_GREP)
# Check for MCP tool usage
if 'mcp__code-index-mcp__search_code' in transcript_content:
method_metrics.method_type = RetrievalMethod.SEMANTIC if 'semantic=true' in transcript_content else RetrievalMethod.SQL_FTS
# Analyze MCP response for more details
if mcp_response:
method_metrics = self._analyze_mcp_response(mcp_response, method_metrics)
elif 'mcp__code-index-mcp__symbol_lookup' in transcript_content:
method_metrics.method_type = RetrievalMethod.SQL_FTS # Symbol lookup typically uses SQL
# Check for native tools
elif 'Grep(' in transcript_content:
method_metrics.method_type = RetrievalMethod.NATIVE_GREP
elif 'Read(' in transcript_content:
method_metrics.method_type = RetrievalMethod.NATIVE_READ
elif 'Glob(' in transcript_content:
method_metrics.method_type = RetrievalMethod.NATIVE_GLOB
# Analyze metadata quality
method_metrics.line_numbers_available = '_usage_hint' in transcript_content or 'offset=' in transcript_content
method_metrics.snippets_provided = 'snippet' in transcript_content
method_metrics.usage_hints_generated = '_usage_hint' in transcript_content
method_metrics.calculate_quality_score()
return method_metrics
def _analyze_mcp_response(self, mcp_response: Dict[str, Any], metrics: RetrievalMethodMetrics) -> RetrievalMethodMetrics:
"""Analyze MCP response to determine exact method used"""
if not mcp_response:
return metrics
# Check for semantic indicators
if any('score' in str(item) for item in mcp_response.get('results', [])):
metrics.method_type = RetrievalMethod.SEMANTIC
# Try to detect collection used
if 'collection' in str(mcp_response):
collection_match = re.search(r'collection["\']:\s*["\']([^"\']+)', str(mcp_response))
if collection_match:
metrics.collection_used = collection_match.group(1)
# Check for SQL schema indicators
elif any('bm25' in str(item).lower() for item in mcp_response.get('results', [])):
metrics.method_type = RetrievalMethod.SQL_BM25
metrics.schema_used = "bm25_content"
elif any('fts' in str(item).lower() for item in mcp_response.get('results', [])):
metrics.method_type = RetrievalMethod.SQL_FTS
metrics.schema_used = "fts_code"
# Extract results metadata
results = mcp_response.get('results', [])
metrics.results_count = len(results)
if results:
first_result = results[0]
metrics.line_numbers_available = 'line' in first_result
metrics.snippets_provided = 'snippet' in first_result or 'content' in first_result
metrics.usage_hints_generated = '_usage_hint' in first_result
return metrics
def analyze_edit_behavior(self, transcript_content: str, retrieval_time: float) -> EditBehaviorMetrics:
"""Analyze edit behavior patterns"""
edit_metrics = EditBehaviorMetrics()
# Find edit operations
edit_pattern = r'(Edit|MultiEdit|Write)\('
edit_matches = list(re.finditer(edit_pattern, transcript_content))
        if edit_matches:
            # Placeholder: the transcript carries no timestamps, so assume
            # roughly one second between retrieval and the first edit.
            edit_metrics.search_to_edit_time_ms = 1000.0
# Classify edit type
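        # "MultiEdit(" must be checked before "Edit(" because the former
        # contains the latter as a substring.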
        if 'MultiEdit(' in transcript_content:
            edit_metrics.edit_type = EditType.MULTI_EDIT
        elif 'Write(' in transcript_content:
            edit_metrics.edit_type = EditType.FULL_REWRITE
        elif 'Edit(' in transcript_content:
            edit_metrics.edit_type = EditType.TARGETED_EDIT
# Count context reads before edits
read_pattern = r'Read\([^)]*\)'
read_matches = list(re.finditer(read_pattern, transcript_content))
edit_metrics.context_reads_before_edit = len(read_matches)
# Check for offset/limit usage (indicates targeted reading)
edit_metrics.used_offset_limit = 'offset=' in transcript_content or 'limit=' in transcript_content
edit_metrics.read_entire_file = 'Read(' in transcript_content and 'offset=' not in transcript_content
return edit_metrics
def analyze_cache_usage(self, transcript_content: str) -> CacheTokenMetrics:
"""Analyze cache token usage patterns"""
cache_metrics = CacheTokenMetrics()
# Look for cache indicators in transcript
cache_read_pattern = r'cache_read_input_tokens["\']:\s*(\d+)'
cache_creation_pattern = r'cache_creation_input_tokens["\']:\s*(\d+)'
cache_read_matches = re.findall(cache_read_pattern, transcript_content)
cache_creation_matches = re.findall(cache_creation_pattern, transcript_content)
if cache_read_matches:
cache_metrics.cache_read_input_tokens = sum(int(match) for match in cache_read_matches)
if cache_creation_matches:
cache_metrics.cache_creation_input_tokens = sum(int(match) for match in cache_creation_matches)
# Calculate efficiency metrics
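        # Example: matched totals of 1200 cache-read and 400 cache-creation
        # tokens give a cache_hit_rate of 1200 / 1600 = 0.75.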
total_cache = cache_metrics.cache_read_input_tokens + cache_metrics.cache_creation_input_tokens
if total_cache > 0:
cache_metrics.cache_hit_rate = cache_metrics.cache_read_input_tokens / total_cache
return cache_metrics
def parse_enhanced_transcript(self, transcript_content: str, query_text: str, approach: str) -> EnhancedQueryMetrics:
"""Parse transcript with enhanced analysis"""
query_id = str(uuid.uuid4())
start_time = datetime.now()
# Create comprehensive metrics
metrics = EnhancedQueryMetrics(
query_id=query_id,
query_text=query_text,
approach=approach,
start_time=start_time,
end_time=datetime.now()
)
# Initialize token breakdown
token_breakdown = GranularTokenBreakdown(
interaction_id=query_id,
timestamp=start_time
)
# Detailed token analysis
token_breakdown.user_prompt_tokens = self.estimate_token_count(query_text)
# Extract tool responses and file content
tool_response_content = self._extract_tool_responses(transcript_content)
file_content = self._extract_file_content(transcript_content)
token_breakdown.tool_response_tokens = self.estimate_token_count(tool_response_content)
token_breakdown.file_content_tokens = self.estimate_token_count(file_content)
# Analyze cache usage
token_breakdown.cache_metrics = self.analyze_cache_usage(transcript_content)
# Categorize output tokens
token_breakdown.reasoning_tokens = self._estimate_reasoning_tokens(transcript_content)
token_breakdown.tool_invocation_tokens = self._estimate_tool_invocation_tokens(transcript_content)
token_breakdown.code_generation_tokens = self._estimate_code_generation_tokens(transcript_content)
# Calculate totals
token_breakdown.calculate_totals()
metrics.token_breakdown = token_breakdown
# Analyze retrieval method
mcp_response = self._extract_mcp_response(transcript_content)
metrics.retrieval_metrics = self.detect_retrieval_method(transcript_content, mcp_response)
# Analyze edit behavior
metrics.edit_metrics = self.analyze_edit_behavior(transcript_content, metrics.retrieval_metrics.response_time_ms)
# Track tool usage
metrics.tools_used = self._extract_tools_used(transcript_content)
metrics.mcp_tools_used = [tool for tool in metrics.tools_used if 'mcp__' in tool]
metrics.native_tools_used = [tool for tool in metrics.tools_used if 'mcp__' not in tool]
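        # Crude heuristic: any occurrence of the word "error" in the
        # transcript marks the query as failed (false positives possible).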
metrics.success = 'error' not in transcript_content.lower()
metrics.response_time_ms = metrics.duration
return metrics
def _extract_tool_responses(self, transcript: str) -> str:
"""Extract tool response content from transcript"""
# This would be implemented based on the actual transcript format
return ""
def _extract_file_content(self, transcript: str) -> str:
"""Extract file content from transcript"""
# This would be implemented based on the actual transcript format
return ""
def _extract_mcp_response(self, transcript: str) -> Optional[Dict[str, Any]]:
"""Extract MCP response from transcript"""
# This would be implemented based on the actual transcript format
return None
def _estimate_reasoning_tokens(self, transcript: str) -> int:
"""Estimate tokens used for reasoning"""
# Implementation would analyze reasoning patterns in transcript
return 0
def _estimate_tool_invocation_tokens(self, transcript: str) -> int:
"""Estimate tokens used for tool invocations"""
# Implementation would count tool call tokens
return 0
def _estimate_code_generation_tokens(self, transcript: str) -> int:
"""Estimate tokens used for code generation"""
# Implementation would analyze code generation patterns
return 0
def _extract_tools_used(self, transcript: str) -> List[str]:
"""Extract list of tools used from transcript"""
tools = []
tool_patterns = [
r'mcp__code-index-mcp__search_code',
r'mcp__code-index-mcp__symbol_lookup',
r'Read\(',
r'Grep\(',
r'Glob\(',
r'Edit\(',
r'MultiEdit\(',
r'Write\('
]
        for pattern in tool_patterns:
            if re.search(pattern, transcript):
                # Strip the regex escape to recover the plain tool name
                tools.append(pattern.replace(r'\(', ''))
        return tools
def save_metrics(self, metrics: EnhancedQueryMetrics):
"""Save metrics to analysis directory"""
metrics_file = self.analysis_dir / f"query_{metrics.query_id}.json"
with open(metrics_file, 'w') as f:
json.dump(asdict(metrics), f, indent=2, default=str)
# Also append to session summary
summary_file = self.analysis_dir / "session_summary.jsonl"
with open(summary_file, 'a') as f:
f.write(json.dumps(asdict(metrics), default=str) + '\n')
def generate_enhanced_report(self) -> Dict[str, Any]:
"""Generate comprehensive analysis report"""
if not self.query_metrics:
return {}
# Group metrics by approach
mcp_metrics = [m for m in self.query_metrics if m.approach == 'mcp']
native_metrics = [m for m in self.query_metrics if m.approach == 'native']
report = {
"session_id": self.session_id,
"test_date": datetime.now().isoformat(),
"total_queries": len(self.query_metrics),
"mcp_queries": len(mcp_metrics),
"native_queries": len(native_metrics),
"retrieval_method_analysis": self._analyze_retrieval_methods(),
"token_efficiency_analysis": self._analyze_token_efficiency(),
"edit_behavior_analysis": self._analyze_edit_behavior(),
"cache_utilization_analysis": self._analyze_cache_utilization(),
"recommendations": self._generate_recommendations()
}
return report
def _analyze_retrieval_methods(self) -> Dict[str, Any]:
"""Analyze retrieval method performance"""
method_stats = {}
for method in RetrievalMethod:
matching_metrics = [m for m in self.query_metrics if m.retrieval_metrics.method_type == method]
if matching_metrics:
method_stats[method.value] = {
"count": len(matching_metrics),
"avg_response_time": sum(m.retrieval_metrics.response_time_ms for m in matching_metrics) / len(matching_metrics),
"avg_results": sum(m.retrieval_metrics.results_count for m in matching_metrics) / len(matching_metrics),
"avg_metadata_quality": sum(m.retrieval_metrics.metadata_quality_score for m in matching_metrics) / len(matching_metrics),
"success_rate": sum(1 for m in matching_metrics if m.success) / len(matching_metrics)
}
return method_stats
def _analyze_token_efficiency(self) -> Dict[str, Any]:
"""Analyze token usage efficiency"""
return {
"avg_input_tokens": sum(m.token_breakdown.total_input_tokens for m in self.query_metrics) / len(self.query_metrics),
"avg_output_tokens": sum(m.token_breakdown.total_output_tokens for m in self.query_metrics) / len(self.query_metrics),
"avg_cache_efficiency": sum(m.token_breakdown.cache_metrics.cache_hit_rate for m in self.query_metrics) / len(self.query_metrics),
"edit_token_efficiency": self._calculate_edit_token_efficiency()
}
def _analyze_edit_behavior(self) -> Dict[str, Any]:
"""Analyze edit behavior patterns"""
edit_types = {}
for edit_type in EditType:
matching = [m for m in self.query_metrics if m.edit_metrics.edit_type == edit_type]
if matching:
edit_types[edit_type.value] = {
"count": len(matching),
"avg_precision": sum(m.edit_metrics.edit_precision_ratio for m in matching) / len(matching),
"avg_context_reads": sum(m.edit_metrics.context_reads_before_edit for m in matching) / len(matching)
}
return {"edit_type_distribution": edit_types}
def _analyze_cache_utilization(self) -> Dict[str, Any]:
"""Analyze cache utilization patterns"""
cache_stats = {
"avg_cache_read_tokens": sum(m.token_breakdown.cache_metrics.cache_read_input_tokens for m in self.query_metrics) / len(self.query_metrics),
"avg_cache_hit_rate": sum(m.token_breakdown.cache_metrics.cache_hit_rate for m in self.query_metrics) / len(self.query_metrics),
"cache_efficiency_correlation": self._calculate_cache_efficiency_correlation()
}
return cache_stats
def _calculate_edit_token_efficiency(self) -> float:
"""Calculate token efficiency for edit operations"""
edit_metrics = [m for m in self.query_metrics if m.edit_metrics.edit_type != EditType.NO_EDIT]
if not edit_metrics:
return 0.0
total_tokens = sum(m.token_breakdown.total_input_tokens + m.token_breakdown.total_output_tokens for m in edit_metrics)
total_lines_changed = sum(m.edit_metrics.lines_changed for m in edit_metrics)
return total_tokens / max(total_lines_changed, 1)
def _calculate_cache_efficiency_correlation(self) -> float:
"""Calculate correlation between cache usage and performance"""
# Simplified correlation calculation
cache_rates = [m.token_breakdown.cache_metrics.cache_hit_rate for m in self.query_metrics]
response_times = [m.response_time_ms for m in self.query_metrics]
if len(cache_rates) < 2:
return 0.0
        # statistics.correlation requires Python 3.10+ and raises
        # StatisticsError on constant or too-short inputs
        try:
            return statistics.correlation(cache_rates, response_times)
        except statistics.StatisticsError:
            return 0.0
def _generate_recommendations(self) -> List[str]:
"""Generate optimization recommendations"""
recommendations = []
# Analyze method performance
method_stats = self._analyze_retrieval_methods()
# Find best performing methods
if method_stats:
best_method = min(method_stats.items(), key=lambda x: x[1]['avg_response_time'])
recommendations.append(f"Use {best_method[0]} for fastest retrieval (avg {best_method[1]['avg_response_time']:.1f}ms)")
# Token efficiency recommendations
token_stats = self._analyze_token_efficiency()
if token_stats['avg_cache_efficiency'] < 0.5:
recommendations.append("Improve cache utilization - current hit rate below 50%")
# Edit behavior recommendations
edit_stats = self._analyze_edit_behavior()
targeted_edits = edit_stats['edit_type_distribution'].get('targeted_edit', {}).get('count', 0)
full_rewrites = edit_stats['edit_type_distribution'].get('full_rewrite', {}).get('count', 0)
if full_rewrites > targeted_edits:
recommendations.append("Focus on improving retrieval metadata quality to enable more targeted edits")
return recommendations
def main():
"""Example usage of the enhanced analysis framework"""
workspace = Path("PathUtils.get_workspace_root()")
session_id = f"enhanced_analysis_{int(time.time())}"
analyzer = EnhancedMCPAnalyzer(workspace, session_id)
# Example test queries would be run here
logger.info(f"Enhanced MCP Analysis Framework initialized for session {session_id}")
logger.info(f"Analysis directory: {analyzer.analysis_dir}")
return analyzer
if __name__ == "__main__":
main()