"""MCP tools for breadcrumb operations."""
import json
import re
import zipfile
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from mcp.server import Server
from mcp.types import TextContent, Tool
def _validate_path(root: Path, *parts: str) -> Path:
"""
Validate that the constructed path is within root directory.
This prevents path traversal attacks by ensuring the final resolved
path is contained within the root directory.
Args:
root: The base directory that paths must stay within
*parts: Path components to join
Returns:
The validated absolute path
Raises:
ValueError: If the path attempts to escape the root directory
"""
# Resolve root to absolute path
root_resolved = root.resolve()
# Construct target path and resolve it
target = root_resolved.joinpath(*parts).resolve()
# Check if target is within root
try:
target.relative_to(root_resolved)
except ValueError:
        raise ValueError(
            "Path traversal attempt blocked: path would escape root directory"
        )
return target
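# Illustrative behavior (hypothetical root, no symlinks assumed):
#   _validate_path(Path("/data"), "projects", "demo")
#       -> Path("/data/projects/demo")
#   _validate_path(Path("/data"), "projects", "..", "..", "etc")
#       -> raises ValueError (resolves to /etc, outside the root)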
def register_tools(server: Server, breadcrumb_root: Path, templates_dir: Path):
"""Register all breadcrumb tools."""
# Project Management Tools
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="create_project",
description="Create a new breadcrumb project with folder structure and templates",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Project name (will be used as directory name)",
},
"language_stack": {
"type": "string",
"description": "Programming language/stack (e.g., 'TypeScript, React, Node.js')",
},
"purpose": {
"type": "string",
"description": "One sentence description of the project purpose",
},
"repository": {
"type": "string",
"description": "Repository URL or path",
},
},
"required": ["name"],
},
),
Tool(
name="list_projects",
description="List all breadcrumb projects",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="get_project_info",
description="Get project information from project-index.md",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Project name",
},
},
"required": ["project"],
},
),
# Tool(
# name="search_breadcrumbs",
# description="Search across breadcrumb documentation with filters",
# inputSchema={
# "type": "object",
# "properties": {
# "query": {
# "type": "string",
# "description": "Search term or phrase",
# },
# "project": {
# "type": "string",
# "description": "Limit search to specific project (optional)",
# },
# "doc_type": {
# "type": "string",
# "enum": ["session", "component", "adr", "pattern", "index"],
# "description": "Filter by document type (optional)",
# },
# "case_sensitive": {
# "type": "boolean",
# "description": "Case-sensitive search (default: false)",
# "default": False,
# },
# "limit": {
# "type": "integer",
# "description": "Maximum number of results (default: 10)",
# "default": 10,
# },
# },
# "required": ["query"],
# },
# ),
Tool(
name="create_session",
description="Create a new session log from template",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string", "description": "Project name"},
"topic": {
"type": "string",
"description": "Session topic/focus (used in filename)",
},
"focus": {
"type": "string",
"description": "Brief description of session goal",
},
"status": {
"type": "string",
"enum": ["In Progress", "Completed", "Blocked"],
"default": "In Progress",
},
},
"required": ["project", "topic"],
},
),
Tool(
name="create_component",
description="Create a new component documentation file",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"name": {"type": "string", "description": "Component name"},
},
"required": ["project", "name"],
},
),
Tool(
name="create_adr",
description="Create a new Architectural Decision Record",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"title": {"type": "string", "description": "Decision title"},
},
"required": ["project", "title"],
},
),
Tool(
name="create_pattern",
description="Create a new pattern documentation file",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"name": {"type": "string", "description": "Pattern name"},
},
"required": ["project", "name"],
},
),
Tool(
name="add_learning",
description="Add a learning to the project index's Recent Learnings section",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"learning": {"type": "string", "description": "Learning text"},
},
"required": ["project", "learning"],
},
),
Tool(
name="quick_note",
description="Add a timestamped quick note to project index",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"note": {"type": "string", "description": "Note text"},
},
"required": ["project", "note"],
},
),
Tool(
name="update_project_status",
description="Update the project status in project-index.md",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"status": {
"type": "string",
"enum": ["Active", "Maintenance", "Archived"],
},
},
"required": ["project", "status"],
},
),
Tool(
name="get_recent_sessions",
description="Get the most recent session logs for a project",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"limit": {
"type": "integer",
"description": "Number of sessions to return (default: 3)",
"default": 3,
},
},
"required": ["project"],
},
),
Tool(
name="find_related_docs",
description="Find documents related to a specific document by links, tags, and topics",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"doc_path": {
"type": "string",
"description": "Relative path to document (e.g., 'sessions/2025-01-15-auth.md')",
},
},
"required": ["project", "doc_path"],
},
),
Tool(
name="list_tags",
description="List all unique tags across breadcrumbs with counts",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Limit to specific project (optional)",
},
},
},
),
Tool(
name="get_timeline",
description="Show chronological activity timeline for a project",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
"days": {
"type": "integer",
"description": "Number of days to include (default: 30)",
"default": 30,
},
},
"required": ["project"],
},
),
Tool(
name="get_project_stats",
description="Get statistics for a project (document counts, word counts, etc.)",
inputSchema={
"type": "object",
"properties": {
"project": {"type": "string"},
},
"required": ["project"],
},
),
# Export/Import Tools
Tool(
name="export_project",
description="Export project data to JSON or ZIP format for backup or sharing",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Project name to export (optional - exports all projects if not specified)",
},
"output_path": {
"type": "string",
"description": "Output file path (e.g., '/path/to/backup.json' or '/path/to/backup.zip')",
},
"format": {
"type": "string",
"enum": ["json", "zip"],
"description": "Export format (json or zip)",
"default": "json",
},
"include_metadata": {
"type": "boolean",
"description": "Include file metadata (timestamps, word counts)",
"default": True,
},
},
"required": ["output_path"],
},
),
Tool(
name="import_project",
description="Import project data from JSON or ZIP file",
inputSchema={
"type": "object",
"properties": {
"input_path": {
"type": "string",
"description": "Path to import file (.json or .zip)",
},
"overwrite": {
"type": "boolean",
"description": "Overwrite existing projects",
"default": False,
},
},
"required": ["input_path"],
},
),
# Full-text Search Tools
Tool(
name="search_fulltext",
description="Advanced full-text search with relevance scoring and ranking",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (supports AND, OR, NOT operators)",
},
"project": {
"type": "string",
"description": "Limit search to specific project (optional)",
},
"doc_type": {
"type": "string",
"enum": ["session", "component", "adr", "pattern", "index"],
"description": "Filter by document type (optional)",
},
"min_score": {
"type": "number",
"description": "Minimum relevance score (0.0-1.0, default: 0.0)",
"default": 0.0,
},
"limit": {
"type": "integer",
"description": "Maximum number of results (default: 20)",
"default": 20,
},
"context_lines": {
"type": "integer",
"description": "Number of context lines around matches (default: 2)",
"default": 2,
},
},
"required": ["query"],
},
),
# File Management Tools
Tool(
name="read_file",
description="Read the contents of a file in a project",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Project name",
},
"file_path": {
"type": "string",
"description": "Relative path to file within project (e.g., 'sessions/2025-01-15-auth.md')",
},
},
"required": ["project", "file_path"],
},
),
Tool(
name="update_file",
description="Update or create a file in a project",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Project name",
},
"file_path": {
"type": "string",
"description": "Relative path to file within project (e.g., 'sessions/2025-01-15-auth.md')",
},
"content": {
"type": "string",
"description": "New content for the file",
},
"create_if_missing": {
"type": "boolean",
"description": "Create file if it doesn't exist",
"default": True,
},
},
"required": ["project", "file_path", "content"],
},
),
Tool(
name="delete_file",
description="Delete a file from a project",
inputSchema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Project name",
},
"file_path": {
"type": "string",
"description": "Relative path to file within project",
},
},
"required": ["project", "file_path"],
},
),
# Cross-project Analysis Tools
Tool(
name="search_across_projects",
description="Search across all projects with aggregated results and statistics",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search term or phrase",
},
"group_by": {
"type": "string",
"enum": ["project", "doc_type", "tag"],
"description": "Group results by category",
"default": "project",
},
"limit": {
"type": "integer",
"description": "Maximum results per group (default: 5)",
"default": 5,
},
},
"required": ["query"],
},
),
Tool(
name="compare_projects",
description="Compare statistics and metadata across multiple projects",
inputSchema={
"type": "object",
"properties": {
"projects": {
"type": "array",
"items": {"type": "string"},
"description": "List of project names to compare (optional - compares all if not specified)",
},
"metrics": {
"type": "array",
"items": {
"type": "string",
"enum": [
"activity",
"size",
"tags",
"components",
"sessions",
],
},
"description": "Metrics to compare (default: all)",
},
},
},
),
Tool(
name="analyze_cross_project_patterns",
description="Analyze patterns, tags, and trends across all projects",
inputSchema={
"type": "object",
"properties": {
"analysis_type": {
"type": "string",
"enum": ["tags", "activity", "components", "links"],
"description": "Type of analysis to perform",
"default": "tags",
},
"time_range": {
"type": "integer",
"description": "Time range in days for activity analysis (default: 30)",
"default": 30,
},
},
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls."""
try:
if name == "list_projects":
return await _list_projects(breadcrumb_root)
elif name == "create_project":
return await _create_project(breadcrumb_root, templates_dir, **arguments)
elif name == "get_project_info":
return await _get_project_info(breadcrumb_root, arguments["project"])
elif name == "search_breadcrumbs":
return await _search_breadcrumbs(breadcrumb_root, **arguments)
elif name == "create_session":
return await _create_session(breadcrumb_root, templates_dir, **arguments)
elif name == "create_component":
return await _create_component(breadcrumb_root, templates_dir, **arguments)
elif name == "create_adr":
return await _create_adr(breadcrumb_root, templates_dir, **arguments)
elif name == "create_pattern":
return await _create_pattern(breadcrumb_root, templates_dir, **arguments)
elif name == "add_learning":
return await _add_learning(breadcrumb_root, **arguments)
elif name == "quick_note":
return await _quick_note(breadcrumb_root, **arguments)
elif name == "update_project_status":
return await _update_project_status(breadcrumb_root, **arguments)
elif name == "get_recent_sessions":
return await _get_recent_sessions(breadcrumb_root, **arguments)
elif name == "find_related_docs":
return await _find_related_docs(breadcrumb_root, **arguments)
elif name == "list_tags":
return await _list_tags(breadcrumb_root, arguments.get("project"))
elif name == "get_timeline":
return await _get_timeline(breadcrumb_root, **arguments)
elif name == "get_project_stats":
return await _get_project_stats(breadcrumb_root, arguments["project"])
# Export/Import tools
elif name == "export_project":
return await _export_project(breadcrumb_root, **arguments)
elif name == "import_project":
return await _import_project(breadcrumb_root, **arguments)
# Full-text search tools
elif name == "search_fulltext":
return await _search_fulltext(breadcrumb_root, **arguments)
# File management tools
elif name == "read_file":
return await _read_file(breadcrumb_root, **arguments)
elif name == "update_file":
return await _update_file(breadcrumb_root, **arguments)
elif name == "delete_file":
return await _delete_file(breadcrumb_root, **arguments)
# Cross-project analysis tools
elif name == "search_across_projects":
return await _search_across_projects(breadcrumb_root, **arguments)
elif name == "compare_projects":
return await _compare_projects(breadcrumb_root, **arguments)
elif name == "analyze_cross_project_patterns":
return await _analyze_cross_project_patterns(
breadcrumb_root, **arguments
)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
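    # Illustrative dispatch (hypothetical arguments):
    #   await call_tool("create_session", {"project": "demo", "topic": "Auth flow"})
    #   -> [TextContent(type="text", text="Created session log: projects/demo/...")]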
# Tool implementations
async def _list_projects(root: Path) -> list[TextContent]:
"""List all projects."""
projects_dir = root / "projects"
if not projects_dir.exists():
return [TextContent(type="text", text="No projects directory found")]
projects = []
for project_dir in sorted(projects_dir.iterdir()):
if project_dir.is_dir() and not project_dir.name.startswith("."):
index_file = project_dir / "project-index.md"
status = "Unknown"
if index_file.exists():
content = index_file.read_text()
status_match = re.search(r"\*\*Status:\*\*\s*\[?(\w+)", content)
if status_match:
status = status_match.group(1)
projects.append(f"- {project_dir.name} ({status})")
if not projects:
return [TextContent(type="text", text="No projects found")]
result = "Projects:\n" + "\n".join(projects)
return [TextContent(type="text", text=result)]
async def _create_project(
root: Path,
templates_dir: Path,
name: str,
language_stack: str = "",
purpose: str = "",
repository: str = "",
) -> list[TextContent]:
"""Create a new project."""
try:
# Validate project path to prevent traversal
project_dir = _validate_path(root, "projects", name)
if project_dir.exists():
return [TextContent(type="text", text=f"Project '{name}' already exists")]
# Create directory structure
project_dir.mkdir(parents=True)
(project_dir / "sessions").mkdir()
(project_dir / "components").mkdir()
(project_dir / "adr").mkdir()
(project_dir / "patterns").mkdir()
# Create project index from template
template = templates_dir / "project-index.md"
if template.exists():
content = template.read_text()
# Replace placeholders
content = content.replace("[Project Name]", name)
content = content.replace(
"[YYYY-MM-DD]", datetime.now().strftime("%Y-%m-%d")
)
content = content.replace("[Count]", "0")
content = content.replace("[Active/Maintenance/Archived]", "Active")
if language_stack:
content = content.replace(
"[e.g., TypeScript, React, Node.js]", language_stack
)
if purpose:
content = content.replace("[One sentence description]", purpose)
if repository:
content = content.replace("[URL or path]", repository)
            (project_dir / "project-index.md").write_text(content)
            result = f"Created project '{name}' at {project_dir}"
        else:
            result = (
                f"Created project '{name}' at {project_dir} "
                f"(warning: template {template} not found; no project-index.md written)"
            )
        return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error creating project: {str(e)}")]
async def _get_project_info(root: Path, project: str) -> list[TextContent]:
"""Get project information."""
try:
# Validate path to prevent traversal
index_file = _validate_path(root, "projects", project, "project-index.md")
if not index_file.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
content = index_file.read_text()
return [TextContent(type="text", text=content)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
# async def _search_breadcrumbs(
# root: Path,
# query: str,
# project: str = None,
# doc_type: str = None,
# case_sensitive: bool = False,
# limit: int = 10,
# ) -> list[TextContent]:
# """Search breadcrumb documentation."""
# results = []
# projects_dir = root / "projects"
#
# # Determine search scope
# if project:
# search_dirs = [projects_dir / project]
# else:
# search_dirs = [d for d in projects_dir.iterdir() if d.is_dir()]
#
# # Compile regex pattern
# flags = 0 if case_sensitive else re.IGNORECASE
# pattern = re.compile(re.escape(query), flags)
#
# for proj_dir in search_dirs:
# if not proj_dir.exists():
# continue
#
# # Determine subdirectories to search
# if doc_type == "session":
# search_paths = [proj_dir / "sessions"]
# elif doc_type == "component":
# search_paths = [proj_dir / "components"]
# elif doc_type == "adr":
# search_paths = [proj_dir / "adr"]
# elif doc_type == "pattern":
# search_paths = [proj_dir / "patterns"]
# elif doc_type == "index":
# search_paths = [proj_dir]
# else:
# search_paths = [
# proj_dir / "sessions",
# proj_dir / "components",
# proj_dir / "adr",
# proj_dir / "patterns",
# proj_dir,
# ]
#
# for search_path in search_paths:
# if not search_path.exists():
# continue
#
# for file_path in search_path.rglob("*.md"):
# if len(results) >= limit:
# break
#
# try:
# content = file_path.read_text()
# matches = list(pattern.finditer(content))
#
# if matches:
# relative_path = file_path.relative_to(projects_dir)
# # Get context around first match
# match = matches[0]
# start = max(0, match.start() - 100)
# end = min(len(content), match.end() + 100)
# context = content[start:end].strip()
#
# results.append(
# f"\n{relative_path} ({len(matches)} matches):\n...{context}..."
# )
# except Exception:
# pass
#
# if not results:
# return [TextContent(type="text", text=f"No results found for '{query}'")]
#
# result_text = f"Found {len(results)} results for '{query}':\n" + "\n".join(results)
# return [TextContent(type="text", text=result_text)]
#
async def _create_session(
    root: Path,
    templates_dir: Path,
    project: str,
    topic: str,
    focus: str = "",
    status: str = "In Progress",
) -> list[TextContent]:
"""Create a new session log."""
try:
# Validate project path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
# Generate filename with date
date_str = datetime.now().strftime("%Y-%m-%d")
# Sanitize topic for filename
topic_slug = re.sub(r"[^\w\s-]", "", topic.lower())
topic_slug = re.sub(r"[-\s]+", "-", topic_slug)
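        # e.g. topic "Fix OAuth2 Login!" becomes the slug "fix-oauth2-login"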
filename = f"{date_str}-{topic_slug}.md"
session_file = _validate_path(root, "projects", project, "sessions", filename)
if session_file.exists():
return [
TextContent(
type="text", text=f"Session file already exists: {filename}"
)
]
        # Read template; fail loudly instead of silently writing nothing
        template_file = templates_dir / "session.md"
        if not template_file.exists():
            return [
                TextContent(type="text", text=f"Template not found: {template_file}")
            ]
        content = template_file.read_text()
        # Replace placeholders
        now = datetime.now()
        content = content.replace("[Topic/Focus]", topic)
        content = content.replace("[YYYY-MM-DD HH:MM]", now.strftime("%Y-%m-%d %H:%M"))
        content = content.replace(
            "[Brief description of session goal]", focus or f"Working on {topic}"
        )
        content = content.replace("[In Progress/Completed/Blocked]", status)
        session_file.write_text(content)
result = f"Created session log: {session_file.relative_to(root)}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _create_component(
    root: Path, templates_dir: Path, project: str, name: str
) -> list[TextContent]:
"""Create a new component documentation file."""
try:
# Validate project path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
# Sanitize name for filename
filename = re.sub(r"[^\w\s-]", "", name.lower())
filename = re.sub(r"[-\s]+", "-", filename) + ".md"
component_file = _validate_path(
root, "projects", project, "components", filename
)
if component_file.exists():
return [
TextContent(
type="text", text=f"Component file already exists: {filename}"
)
]
        # Read template; fail loudly instead of silently writing nothing
        template_file = templates_dir / "component.md"
        if not template_file.exists():
            return [
                TextContent(type="text", text=f"Template not found: {template_file}")
            ]
        content = template_file.read_text()
        content = content.replace("[Component Name]", name)
        content = content.replace("[YYYY-MM-DD]", datetime.now().strftime("%Y-%m-%d"))
        component_file.write_text(content)
result = f"Created component doc: {component_file.relative_to(root)}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _create_adr(
    root: Path, templates_dir: Path, project: str, title: str
) -> list[TextContent]:
"""Create a new ADR."""
try:
# Validate project path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
adr_dir = _validate_path(root, "projects", project, "adr")
# Find next number
existing_adrs = list(adr_dir.glob("*.md"))
numbers = []
for adr_file in existing_adrs:
match = re.match(r"^(\d+)-", adr_file.name)
if match:
numbers.append(int(match.group(1)))
next_num = max(numbers, default=0) + 1
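        # e.g. with 001-*.md and 002-*.md present, next_num is 3 -> "003-<slug>.md"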
# Sanitize title for filename
title_slug = re.sub(r"[^\w\s-]", "", title.lower())
title_slug = re.sub(r"[-\s]+", "-", title_slug)
filename = f"{next_num:03d}-{title_slug}.md"
adr_file = _validate_path(root, "projects", project, "adr", filename)
        # Read template; fail loudly instead of silently writing nothing
        template_file = templates_dir / "adr.md"
        if not template_file.exists():
            return [
                TextContent(type="text", text=f"Template not found: {template_file}")
            ]
        content = template_file.read_text()
        content = content.replace("[Number]", str(next_num))
        content = content.replace("[Title]", title)
        content = content.replace("[YYYY-MM-DD]", datetime.now().strftime("%Y-%m-%d"))
        adr_file.write_text(content)
result = f"Created ADR: {adr_file.relative_to(root)}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _create_pattern(
    root: Path, templates_dir: Path, project: str, name: str
) -> list[TextContent]:
"""Create a new pattern documentation file."""
try:
# Validate project path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
# Sanitize name for filename
filename = re.sub(r"[^\w\s-]", "", name.lower())
filename = re.sub(r"[-\s]+", "-", filename) + ".md"
pattern_file = _validate_path(root, "projects", project, "patterns", filename)
if pattern_file.exists():
return [
TextContent(
type="text", text=f"Pattern file already exists: {filename}"
)
]
        # Read template; fail loudly instead of silently writing nothing
        template_file = templates_dir / "pattern.md"
        if not template_file.exists():
            return [
                TextContent(type="text", text=f"Template not found: {template_file}")
            ]
        content = template_file.read_text()
        content = content.replace("[Pattern Name]", name)
        content = content.replace("[YYYY-MM-DD]", datetime.now().strftime("%Y-%m-%d"))
        pattern_file.write_text(content)
result = f"Created pattern doc: {pattern_file.relative_to(root)}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _add_learning(root: Path, project: str, learning: str) -> list[TextContent]:
"""Add a learning to project index."""
try:
# Validate path
index_file = _validate_path(root, "projects", project, "project-index.md")
if not index_file.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
content = index_file.read_text()
date_str = datetime.now().strftime("%Y-%m-%d")
new_learning = f"1. {learning} - {date_str}\n"
# Find Recent Learnings section
match = re.search(
r"(## Recent Learnings.*?\n\n)(.*?)(\n##|\Z)", content, re.DOTALL
)
if match:
before = match.group(1)
existing_learnings = match.group(2) # Keep existing learnings
after = match.group(3)
# Insert new learning at top, preserving existing ones
new_content = (
content[: match.start()]
+ before
+ new_learning
+ existing_learnings
+ after
)
index_file.write_text(new_content)
return [
TextContent(
type="text", text=f"Added learning to {project}/project-index.md"
)
]
else:
return [
TextContent(type="text", text="Could not find Recent Learnings section")
]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _quick_note(root: Path, project: str, note: str) -> list[TextContent]:
"""Add a timestamped note to project index."""
try:
# Validate path
index_file = _validate_path(root, "projects", project, "project-index.md")
if not index_file.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
content = index_file.read_text()
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
new_note = f"\n**Note ({timestamp}):** {note}\n"
# Add to Notes section if exists, otherwise append
if "## Notes" in content:
content = content.replace("## Notes\n", f"## Notes\n{new_note}")
else:
content += f"\n## Notes\n{new_note}"
index_file.write_text(content)
return [
TextContent(type="text", text=f"Added note to {project}/project-index.md")
]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _update_project_status(
root: Path, project: str, status: str
) -> list[TextContent]:
"""Update project status."""
try:
# Validate path
index_file = _validate_path(root, "projects", project, "project-index.md")
if not index_file.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
content = index_file.read_text()
content = re.sub(
r"\*\*Status:\*\*\s*\[?(\w+)\]?",
f"**Status:** {status}",
content,
)
index_file.write_text(content)
return [TextContent(type="text", text=f"Updated {project} status to {status}")]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _get_recent_sessions(
root: Path, project: str, limit: int = 3
) -> list[TextContent]:
"""Get recent session logs."""
try:
# Validate path
sessions_dir = _validate_path(root, "projects", project, "sessions")
if not sessions_dir.exists():
return [
TextContent(
type="text", text=f"No sessions found for project '{project}'"
)
]
sessions = sorted(
sessions_dir.glob("*.md"), key=lambda p: p.stat().st_mtime, reverse=True
)[:limit]
if not sessions:
return [
TextContent(
type="text", text=f"No sessions found for project '{project}'"
)
]
results = []
for session_file in sessions:
content = session_file.read_text()
# Extract summary or first 200 chars
summary_match = re.search(
r"## Summary\n\n(.*?)(\n##|\Z)", content, re.DOTALL
)
if summary_match:
summary = summary_match.group(1).strip()
else:
summary = content[:200] + "..."
results.append(f"\n{session_file.name}:\n{summary}\n")
result_text = f"Recent sessions for {project}:\n" + "\n".join(results)
return [TextContent(type="text", text=result_text)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _find_related_docs(
root: Path, project: str, doc_path: str
) -> list[TextContent]:
"""Find related documents."""
try:
# Validate path
doc_file = _validate_path(root, "projects", project, doc_path)
if not doc_file.exists():
return [TextContent(type="text", text=f"Document not found: {doc_path}")]
content = doc_file.read_text()
# Find links
links = re.findall(r"\[.*?\]\((.*?\.md)\)", content)
# Find tags
tags = re.findall(r"#(\w+)", content)
related = []
if links:
related.append(
f"Linked documents:\n" + "\n".join(f" - {link}" for link in links)
)
if tags:
related.append(f"\nTags: {', '.join(set(tags))}")
if not related:
return [TextContent(type="text", text="No related documents found")]
return [TextContent(type="text", text="\n".join(related))]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _list_tags(root: Path, project: str | None = None) -> list[TextContent]:
    """List all tags with counts."""
    tags = Counter()
projects_dir = root / "projects"
# Determine search scope
if project:
search_dirs = [projects_dir / project]
else:
search_dirs = [d for d in projects_dir.iterdir() if d.is_dir()]
for proj_dir in search_dirs:
if not proj_dir.exists():
continue
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
found_tags = re.findall(r"#(\w+)", content)
tags.update(found_tags)
            except Exception:
                # Skip files that cannot be read (e.g. encoding issues)
                pass
if not tags:
return [TextContent(type="text", text="No tags found")]
sorted_tags = sorted(tags.items(), key=lambda x: x[1], reverse=True)
result = "Tags:\n" + "\n".join(f" {tag}: {count}" for tag, count in sorted_tags)
return [TextContent(type="text", text=result)]
async def _get_timeline(root: Path, project: str, days: int = 30) -> list[TextContent]:
"""Get activity timeline."""
try:
# Validate path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
cutoff = datetime.now() - timedelta(days=days)
timeline = []
sessions_dir = _validate_path(root, "projects", project, "sessions")
if sessions_dir.exists():
for file_path in sorted(sessions_dir.glob("*.md"), reverse=True):
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if mtime >= cutoff:
timeline.append(f"{mtime.strftime('%Y-%m-%d')}: {file_path.name}")
if not timeline:
return [
TextContent(type="text", text=f"No activity in the last {days} days")
]
result = f"Activity timeline for {project} (last {days} days):\n" + "\n".join(
timeline
)
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _get_project_stats(root: Path, project: str) -> list[TextContent]:
"""Get project statistics."""
try:
# Validate path
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
stats = {
"sessions": len(list((project_dir / "sessions").glob("*.md"))),
"components": len(list((project_dir / "components").glob("*.md"))),
"adrs": len(list((project_dir / "adr").glob("*.md"))),
"patterns": len(list((project_dir / "patterns").glob("*.md"))),
}
# Calculate total words
total_words = 0
for file_path in project_dir.rglob("*.md"):
try:
content = file_path.read_text()
total_words += len(content.split())
except Exception:
pass
result = f"""Statistics for {project}:
Sessions: {stats['sessions']}
Components: {stats['components']}
ADRs: {stats['adrs']}
Patterns: {stats['patterns']}
Total Words: {total_words:,}
"""
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
# ===============================
# Export/Import Functionality
# ===============================
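# Shape of the JSON export written below (illustrative values):
# {
#   "export_date": "2025-01-15T12:00:00",
#   "breadcrumb_version": "1.0",
#   "projects": [
#     {
#       "name": "demo",
#       "files": {
#         "sessions/2025-01-15-auth.md": {
#           "content": "...",
#           "metadata": {"created": "...", "modified": "...",
#                        "size": 1234, "word_count": 200}
#         }
#       }
#     }
#   ]
# }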
async def _export_project(
root: Path,
output_path: str,
    project: str | None = None,
format: str = "json",
include_metadata: bool = True,
) -> list[TextContent]:
"""Export project data to JSON or ZIP format."""
try:
output = Path(output_path)
projects_dir = root / "projects"
        # Determine which projects to export
        if project:
            # Validate the name so the export stays inside the projects directory
            project_dirs = [_validate_path(root, "projects", project)]
            if not project_dirs[0].exists():
                return [TextContent(type="text", text=f"Project '{project}' not found")]
else:
project_dirs = [
d
for d in projects_dir.iterdir()
if d.is_dir() and not d.name.startswith(".")
]
export_data = {
"export_date": datetime.now().isoformat(),
"breadcrumb_version": "1.0",
"projects": [],
}
for proj_dir in project_dirs:
project_data = {"name": proj_dir.name, "files": {}}
# Read all markdown files in the project
for file_path in proj_dir.rglob("*.md"):
relative_path = str(file_path.relative_to(proj_dir))
content = file_path.read_text()
file_data = {"content": content}
if include_metadata:
stat = file_path.stat()
file_data["metadata"] = {
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"size": stat.st_size,
"word_count": len(content.split()),
}
project_data["files"][relative_path] = file_data
export_data["projects"].append(project_data)
# Export based on format
if format == "json":
output.write_text(json.dumps(export_data, indent=2))
result = (
f"Exported {len(export_data['projects'])} project(s) to {output_path}"
)
elif format == "zip":
with zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED) as zf:
# Write metadata
zf.writestr(
"export_metadata.json",
json.dumps(
{
"export_date": export_data["export_date"],
"breadcrumb_version": export_data["breadcrumb_version"],
"projects": [p["name"] for p in export_data["projects"]],
},
indent=2,
),
)
# Write each project's files
for project_data in export_data["projects"]:
proj_name = project_data["name"]
for rel_path, file_data in project_data["files"].items():
zf.writestr(f"{proj_name}/{rel_path}", file_data["content"])
if include_metadata and "metadata" in file_data:
zf.writestr(
f"{proj_name}/.metadata/{rel_path}.json",
json.dumps(file_data["metadata"], indent=2),
)
result = f"Exported {len(export_data['projects'])} project(s) to {output_path} (ZIP)"
return [TextContent(type="text", text=result)]
except Exception as e:
return [TextContent(type="text", text=f"Export error: {str(e)}")]
async def _import_project(
root: Path,
input_path: str,
overwrite: bool = False,
) -> list[TextContent]:
"""Import project data from JSON or ZIP file."""
try:
input_file = Path(input_path)
if not input_file.exists():
return [
TextContent(type="text", text=f"Import file not found: {input_path}")
]
projects_dir = root / "projects"
imported_projects = []
# Handle JSON import
if input_file.suffix == ".json":
data = json.loads(input_file.read_text())
for project_data in data.get("projects", []):
proj_name = project_data["name"]
proj_dir = projects_dir / proj_name
if proj_dir.exists() and not overwrite:
continue
# Create project structure
proj_dir.mkdir(parents=True, exist_ok=True)
                for rel_path, file_data in project_data["files"].items():
                    # Validate each relative path to block traversal entries
                    file_path = _validate_path(root, "projects", proj_name, rel_path)
                    file_path.parent.mkdir(parents=True, exist_ok=True)
                    file_path.write_text(file_data["content"])
imported_projects.append(proj_name)
# Handle ZIP import
elif input_file.suffix == ".zip":
with zipfile.ZipFile(input_file, "r") as zf:
# Read metadata to get project list
if "export_metadata.json" in zf.namelist():
metadata = json.loads(zf.read("export_metadata.json"))
project_names = metadata.get("projects", [])
else:
# Infer projects from directory structure
project_names = list(
set(
name.split("/")[0]
for name in zf.namelist()
if "/" in name and not name.startswith(".")
)
)
                for proj_name in project_names:
                    # Validate the name so a crafted archive cannot escape projects/
                    proj_dir = _validate_path(root, "projects", proj_name)
if proj_dir.exists() and not overwrite:
continue
proj_dir.mkdir(parents=True, exist_ok=True)
# Extract files for this project
for file_info in zf.infolist():
if file_info.filename.startswith(
f"{proj_name}/"
) and not file_info.filename.startswith(
f"{proj_name}/.metadata/"
):
                            rel_path = file_info.filename[len(proj_name) + 1 :]
                            if rel_path:
                                # Validate to block zip-slip style entries
                                file_path = _validate_path(
                                    root, "projects", proj_name, rel_path
                                )
                                file_path.parent.mkdir(parents=True, exist_ok=True)
                                file_path.write_bytes(zf.read(file_info.filename))
imported_projects.append(proj_name)
else:
return [
TextContent(
type="text", text=f"Unsupported file format: {input_file.suffix}"
)
]
if not imported_projects:
return [
TextContent(
type="text",
text="No projects imported (use overwrite=true to replace existing projects)",
)
]
result = f"Successfully imported {len(imported_projects)} project(s): {', '.join(imported_projects)}"
return [TextContent(type="text", text=result)]
except Exception as e:
return [TextContent(type="text", text=f"Import error: {str(e)}")]
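# Round-trip sketch (hypothetical paths):
#   await _export_project(root, "/tmp/backup.zip", project="demo", format="zip")
#   await _import_project(root, "/tmp/backup.zip", overwrite=True)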
# ===============================
# File Management
# ===============================
async def _read_file(root: Path, project: str, file_path: str) -> list[TextContent]:
"""Read a file from a project."""
try:
# Validate path to prevent traversal attacks
full_path = _validate_path(root, "projects", project, file_path)
if not full_path.exists():
return [TextContent(type="text", text=f"File not found: {file_path}")]
if not full_path.is_file():
return [TextContent(type="text", text=f"Not a file: {file_path}")]
content = full_path.read_text()
result = f"Contents of {project}/{file_path}:\n\n{content}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Read error: {str(e)}")]
async def _update_file(
root: Path,
project: str,
file_path: str,
content: str,
create_if_missing: bool = True,
) -> list[TextContent]:
"""Update or create a file in a project."""
try:
# Validate project directory first
project_dir = _validate_path(root, "projects", project)
if not project_dir.exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
# Validate full file path
full_path = _validate_path(root, "projects", project, file_path)
if not full_path.exists() and not create_if_missing:
return [TextContent(type="text", text=f"File not found: {file_path}")]
# Create parent directories if needed
full_path.parent.mkdir(parents=True, exist_ok=True)
# Check if file existed before writing
file_existed = full_path.exists()
# Write content
full_path.write_text(content)
action = "Updated" if file_existed else "Created"
result = f"{action} file: {project}/{file_path}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Update error: {str(e)}")]
async def _delete_file(root: Path, project: str, file_path: str) -> list[TextContent]:
"""Delete a file from a project."""
try:
# Validate path to prevent traversal attacks
full_path = _validate_path(root, "projects", project, file_path)
if not full_path.exists():
return [TextContent(type="text", text=f"File not found: {file_path}")]
if not full_path.is_file():
return [TextContent(type="text", text=f"Not a file: {file_path}")]
full_path.unlink()
result = f"Deleted file: {project}/{file_path}"
return [TextContent(type="text", text=result)]
except ValueError as e:
return [TextContent(type="text", text=f"Security error: {str(e)}")]
except Exception as e:
return [TextContent(type="text", text=f"Delete error: {str(e)}")]
# ===============================
# Full-text Search
# ===============================
def _calculate_relevance_score(
content: str, query_terms: list[str], file_path: Path
) -> float:
"""Calculate relevance score for a document based on query terms."""
score = 0.0
content_lower = content.lower()
# Count term frequencies
for term in query_terms:
term_lower = term.lower()
count = content_lower.count(term_lower)
if count > 0:
# Base score for presence
score += 1.0
# Bonus for multiple occurrences (with diminishing returns)
score += min(count * 0.1, 1.0)
# Bonus for term in title/filename
if term_lower in file_path.name.lower():
score += 0.5
# Bonus for term at beginning of document (first 200 chars)
if term_lower in content_lower[:200]:
score += 0.3
# Normalize by number of terms
if query_terms:
score = score / len(query_terms)
return score
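# Worked example (hypothetical document): query terms ["auth", "jwt"] against
# auth-notes.md, where "auth" occurs 4 times (once in the first 200 chars) and
# "jwt" occurs once (not in the filename or the lead):
#   auth: 1.0 + min(4 * 0.1, 1.0) + 0.5 (filename) + 0.3 (lead) = 2.2
#   jwt:  1.0 + min(1 * 0.1, 1.0)                               = 1.1
#   score = (2.2 + 1.1) / 2 = 1.65  (scores are not capped at 1.0)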
async def _search_fulltext(
root: Path,
query: str,
    project: str | None = None,
    doc_type: str | None = None,
min_score: float = 0.0,
limit: int = 20,
context_lines: int = 2,
) -> list[TextContent]:
"""Advanced full-text search with relevance scoring."""
try:
projects_dir = root / "projects"
# Parse query for terms (simple tokenization)
query_terms = [term.strip() for term in query.split() if term.strip()]
if not query_terms:
return [TextContent(type="text", text="Empty query")]
# Determine search scope
if project:
search_dirs = [projects_dir / project]
if not search_dirs[0].exists():
return [TextContent(type="text", text=f"Project '{project}' not found")]
else:
search_dirs = [
d
for d in projects_dir.iterdir()
if d.is_dir() and not d.name.startswith(".")
]
results = []
for proj_dir in search_dirs:
            # Determine subdirectories to search
            if doc_type == "session":
                search_paths = [proj_dir / "sessions"]
            elif doc_type == "component":
                search_paths = [proj_dir / "components"]
            elif doc_type == "adr":
                search_paths = [proj_dir / "adr"]
            elif doc_type == "pattern":
                search_paths = [proj_dir / "patterns"]
            else:
                # A single rglob from the project root visits each file exactly
                # once, so subdirectories are not listed separately here.
                search_paths = [proj_dir]
            for search_path in search_paths:
                if not search_path.exists():
                    continue
                for file_path in search_path.rglob("*.md"):
                    # Index searches cover only the project's top-level documents
                    if doc_type == "index" and file_path.parent != proj_dir:
                        continue
try:
content = file_path.read_text()
# Calculate relevance score
score = _calculate_relevance_score(
content, query_terms, file_path
)
if score >= min_score:
# Find all matches and their context
lines = content.split("\n")
matches = []
for i, line in enumerate(lines):
if any(
term.lower() in line.lower() for term in query_terms
):
# Get context lines
start = max(0, i - context_lines)
end = min(len(lines), i + context_lines + 1)
context = "\n".join(lines[start:end])
matches.append((i + 1, context))
if matches:
relative_path = file_path.relative_to(projects_dir)
results.append(
{
"path": relative_path,
"score": score,
"matches": matches[
:5
], # Limit to 5 match contexts per file
}
)
                    except Exception:
                        # Skip files that cannot be read (e.g. encoding issues)
                        pass
# Sort by relevance score
results.sort(key=lambda x: x["score"], reverse=True)
results = results[:limit]
if not results:
return [TextContent(type="text", text=f"No results found for '{query}'")]
# Format results
output = [f"Found {len(results)} results for '{query}':\n"]
for result in results:
output.append(f"\n{result['path']} (score: {result['score']:.2f})")
output.append(f" {len(result['matches'])} match(es) found:")
for line_num, context in result["matches"]:
output.append(f"\n Line {line_num}:")
for ctx_line in context.split("\n"):
output.append(f" {ctx_line}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
return [TextContent(type="text", text=f"Search error: {str(e)}")]
# ===============================
# Cross-project Analysis
# ===============================
async def _search_across_projects(
root: Path,
query: str,
group_by: str = "project",
limit: int = 5,
) -> list[TextContent]:
"""Search across all projects with aggregated results."""
try:
projects_dir = root / "projects"
project_dirs = [
d
for d in projects_dir.iterdir()
if d.is_dir() and not d.name.startswith(".")
]
if not project_dirs:
return [TextContent(type="text", text="No projects found")]
# Compile search pattern
pattern = re.compile(re.escape(query), re.IGNORECASE)
# Collect results grouped by category
grouped_results = defaultdict(list)
for proj_dir in project_dirs:
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
matches = list(pattern.finditer(content))
if matches:
relative_path = file_path.relative_to(projects_dir)
# Determine grouping key
if group_by == "project":
key = proj_dir.name
elif group_by == "doc_type":
parent = file_path.parent.name
key = (
parent
if parent
in ["sessions", "components", "adr", "patterns"]
else "other"
)
elif group_by == "tag":
tags = re.findall(r"#(\w+)", content)
key = tags[0] if tags else "untagged"
else:
key = "all"
# Get first match context
match = matches[0]
start = max(0, match.start() - 100)
end = min(len(content), match.end() + 100)
context = content[start:end].strip()
grouped_results[key].append(
{
"path": relative_path,
"matches": len(matches),
"context": context,
}
)
                except Exception:
                    # Skip files that cannot be read (e.g. encoding issues)
                    pass
if not grouped_results:
return [TextContent(type="text", text=f"No results found for '{query}'")]
# Format output
output = [f"Search results for '{query}' grouped by {group_by}:\n"]
for key, results in sorted(grouped_results.items()):
output.append(f"\n## {key} ({len(results)} results)")
for i, result in enumerate(results[:limit]):
output.append(
f"\n{i+1}. {result['path']} ({result['matches']} matches)"
)
output.append(f" ...{result['context']}...")
if len(results) > limit:
output.append(f"\n ... and {len(results) - limit} more")
# Summary statistics
total_matches = sum(len(results) for results in grouped_results.values())
output.append(
f"\n\nTotal: {total_matches} documents across {len(grouped_results)} {group_by}(s)"
)
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
return [TextContent(type="text", text=f"Search error: {str(e)}")]
async def _compare_projects(
root: Path,
    projects: list[str] | None = None,
    metrics: list[str] | None = None,
) -> list[TextContent]:
"""Compare statistics and metadata across multiple projects."""
try:
projects_dir = root / "projects"
# Determine which projects to compare
if projects:
project_dirs = [projects_dir / p for p in projects]
for proj_dir in project_dirs:
if not proj_dir.exists():
return [
TextContent(
type="text", text=f"Project '{proj_dir.name}' not found"
)
]
else:
project_dirs = [
d
for d in projects_dir.iterdir()
if d.is_dir() and not d.name.startswith(".")
]
if not project_dirs:
return [TextContent(type="text", text="No projects to compare")]
# Default metrics if not specified
if not metrics:
metrics = ["activity", "size", "tags", "components", "sessions"]
# Collect comparison data
comparison = []
for proj_dir in project_dirs:
data = {"name": proj_dir.name}
# Activity metric
if "activity" in metrics:
recent_files = []
cutoff = datetime.now() - timedelta(days=30)
for file_path in proj_dir.rglob("*.md"):
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if mtime >= cutoff:
recent_files.append(file_path)
data["recent_activity"] = len(recent_files)
# Size metric
if "size" in metrics:
total_words = 0
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
total_words += len(content.split())
except Exception:
pass
data["total_words"] = total_words
# Tags metric
if "tags" in metrics:
tags = Counter()
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
found_tags = re.findall(r"#(\w+)", content)
tags.update(found_tags)
except Exception:
pass
data["unique_tags"] = len(tags)
data["total_tags"] = sum(tags.values())
# Components metric
if "components" in metrics:
data["components"] = len(list((proj_dir / "components").glob("*.md")))
# Sessions metric
if "sessions" in metrics:
data["sessions"] = len(list((proj_dir / "sessions").glob("*.md")))
data["adrs"] = len(list((proj_dir / "adr").glob("*.md")))
data["patterns"] = len(list((proj_dir / "patterns").glob("*.md")))
comparison.append(data)
# Format output as table
output = ["Project Comparison:\n"]
# Header
headers = ["Project"]
if "activity" in metrics:
headers.append("Recent Activity (30d)")
if "size" in metrics:
headers.append("Total Words")
if "tags" in metrics:
headers.extend(["Unique Tags", "Total Tags"])
if "components" in metrics:
headers.append("Components")
if "sessions" in metrics:
headers.extend(["Sessions", "ADRs", "Patterns"])
output.append(" | ".join(headers))
output.append("-" * (sum(len(h) for h in headers) + 3 * len(headers)))
# Data rows
for data in comparison:
row = [data["name"]]
if "activity" in metrics:
row.append(str(data.get("recent_activity", 0)))
if "size" in metrics:
row.append(f"{data.get('total_words', 0):,}")
if "tags" in metrics:
row.append(str(data.get("unique_tags", 0)))
row.append(str(data.get("total_tags", 0)))
if "components" in metrics:
row.append(str(data.get("components", 0)))
if "sessions" in metrics:
row.append(str(data.get("sessions", 0)))
row.append(str(data.get("adrs", 0)))
row.append(str(data.get("patterns", 0)))
output.append(" | ".join(row))
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
return [TextContent(type="text", text=f"Comparison error: {str(e)}")]
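# Example output (illustrative; columns are pipe-separated, not width-aligned):
#   Project | Recent Activity (30d) | Total Words | Unique Tags | ...
#   demo | 4 | 12,345 | 7 | ...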
async def _analyze_cross_project_patterns(
root: Path,
analysis_type: str = "tags",
time_range: int = 30,
) -> list[TextContent]:
"""Analyze patterns, tags, and trends across all projects."""
try:
projects_dir = root / "projects"
project_dirs = [
d
for d in projects_dir.iterdir()
if d.is_dir() and not d.name.startswith(".")
]
if not project_dirs:
return [TextContent(type="text", text="No projects found")]
output = [f"Cross-project Analysis ({analysis_type}):\n"]
if analysis_type == "tags":
# Analyze tag usage across projects
all_tags = Counter()
project_tags = {}
for proj_dir in project_dirs:
tags = Counter()
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
found_tags = re.findall(r"#(\w+)", content)
tags.update(found_tags)
except Exception:
pass
if tags:
project_tags[proj_dir.name] = tags
all_tags.update(tags)
output.append(f"Top 20 tags across all projects:\n")
for tag, count in all_tags.most_common(20):
project_count = sum(1 for tags in project_tags.values() if tag in tags)
output.append(
f" #{tag}: {count} uses across {project_count} project(s)"
)
output.append(f"\nTag distribution by project:")
for proj_name, tags in sorted(project_tags.items()):
output.append(
f"\n {proj_name}: {len(tags)} unique tags, {sum(tags.values())} total"
)
elif analysis_type == "activity":
# Analyze activity patterns
cutoff = datetime.now() - timedelta(days=time_range)
activity_data = []
for proj_dir in project_dirs:
activity = {"name": proj_dir.name, "files": [], "days": Counter()}
for file_path in proj_dir.rglob("*.md"):
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
if mtime >= cutoff:
activity["files"].append(file_path.name)
day = mtime.strftime("%Y-%m-%d")
activity["days"][day] += 1
if activity["files"]:
activity_data.append(activity)
# Sort by activity
activity_data.sort(key=lambda x: len(x["files"]), reverse=True)
output.append(f"Activity in last {time_range} days:\n")
for data in activity_data:
output.append(f"\n{data['name']}: {len(data['files'])} modified files")
# Show top 5 most active days
for day, count in data["days"].most_common(5):
output.append(f" {day}: {count} changes")
elif analysis_type == "components":
# Analyze component patterns
all_components = {}
for proj_dir in project_dirs:
components_dir = proj_dir / "components"
if components_dir.exists():
components = list(components_dir.glob("*.md"))
if components:
all_components[proj_dir.name] = [c.stem for c in components]
output.append(f"Components across projects:\n")
for proj_name, components in sorted(all_components.items()):
output.append(f"\n{proj_name} ({len(components)} components):")
for comp in sorted(components)[:10]:
output.append(f" - {comp}")
if len(components) > 10:
output.append(f" ... and {len(components) - 10} more")
elif analysis_type == "links":
# Analyze linking patterns
link_stats = {}
for proj_dir in project_dirs:
internal_links = 0
external_links = 0
for file_path in proj_dir.rglob("*.md"):
try:
content = file_path.read_text()
links = re.findall(r"\[.*?\]\((.*?)\)", content)
for link in links:
if link.endswith(".md"):
internal_links += 1
else:
external_links += 1
except Exception:
pass
if internal_links or external_links:
link_stats[proj_dir.name] = {
"internal": internal_links,
"external": external_links,
}
output.append(f"Link statistics:\n")
for proj_name, stats in sorted(link_stats.items()):
total = stats["internal"] + stats["external"]
output.append(f"\n{proj_name}: {total} total links")
output.append(f" Internal: {stats['internal']}")
output.append(f" External: {stats['external']}")
return [TextContent(type="text", text="\n".join(output))]
except Exception as e:
return [TextContent(type="text", text=f"Analysis error: {str(e)}")]