#!/usr/bin/env python
"""
AST/ASG Code Analysis MCP Server
Provides code structure and semantic analysis through MCP.
Includes enhanced features for scope handling, incremental parsing, and Neo4j integration.
"""
import fnmatch
import hashlib
import json
import os
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, cast
from mcp.server.fastmcp import FastMCP
from ast_mcp_server.neo4j_tools import sync_file_to_graph
from ast_mcp_server.output_manager import AnalysisOutputManager, get_output_manager
from ast_mcp_server.resources import (
CACHE_DIR,
get_cache_path,
register_resources,
)
from ast_mcp_server.server_llm import get_server_llm
from ast_mcp_server.tools import (
analyze_code_structure,
create_asg_from_ast,
detect_language,
init_parsers,
parse_code_to_ast,
register_tools,
)
# Conditionally import optional tool modules
register_enhanced_tools: Optional[Callable[[Any], None]] = None
try:
from ast_mcp_server.enhanced_tools import (
register_enhanced_tools as _register_enhanced_tools,
)
register_enhanced_tools = _register_enhanced_tools
ENHANCED_TOOLS_AVAILABLE = True
except ImportError:
ENHANCED_TOOLS_AVAILABLE = False
register_transformation_tools: Optional[Callable[[Any], None]] = None
try:
from ast_mcp_server.transformation_tools import (
register_transformation_tools as _register_transformation_tools,
)
register_transformation_tools = _register_transformation_tools
TRANSFORMATION_TOOLS_AVAILABLE = True
except ImportError:
TRANSFORMATION_TOOLS_AVAILABLE = False
register_neo4j_tools: Optional[Callable[[Any], None]] = None
try:
from ast_mcp_server.neo4j_tools import register_neo4j_tools as _register_neo4j_tools
register_neo4j_tools = _register_neo4j_tools
NEO4J_TOOLS_AVAILABLE = True
except ImportError:
NEO4J_TOOLS_AVAILABLE = False
register_uss_agent_tools: Optional[Callable[[Any], None]] = None
try:
from ast_mcp_server.uss_agent_tools import (
register_uss_agent_tools as _register_uss_agent_tools,
)
register_uss_agent_tools = _register_uss_agent_tools
USS_AGENT_AVAILABLE = True
except ImportError:
USS_AGENT_AVAILABLE = False
# Initialize MCP server
mcp = FastMCP("AstAnalyzer")
# Register all tool modules
register_tools(mcp)
if ENHANCED_TOOLS_AVAILABLE and register_enhanced_tools is not None:
register_enhanced_tools(mcp)
if TRANSFORMATION_TOOLS_AVAILABLE and register_transformation_tools is not None:
register_transformation_tools(mcp)
if NEO4J_TOOLS_AVAILABLE and register_neo4j_tools is not None:
register_neo4j_tools(mcp)
if USS_AGENT_AVAILABLE and register_uss_agent_tools is not None:
register_uss_agent_tools(mcp)
register_resources(mcp)
# LRU cache for incremental parsing
MAX_AST_CACHE_SIZE = int(os.environ.get("AST_CACHE_SIZE", "100"))
class LRUCache(OrderedDict[str, Dict[str, Any]]):
"""LRU cache with maximum size limit."""
def __init__(self, max_size: int = 100):
super().__init__()
self.max_size = max_size
def __setitem__(self, key: str, value: Dict[str, Any]) -> None:
if key in self:
self.move_to_end(key)
super().__setitem__(key, value)
if len(self) > self.max_size:
oldest = next(iter(self))
del self[oldest]
def get_with_touch(self, key: str) -> Optional[Dict[str, Any]]:
"""Get item and move to end (most recently used)."""
if key in self:
self.move_to_end(key)
return self[key]
return None
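# Illustrative eviction behavior (a minimal sketch, not executed at import):
#
#   cache = LRUCache(max_size=2)
#   cache["a"] = {"v": 1}
#   cache["b"] = {"v": 2}
#   cache.get_with_touch("a")   # touching "a" makes it most recently used
#   cache["c"] = {"v": 3}       # evicts "b", now the least recently used
#   assert "a" in cache and "b" not in cache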
AST_CACHE: LRUCache = LRUCache(MAX_AST_CACHE_SIZE)
def cached_parse_to_ast(
code: Optional[str] = None,
language: Optional[str] = None,
filename: Optional[str] = None,
) -> Dict[str, Any]:
"""Parse code to AST with LRU caching.
    Avoids re-parsing identical code. The cache key is an MD5 hash of the
    code (or the filename, when code is omitted) plus the language.
"""
    # Detect language for consistent cache keys
if not language:
language = detect_language(code or "", filename)
    # When code is not provided, fall back to the filename as the cache key
    # component; parse_code_to_ast handles reading the file itself. Caveat:
    # a filename-based key can go stale if the file changes on disk.
    content_key = code or filename or ""
cache_key = hashlib.md5(
f"{content_key}:{language}".encode(), usedforsecurity=False
).hexdigest()
# Check cache first
cached = AST_CACHE.get_with_touch(cache_key)
if cached is not None:
return cached
# Parse and cache
result = parse_code_to_ast(code, language, filename)
if "error" not in result:
AST_CACHE[cache_key] = result
return result
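# Illustrative cache hit (assumes init_parsers() has already succeeded):
#
#   first = cached_parse_to_ast("def f():\n    pass", language="python")
#   second = cached_parse_to_ast("def f():\n    pass", language="python")
#   # Same code + language -> same MD5 key, so the second call returns
#   # the cached dict from AST_CACHE without re-parsing.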
# ============================================================================
# analyze_source_file: Saves per-file analysis to disk with optional AI summary
# ============================================================================
@mcp.tool()
def analyze_source_file(
project_name: str,
code: Optional[str] = None,
language: Optional[str] = None,
filename: Optional[str] = None,
include_summary: bool = True,
output_folder: Optional[str] = None,
) -> Dict[str, Any]:
"""Analyze a single source file, save reports to disk, and optionally generate an AI summary."""
# Use cached parsing to avoid re-parsing identical code
ast_data = cached_parse_to_ast(code, language, filename)
if "error" in ast_data:
return ast_data
asg_data = create_asg_from_ast(ast_data)
analysis_data = analyze_code_structure(code, language, filename)
output_manager: AnalysisOutputManager = get_output_manager()
result = output_manager.save_analysis(
project_name=project_name,
ast_data=ast_data,
asg_data=asg_data,
structure_data=analysis_data,
output_path=Path(output_folder) if output_folder else None,
source_filename=filename,
)
ai_summary = None
if include_summary:
try:
llm = get_server_llm()
structure_data = analysis_data.get("structure")
# Ensure we have code for preview
preview_code = code
if not preview_code and filename and os.path.exists(filename):
try:
with open(filename, "r", encoding="utf-8") as f:
preview_code = f.read()
                except OSError as e:
preview_code = f"(Error reading file: {e})"
            # Cap the preview at 15k characters; comfortably within the
            # model's context window (Claude Haiku supports ~200k tokens)
preview_text = preview_code[:15000] if preview_code else ""
# Extract detailed metadata
func_names = (
[f["name"] for f in structure_data.get("functions", [])]
if structure_data
else []
)
class_names = (
[c["name"] for c in structure_data.get("classes", [])]
if structure_data
else []
)
# Extract relationships from ASG
edges = asg_data.get("edges", []) if asg_data else []
relationships = []
for edge in edges:
e_type = edge.get("type")
if e_type in ["calls", "imports", "inherits", "calls_import"]:
relationships.append(str(e_type))
# Count relationship types
rel_counts: dict[str, int] = {}
for r in relationships:
rel_counts[r] = rel_counts.get(r, 0) + 1
rel_summary = ", ".join([f"{k}: {v}" for k, v in rel_counts.items()])
prompt = f"""Summarize this code file and its role in the project.
Metadata:
- Project: {project_name}
- File: {filename if filename else 'unknown'}
- Language: {ast_data.get("language", "unknown")}
- Classes: {", ".join(class_names) if class_names else "None"}
- Functions: {", ".join(func_names[:50])}{' ...' if len(func_names) > 50 else ''}
- Relationships detected: {rel_summary if rel_summary else "None detected"}
Code Content:
{preview_text}
Instructions:
1. Describe the main responsibility of this code.
2. List the key components (classes/functions) and what they do.
3. Identify specific connections to other scripts or libraries based on the imports and logic detected.
4. Provide a "Graph Insight" section describing how this file connects to the rest of the system.
"""
ai_summary = llm.chat_sync(prompt, max_tokens=1000)
summary_path = Path(result["folder"]) / "summary.txt"
            summary_path.write_text(ai_summary, encoding="utf-8")
result["files_created"].append("summary.txt")
except (RuntimeError, ValueError, TypeError, KeyError) as e:
ai_summary = f"(Summary unavailable: {e})"
if "401" in str(e) or "Unauthorized" in str(e):
                # Tag auth failures so callers (e.g. analyze_project) can
                # detect them and stop issuing further LLM requests
ai_summary = f"AUTH_ERROR: {e}"
response: Dict[str, Any] = {
"status": "success",
"project_name": project_name,
"language": ast_data.get("language", "unknown"),
"output_folder": result["folder"],
"files_created": result["files_created"],
}
if ai_summary:
response["summary"] = ai_summary
return response
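# Example direct call (hypothetical path; via MCP this is invoked as a tool):
#
#   analyze_source_file(
#       project_name="demo",
#       filename="src/example.py",  # hypothetical source file
#       include_summary=False,      # skip the LLM summary pass
#   )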
# ============================================================================
# Resources for cached data access
# ============================================================================
if ENHANCED_TOOLS_AVAILABLE:
@mcp.resource("diff://{diff_hash}")
def diff_resource(diff_hash: str) -> Dict[str, Any]:
"""Resource for cached AST diff between code versions."""
cache_path = os.path.join(CACHE_DIR, f"{diff_hash}_diff.json")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return cast(Dict[str, Any], json.load(f))
        except (OSError, json.JSONDecodeError) as e:
return {"error": f"Error reading cached diff: {e}"}
return {"error": "Diff not found. Use diff_ast tool first."}
@mcp.resource("enhanced_asg://{code_hash}")
def enhanced_asg_resource(code_hash: str) -> Dict[str, Any]:
"""Resource for cached enhanced ASG."""
cache_path = get_cache_path(code_hash, "enhanced_asg")
if os.path.exists(cache_path):
try:
with open(cache_path, "r", encoding="utf-8") as f:
return cast(Dict[str, Any], json.load(f))
        except (OSError, json.JSONDecodeError) as e:
return {"error": f"Error reading cached enhanced ASG: {e}"}
return {
"error": "Enhanced ASG not found. Use generate_enhanced_asg tool first."
}
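# A hedged usage sketch for the resources above (hash values are
# hypothetical): an MCP client reading "diff://<hash>" receives the JSON
# cached by the diff_ast tool, or an error dict if nothing is cached yet.
# ============================================================================
# analyze_project: Recursive project analysis with optional Graph DB sync
# ============================================================================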
@mcp.tool()
def analyze_project(
project_path: str,
project_name: str,
file_extensions: Optional[List[str]] = None,
sync_to_db: bool = True,
include_summary: bool = True,
) -> Dict[str, Any]:
    """Recursively analyze a project, generate reports, and optionally sync to the Graph DB.
Args:
project_path: Root directory to analyze
project_name: Name of the project (for output grouping)
file_extensions: List of extensions to include (default: .py, .js, .ts, .tsx, .go)
sync_to_db: Whether to sync nodes/edges to Neo4j (default: True)
include_summary: Whether to generate AI summaries for each file (default: True)
"""
if file_extensions is None:
file_extensions = [".py", ".js", ".ts", ".tsx", ".go"]
processed_count = 0
failed_count = 0
synced_count = 0
failures = []
    # Validate the input path before creating any output artifacts
    if not os.path.exists(project_path):
        return {"error": f"Project path {project_path} does not exist"}
    # Create ONE output folder for the whole project
    output_manager = get_output_manager()
    project_folder = output_manager.create_analysis_folder(project_name)
llm_error_count = 0
llm_active = include_summary
    # Ignore rules, built once before the walk rather than per directory /
    # per file. os.walk yields bare directory names, so entries must be
    # basenames ("_build" rather than "docs/_build", which could never match).
    ignored_dirs = {
        ".git", "node_modules", "venv", "__pycache__", ".ipynb_checkpoints",
        "analyzed_projects", "dist", ".vscode", ".mypy_cache", ".env",
        "build", "vendor", "site-packages", "target", "bazel-bin", "bazel-out",
        ".gradle", "gradle", "maven", "out", "public", "static", ".idea",
        ".pytest_cache", ".trunk", ".next", ".svelte-kit", "logs", "tmp",
        "_build", "html", "man",
    }
    # Glob patterns for files to skip: editor litter, logs, compiled
    # artifacts, archives, and binary media.
    ignored_file_patterns = [
        ".DS_Store", "Thumbs.db", "*.log", "*.tmp", "*~", ".#*", ".env",
        "*.pt", "*.bak", "*.swp", "*.orig", "*.pyc", "*.class", "*.o",
        "*.obj", "*.exe", "*.dll", "*.so", "*.dylib", "*.onnx", "*.pkl",
        "*.bin", "*.zip", "*.tar.gz", "*.rar", "*.7z", "*.jar", "*.war",
        "*.wasm", "*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp", "*.svg",
        "*.mp3", "*.wav", "*.ogg", "*.flac", "*.mp4", "*.avi", "*.mov",
    ]
    # Walk the directory tree
    for root, dirs, files in os.walk(project_path):
        # Prune ignored directories in-place so os.walk does not descend into them
        dirs[:] = [d for d in dirs if d not in ignored_dirs]
        for file in files:
            # Skip ignored files
            if any(fnmatch.fnmatch(file, p) for p in ignored_file_patterns):
                continue
ext = os.path.splitext(file)[1]
if ext not in file_extensions:
continue
full_path = os.path.join(root, file)
try:
# Read code once
with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
code_content = f.read()
# 1. Analyze (Local JSON + Summary)
# If we've hit too many auth errors, disable summaries for the rest of the run
current_include_summary = llm_active and (llm_error_count < 3)
res = analyze_source_file(
project_name=project_name,
code=code_content,
filename=full_path,
include_summary=current_include_summary,
output_folder=str(project_folder),
)
if "AUTH_ERROR" in res.get("summary", ""):
llm_error_count += 1
if llm_error_count >= 3:
llm_active = False # Kill switch
processed_count += 1
# 2. Sync to DB
if sync_to_db:
sync_res = sync_file_to_graph(
code=code_content,
file_path=full_path,
project_name=project_name,
)
if "error" not in sync_res:
synced_count += 1
else:
failed_count += 1
failures.append(
{
"file": full_path,
"error": f"Sync to DB failed: {sync_res.get('error', 'Unknown')}",
}
)
            except (RuntimeError, OSError) as e:
failed_count += 1
failures.append({"file": full_path, "error": str(e)})
return {
"processed_files": processed_count,
"failed_files": failed_count,
"synced_files": synced_count,
"output_folder": str(project_folder),
"llm_summaries_completed": (llm_error_count < 3) and include_summary,
"failures": failures,
}
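# Example invocation (hypothetical path; sync_to_db=False avoids needing Neo4j):
#
#   report = analyze_project(
#       project_path="./my_repo",  # hypothetical project root
#       project_name="my_repo",
#       file_extensions=[".py"],
#       sync_to_db=False,
#       include_summary=False,
#   )
#   print(report["processed_files"], "files ->", report["output_folder"])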
def main() -> None:
"""Main entry point for the AST MCP Server."""
# CRITICAL: All startup messages must go to stderr to avoid corrupting
# the MCP JSONRPC protocol on stdout
print("Starting AST/ASG Code Analysis MCP Server...", file=sys.stderr)
if not init_parsers():
print(
"WARNING: Tree-sitter parsers not found. Run 'uv run build-parsers'.",
file=sys.stderr,
)
else:
print("✓ Tree-sitter parsers initialized", file=sys.stderr)
if ENHANCED_TOOLS_AVAILABLE:
print("✓ Enhanced tools (incremental parsing, diff)", file=sys.stderr)
if TRANSFORMATION_TOOLS_AVAILABLE:
print("✓ Transformation tools (ast-grep)", file=sys.stderr)
if NEO4J_TOOLS_AVAILABLE:
print("✓ Neo4j integration", file=sys.stderr)
if USS_AGENT_AVAILABLE:
print("✓ USS Agent (natural language queries)", file=sys.stderr)
print("\nRunning MCP server...", file=sys.stderr)
mcp.run()
if __name__ == "__main__":
main()