#!/usr/bin/env python3
import asyncio
import json
import sys
from pathlib import Path
from obsidian_tools import ObsidianVault
try:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
except ImportError:
print("Error: MCP SDK not installed. Run: pip install mcp", file=sys.stderr)
sys.exit(1)
# Vault root directory. Assumes this script lives one level below the vault
# root (grandparent of this file) — TODO confirm against repo layout.
VAULT_PATH = Path(__file__).parent.parent
# Module-level singletons shared by all tool handlers below.
vault = ObsidianVault(VAULT_PATH)
app = Server("obsidian-vault")
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Declare every vault tool exposed over MCP.

    Returns the static list of Tool definitions (name, description, JSON
    schema) that clients use for discovery. The schemas here document the
    arguments each tool accepts; actual dispatch happens in call_tool().
    Defaults stated in the schemas are mirrored by `.get(..., default)`
    calls in the dispatcher.
    """
    return [
        # --- Search & retrieval ---
        Tool(
            name="vault_search",
            description="Search vault notes by title, content, or tags. Returns matching notes with paths and match reasons.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query string"},
                    "search_in": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": ["title", "content", "tags"],
                        },
                        "description": "Where to search (default: all)",
                        "default": ["title", "content", "tags"],
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 20)",
                        "default": 20,
                    },
                },
                "required": ["query"],
            },
        ),
        Tool(
            name="vault_read",
            description="Read the full content of a note by name. Returns frontmatter, content, tags, and links. For very long notes, use max_length or use the standard Read tool for untruncated content.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {"type": "string", "description": "Note name or title"},
                    "include_content": {
                        "type": "boolean",
                        "description": "Include full markdown content (default: true)",
                        "default": True,
                    },
                    "max_length": {
                        "type": "integer",
                        "description": "Max content length in chars (default: 8000). Set higher for full content.",
                        "default": 8000,
                    },
                },
                "required": ["note"],
            },
        ),
        # --- Link-graph structure ---
        Tool(
            name="vault_broken_links",
            description="Find broken wikilinks (links to non-existent notes) sorted by reference count. Each result includes inferred topic tags aggregated from referrer notes. Use filter_tag to scope to a domain (e.g. 'comp-neuro', 'math').",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 20)",
                        "default": 20,
                    },
                    "filter_tag": {
                        "type": "string",
                        "description": "Only show broken links whose referrers include this tag (substring match, e.g. 'comp-neuro')",
                    },
                },
            },
        ),
        Tool(
            name="vault_backlinks",
            description="Get all notes that link TO a specific note (incoming links). Shows which notes reference this one.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {"type": "string", "description": "Note name or title"}
                },
                "required": ["note"],
            },
        ),
        Tool(
            name="vault_forward_links",
            description="Get all notes that a specific note links TO (outgoing links). Shows what this note references.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {"type": "string", "description": "Note name or title"}
                },
                "required": ["note"],
            },
        ),
        Tool(
            name="vault_related",
            description="Find notes related to a given note based on shared links and connections. Useful for discovering relevant context.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {"type": "string", "description": "Note name or title"},
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 10)",
                        "default": 10,
                    },
                },
                "required": ["note"],
            },
        ),
        Tool(
            name="vault_hubs",
            description="Find hub notes with high connectivity (many incoming and outgoing links). Identifies central concepts and MOCs.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 20)",
                        "default": 20,
                    },
                    "min_links": {
                        "type": "integer",
                        "description": "Minimum total links (default: 5)",
                        "default": 5,
                    },
                },
            },
        ),
        Tool(
            name="vault_orphans",
            description="Find orphaned notes with no incoming or outgoing links. These may need integration into the knowledge graph.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 50)",
                        "default": 50,
                    }
                },
            },
        ),
        Tool(
            name="vault_stats",
            description="Get comprehensive vault statistics including note count, link count, broken links, tags, and orphans.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="vault_recent",
            description="Get recently modified notes sorted by modification time. Useful for tracking recent work.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 20)",
                        "default": 20,
                    },
                    "days": {
                        "type": "integer",
                        "description": "Only show notes modified within N days (optional)",
                    },
                },
            },
        ),
        Tool(
            name="vault_tags",
            description="Analyze tag usage across the vault. Returns tags sorted by frequency with example notes.",
            inputSchema={
                "type": "object",
                "properties": {
                    "min_count": {
                        "type": "integer",
                        "description": "Minimum tag count to show (default: 2)",
                        "default": 2,
                    }
                },
            },
        ),
        # --- Semantic / embedding-backed tools ---
        Tool(
            name="vault_context",
            description="Get semantically relevant context notes using Smart Connections embeddings + link structure. Combines embedding similarity with graph proximity.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {
                        "type": "string",
                        "description": "Note name to find context for",
                    },
                    "depth": {
                        "type": "integer",
                        "description": "Link depth for proximity bonus (default: 2)",
                        "default": 2,
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 10)",
                        "default": 10,
                    },
                },
                "required": ["note"],
            },
        ),
        Tool(
            name="vault_suggest_links",
            description="Find note pairs that should be linked based on high semantic similarity but are currently unlinked. Discovers missing connections.",
            inputSchema={
                "type": "object",
                "properties": {
                    "threshold": {
                        "type": "number",
                        "description": "Similarity threshold 0-1 (default: 0.85)",
                        "default": 0.85,
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max suggestions (default: 20)",
                        "default": 20,
                    },
                },
            },
        ),
        # --- Git-history analytics ---
        Tool(
            name="vault_knowledge_velocity",
            description="Track knowledge growth patterns via git history. Shows which notes are growing fastest by link count or edit frequency.",
            inputSchema={
                "type": "object",
                "properties": {
                    "days": {
                        "type": "integer",
                        "description": "Period to analyze in days (default: 90)",
                        "default": 90,
                    },
                    "metric": {
                        "type": "string",
                        "enum": ["links", "edits"],
                        "description": "Growth metric (default: links)",
                        "default": "links",
                    },
                },
            },
        ),
        Tool(
            name="vault_maturity",
            description="Score notes by development maturity based on word count, link integration, and recency. Identifies stubs, orphans, stale notes.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 30)",
                        "default": 30,
                    },
                    "sort": {
                        "type": "string",
                        "enum": ["least_mature", "most_mature"],
                        "description": "Sort order (default: least_mature)",
                        "default": "least_mature",
                    },
                },
            },
        ),
        Tool(
            name="vault_link_weights",
            description="Analyze link strength based on co-edit frequency in git history. Links between notes frequently edited together are stronger.",
            inputSchema={
                "type": "object",
                "properties": {
                    "min_weight": {
                        "type": "integer",
                        "description": "Minimum co-edit count (default: 2)",
                        "default": 2,
                    }
                },
            },
        ),
        Tool(
            name="vault_prereq_tree",
            description="Build a learning dependency tree from a root note. Uses backlinks and creation order to infer prerequisites.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {
                        "type": "string",
                        "description": "Root note to build tree from",
                    },
                    "depth": {
                        "type": "integer",
                        "description": "Tree depth (default: 3)",
                        "default": 3,
                    },
                },
                "required": ["note"],
            },
        ),
        # --- Bulk operations (always dry-run via MCP) ---
        Tool(
            name="vault_batch_tag",
            description="Preview bulk tag operations on notes. Can target notes similar to a reference note. Always runs as dry-run through MCP.",
            inputSchema={
                "type": "object",
                "properties": {
                    "similar_to": {
                        "type": "string",
                        "description": "Find notes similar to this note (optional)",
                    },
                    "add_tag": {
                        "type": "string",
                        "description": "Tag to add (optional)",
                    },
                    "remove_tag": {
                        "type": "string",
                        "description": "Tag to remove (optional)",
                    },
                    "threshold": {
                        "type": "number",
                        "description": "Similarity threshold 0-1 (default: 0.85)",
                        "default": 0.85,
                    },
                },
            },
        ),
        Tool(
            name="vault_rag",
            description="Semantic search over vault content. Takes a free-text query (e.g. 'what do I know about active inference'), ranks notes by TF-IDF relevance with embedding reranking, and returns content excerpts. The primary tool for knowledge retrieval.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Free-text search query",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 10)",
                        "default": 10,
                    },
                    "excerpt_len": {
                        "type": "integer",
                        "description": "Max excerpt length per result (default: 500)",
                        "default": 500,
                    },
                    "expand_links": {
                        "type": "boolean",
                        "description": "Also return graph-linked notes of top hits (default: true)",
                        "default": True,
                    },
                },
                "required": ["query"],
            },
        ),
        Tool(
            name="vault_clusters",
            description="Detect topic clusters in the knowledge graph via label propagation. Returns clusters with central notes, top tags, size, and cohesion score. Reveals emergent structure.",
            inputSchema={
                "type": "object",
                "properties": {
                    "min_cluster_size": {
                        "type": "integer",
                        "description": "Minimum notes per cluster (default: 3)",
                        "default": 3,
                    }
                },
            },
        ),
        Tool(
            name="vault_diff",
            description="Structural diff of the knowledge graph over a time period. Shows new notes created, deleted notes, modified notes, and link changes. Uses git history.",
            inputSchema={
                "type": "object",
                "properties": {
                    "since_days": {
                        "type": "integer",
                        "description": "Look back N days (default: 7)",
                        "default": 7,
                    }
                },
            },
        ),
        Tool(
            name="vault_open_questions",
            description="Extract unresolved questions (lines ending with ?), TODOs, FIXMEs, and unchecked checkboxes from notes. Surfaces the frontier of thinking and incomplete work.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 50)",
                        "default": 50,
                    },
                    "type_filter": {
                        "type": "string",
                        "enum": ["question", "todo", "checkbox", "all"],
                        "description": "Filter by type (default: all)",
                        "default": "all",
                    },
                },
            },
        ),
        Tool(
            name="vault_concept_gaps",
            description="Find structural holes in the knowledge graph: high-demand broken links (concepts that need notes), stub hubs (notes with many backlinks but no content), and bridge gaps (unlinked notes with many shared neighbors).",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max results (default: 30)",
                        "default": 30,
                    }
                },
            },
        ),
        # --- Agent journal (cross-session memory) ---
        Tool(
            name="vault_journal_write",
            description="Append an observation to the agent journal. Use this to record discoveries, decisions, patterns noticed, or context for future sessions. Persists across conversations.",
            inputSchema={
                "type": "object",
                "properties": {
                    "entry": {
                        "type": "string",
                        "description": "The observation or note to record",
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Optional tags for categorization (e.g. ['discovery', 'pattern', 'decision'])",
                    },
                },
                "required": ["entry"],
            },
        ),
        Tool(
            name="vault_journal_read",
            description="Read past journal entries. Use at the start of a session to recall prior observations, or to find specific past decisions and discoveries.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "Max entries (default: 20)",
                        "default": 20,
                    },
                    "tag": {
                        "type": "string",
                        "description": "Filter by tag (optional)",
                    },
                    "since_days": {
                        "type": "integer",
                        "description": "Only entries from last N days (optional)",
                    },
                },
            },
        ),
        # --- Temporal / attention analytics ---
        Tool(
            name="vault_topic_timeline",
            description="Track how a knowledge domain grows over time. Filter by tag or folder, see edit counts and unique notes bucketed by week/month/day. Use this to answer 'when was I most active on X?' or 'how has my writing on X evolved?'",
            inputSchema={
                "type": "object",
                "properties": {
                    "tag": {
                        "type": "string",
                        "description": "Filter to notes with this tag (substring match)",
                    },
                    "folder": {
                        "type": "string",
                        "description": "Filter to notes in this folder (e.g. 'Notes/Comp Neuro')",
                    },
                    "period": {
                        "type": "string",
                        "enum": ["day", "week", "month"],
                        "description": "Time bucket size (default: week)",
                        "default": "week",
                    },
                    "days": {
                        "type": "integer",
                        "description": "How far back to look in days (default: 180)",
                        "default": 180,
                    },
                },
            },
        ),
        Tool(
            name="vault_note_history",
            description="Get the full edit history of a specific note from git. Shows commit dates, messages, and tracks word count + link count at each snapshot. Use to understand how a note evolved.",
            inputSchema={
                "type": "object",
                "properties": {
                    "note": {"type": "string", "description": "Note name or title"},
                    "limit": {
                        "type": "integer",
                        "description": "Max commits to show (default: 30)",
                        "default": 30,
                    },
                },
                "required": ["note"],
            },
        ),
        Tool(
            name="vault_attention_flow",
            description="What topics are you focused on? Aggregates recent edits by tag or folder, compares to the previous equivalent period to detect rising/falling/stable trends. Use to understand attention patterns.",
            inputSchema={
                "type": "object",
                "properties": {
                    "days": {
                        "type": "integer",
                        "description": "Period to analyze in days (default: 14)",
                        "default": 14,
                    },
                    "group_by": {
                        "type": "string",
                        "enum": ["tag", "folder"],
                        "description": "Group edits by tag or folder (default: tag)",
                        "default": "tag",
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max topics to return (default: 20)",
                        "default": 20,
                    },
                },
            },
        ),
    ]
def _format_tree(tree, indent=0):
    """Render a prerequisite tree as an indented text outline.

    The root note is flush-left; every nested level gains one leading
    space and a "└─ " branch marker. Children are read from the optional
    'children' list of each node dict.
    """
    marker = "└─ " if indent > 0 else ""
    lines = [" " * indent + marker + tree["note"] + "\n"]
    for subtree in tree.get("children", []):
        lines.append(_format_tree(subtree, indent + 1))
    return "".join(lines)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch an MCP tool call to the matching ObsidianVault method.

    Each branch extracts arguments (mirroring the schema defaults declared
    in list_tools), invokes the corresponding vault helper, and formats the
    result as a single markdown TextContent. Unknown tool names return an
    error message rather than raising. The broad except at the bottom is a
    deliberate server boundary: any failure is reported to the client as
    text instead of crashing the stdio server.
    """
    try:
        if name == "vault_search":
            query = arguments["query"]
            search_in = arguments.get("search_in", ["title", "content", "tags"])
            limit = arguments.get("limit", 20)
            results = vault.search(query, search_in=search_in, limit=limit)
            if not results:
                return [
                    TextContent(type="text", text=f"No results found for '{query}'")
                ]
            summary = f"Found {len(results)} matches for '{query}':\n\n"
            for r in results:
                reasons = ", ".join(r["match_reason"])
                summary += f"- {r['title']} ({r['path']}) — matched in {reasons}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_read":
            note_name = arguments["note"]
            include_content = arguments.get("include_content", True)
            max_length = arguments.get("max_length", 8000)
            # Case-insensitive exact title match against the full graph.
            graph = vault._build_graph()
            matching = [
                n for n in graph["nodes"].keys() if n.lower() == note_name.lower()
            ]
            if not matching:
                return [TextContent(type="text", text=f"Note '{note_name}' not found")]
            note = matching[0]
            data = graph["nodes"][note]
            path = str(data["path"].relative_to(vault.vault_path))
            backlinks = graph["backlinks"].get(note, [])
            forward = graph["forward_links"].get(note, [])
            summary = f"**{note}** ({path})\n"
            summary += f"Tags: {', '.join(data['tags']) if data['tags'] else 'none'}\n"
            summary += f"Links out: {len(forward)} | Links in: {len(backlinks)}\n"
            if include_content:
                content = data["content"].strip()
                if len(content) > max_length:
                    content = content[:max_length] + "\n\n... (truncated)"
                summary += f"\n---\n{content}"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_broken_links":
            limit = arguments.get("limit", 20)
            filter_tag = arguments.get("filter_tag")
            results = vault.get_broken_links(limit=limit, filter_tag=filter_tag)
            if not results:
                tag_str = f" matching tag '{filter_tag}'" if filter_tag else ""
                return [
                    TextContent(type="text", text=f"No broken links found{tag_str}")
                ]
            tag_str = f" (filtered: {filter_tag})" if filter_tag else ""
            summary = f"Top {len(results)} broken links{tag_str}:\n\n"
            for r in results:
                tags = ", ".join(f"{t}({c})" for t, c in r["inferred_tags"][:5])
                summary += f"- **{r['link']}** — {r['count']} refs\n"
                summary += f" Topics: {tags}\n"
                summary += f" From: {', '.join(r['referrers'][:5])}"
                if len(r["referrers"]) > 5:
                    summary += f" (+{len(r['referrers']) - 5} more)"
                summary += "\n\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_backlinks":
            note = arguments["note"]
            results = vault.get_backlinks(note)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            count = results["backlink_count"]
            if count == 0:
                return [
                    TextContent(
                        type="text", text=f"'{results['note']}' has no backlinks"
                    )
                ]
            summary = f"**{results['note']}** has {count} backlink(s):\n\n"
            for bl in results["backlinks"]:
                summary += f"- {bl}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_forward_links":
            note = arguments["note"]
            results = vault.get_forward_links(note)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            count = results["forward_link_count"]
            if count == 0:
                return [
                    TextContent(
                        type="text", text=f"'{results['note']}' has no outgoing links"
                    )
                ]
            summary = f"**{results['note']}** links to {count} note(s):\n\n"
            for fl in results["forward_links"]:
                summary += f"- {fl}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_related":
            note = arguments["note"]
            limit = arguments.get("limit", 10)
            results = vault.get_related(note, limit=limit)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            if results["related_count"] == 0:
                return [
                    TextContent(
                        type="text",
                        text=f"No related notes found for '{results['note']}'",
                    )
                ]
            summary = f"**{results['note']}** — {results['related_count']} related note(s):\n\n"
            for r in results["related"]:
                summary += f"- {r['note']} (score: {r['score']})\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_hubs":
            limit = arguments.get("limit", 20)
            min_links = arguments.get("min_links", 5)
            results = vault.get_hubs(limit=limit, min_links=min_links)
            if not results:
                return [
                    TextContent(
                        type="text",
                        text=f"No hubs found with at least {min_links} links",
                    )
                ]
            summary = f"Top {len(results)} hub notes (min {min_links} links):\n\n"
            for h in results:
                summary += f"- **{h['note']}** — {h['total']} total ({h['backlinks']} in, {h['forward_links']} out)\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_orphans":
            limit = arguments.get("limit", 50)
            results = vault.get_orphans(limit=limit)
            if not results:
                return [TextContent(type="text", text="No orphaned notes found")]
            summary = f"Found {len(results)} orphaned note(s):\n\n"
            for o in results:
                summary += f"- {o['title']} ({o['path']})\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_stats":
            results = vault.get_stats()
            # Triple-quoted block is deliberately left-aligned: its leading
            # whitespace is part of the emitted text.
            summary = f"""Vault Statistics
Total notes: {results['total_notes']}
Total links: {results['total_links']}
Broken links: {results['unique_broken_links']} unique ({results['total_broken_link_references']} references)
Tags: {results['total_tags']} unique
Notes without backlinks: {results['notes_without_backlinks']}
Orphaned notes: {results['orphaned_notes']}
Top tags:
"""
            for tag, count in results["most_common_tags"][:10]:
                summary += f" - {tag}: {count}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_recent":
            limit = arguments.get("limit", 20)
            days = arguments.get("days")
            results = vault.get_recent(limit=limit, days=days)
            if not results:
                days_str = f" in last {days} days" if days else ""
                return [
                    TextContent(type="text", text=f"No recent notes found{days_str}")
                ]
            days_str = f" (last {days} days)" if days else ""
            summary = f"Recently modified notes{days_str}:\n\n"
            for r in results:
                summary += f"- {r['title']} — {r['days_ago']}d ago ({r['path']})\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_tags":
            min_count = arguments.get("min_count", 2)
            results = vault.get_tags(min_count=min_count)
            if not results:
                return [
                    TextContent(
                        type="text",
                        text=f"No tags found with at least {min_count} uses",
                    )
                ]
            summary = f"Tags (min {min_count} uses):\n\n"
            for t in results[:30]:
                example_notes = ", ".join(t["notes"][:3])
                if len(t["notes"]) > 3:
                    example_notes += f" (+{len(t['notes']) - 3} more)"
                summary += f"- **{t['tag']}** ({t['count']}) — {example_notes}\n"
            if len(results) > 30:
                summary += f"\n...and {len(results) - 30} more tags"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_context":
            note = arguments["note"]
            depth = arguments.get("depth", 2)
            limit = arguments.get("limit", 10)
            results = vault.get_context(note, depth=depth, limit=limit)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            if results["context_count"] == 0:
                return [
                    TextContent(
                        type="text", text=f"No context found for '{results['query']}'"
                    )
                ]
            summary = f"Context for **{results['query']}** ({results['context_count']} notes):\n\n"
            for c in results["context"]:
                summary += f"- {c['note']} — sim: {c['similarity']}, link bonus: {c['link_bonus']}, total: {c['total_score']}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_suggest_links":
            threshold = arguments.get("threshold", 0.85)
            limit = arguments.get("limit", 20)
            results = vault.suggest_missing_links(threshold=threshold, limit=limit)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            suggestions = results["suggestions"]
            if not suggestions:
                return [
                    TextContent(
                        type="text",
                        text=f"No missing links found above threshold {threshold}",
                    )
                ]
            summary = f"Suggested links ({len(suggestions)} pairs above {threshold} similarity):\n\n"
            for s in suggestions:
                reason = (
                    "semantic + structural"
                    if s["reason"] == "semantic_and_structural"
                    else "semantic similarity"
                )
                summary += f"- **{s['note_a']}** <-> **{s['note_b']}** — {s['similarity']} ({reason})\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_knowledge_velocity":
            days = arguments.get("days", 90)
            metric = arguments.get("metric", "links")
            results = vault.analyze_knowledge_velocity(days=days, metric=metric)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            notes = results.get("notes", [])
            if not notes:
                return [
                    TextContent(
                        type="text", text=f"No growth data found for last {days} days"
                    )
                ]
            summary = f"Knowledge velocity ({results['metric']}, last {results['period_days']}d):\n\n"
            if metric == "links":
                for n in notes[:30]:
                    summary += f"- **{n['note']}** — {n['initial_links']} -> {n['final_links']} links (+{n['growth']})\n"
            else:
                for n in notes[:30]:
                    summary += f"- **{n['note']}** — {n['edit_count']} edits\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_maturity":
            limit = arguments.get("limit", 30)
            sort_order = arguments.get("sort", "least_mature")
            # score_note_maturity() returns least-mature first; reverse for
            # most-mature, then truncate.
            results = vault.score_note_maturity()
            if sort_order == "most_mature":
                results.reverse()
            results = results[:limit]
            if not results:
                return [TextContent(type="text", text="No notes to score")]
            label = "Least mature" if sort_order == "least_mature" else "Most mature"
            summary = f"{label} notes (top {limit}):\n\n"
            for n in results:
                issues = ", ".join(n["issues"]) if n["issues"] else "none"
                summary += (
                    f"- **{n['note']}** — score: {n['maturity_score']}, "
                    f"{n['word_count']}w, {n['backlinks']}in/{n['forward_links']}out, "
                    f"{n['age_days']}d old, issues: {issues}\n"
                )
            return [TextContent(type="text", text=summary)]
        elif name == "vault_link_weights":
            min_weight = arguments.get("min_weight", 2)
            results = vault.analyze_link_weights(min_weight=min_weight)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            links = results.get("links", [])
            if not links:
                return [
                    TextContent(
                        type="text", text=f"No links found with weight >= {min_weight}"
                    )
                ]
            summary = f"Weighted links (min {min_weight} co-edits):\n\n"
            for link in links[:40]:
                summary += f"- {link['from']} -> {link['to']} — weight: {link['weight']} ({link['type']})\n"
            if len(links) > 40:
                summary += f"\n...and {len(links) - 40} more"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_prereq_tree":
            note = arguments["note"]
            depth = arguments.get("depth", 3)
            results = vault.build_prereq_tree(note, depth=depth)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            tree = results["tree"]
            summary = f"Prerequisite tree for **{results['root']}** (depth {results['depth']}):\n\n"
            summary += _format_tree(tree)
            return [TextContent(type="text", text=summary)]
        elif name == "vault_batch_tag":
            similar_to = arguments.get("similar_to")
            add_tag = arguments.get("add_tag")
            remove_tag = arguments.get("remove_tag")
            threshold = arguments.get("threshold", 0.85)
            # MCP access is always a preview; applying changes requires the CLI.
            results = vault.batch_tag(
                similar_to=similar_to,
                add_tag=add_tag,
                remove_tag=remove_tag,
                threshold=threshold,
                dry_run=True,
            )
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            ops = results.get("operations", [])
            if not ops:
                return [TextContent(type="text", text="No notes matched the criteria")]
            summary = f"Dry-run: {len(ops)} note(s) would be modified:\n\n"
            for op in ops[:20]:
                # `is not None` so a legitimate similarity of exactly 0 is
                # still shown (a bare truthiness test would hide it).
                sim_str = (
                    f" (sim: {op['similarity']})"
                    if op.get("similarity") is not None
                    else ""
                )
                summary += f"- **{op['note']}**{sim_str}\n"
                summary += f" tags: {op['old_tags']} -> {op['new_tags']}\n"
            if len(ops) > 20:
                summary += f"\n...and {len(ops) - 20} more"
            summary += "\n\nThis is a preview only. Use the CLI `./v batch-tag --apply` to execute."
            return [TextContent(type="text", text=summary)]
        elif name == "vault_rag":
            query = arguments["query"]
            limit = arguments.get("limit", 10)
            excerpt_len = arguments.get("excerpt_len", 500)
            expand_links = arguments.get("expand_links", True)
            results = vault.rag_search(
                query,
                limit=limit,
                excerpt_len=excerpt_len,
                expand_links=expand_links,
            )
            if not results["results"]:
                return [
                    TextContent(type="text", text=f"No results found for '{query}'")
                ]
            summary = f"Found {results['result_count']} results for '{query}':\n\n"
            for i, r in enumerate(results["results"], 1):
                source = (
                    " [via graph expansion]"
                    if r.get("source") == "graph_expansion"
                    else ""
                )
                # Prefer the reranked score; fall back to raw TF-IDF, then 0.
                score = r.get("combined_score", r.get("tfidf_score", 0))
                summary += f"### {i}. {r['note']} (score: {score}){source}\n"
                summary += f"Path: {r['path']}\n"
                if r.get("tags"):
                    summary += f"Tags: {', '.join(r['tags'])}\n"
                if r.get("excerpt"):
                    summary += f"\n{r['excerpt']}\n"
                summary += "\n---\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_clusters":
            min_size = arguments.get("min_cluster_size", 3)
            results = vault.detect_clusters(min_cluster_size=min_size)
            if not results["clusters"]:
                return [TextContent(type="text", text="No clusters found")]
            summary = (
                f"Found {results['cluster_count']} clusters (min size {min_size}):\n\n"
            )
            for i, c in enumerate(results["clusters"], 1):
                tags = (
                    ", ".join(f"{t[0]}({t[1]})" for t in c["top_tags"][:3])
                    if c["top_tags"]
                    else "none"
                )
                summary += (
                    f"### Cluster {i} — {c['size']} notes, cohesion: {c['cohesion']}\n"
                )
                summary += f"Central: {', '.join(c['central_notes'])}\n"
                summary += f"Top tags: {tags}\n"
                summary += f"Links: {c['internal_links']} internal, {c['external_links']} external\n"
                if len(c["members"]) <= 15:
                    summary += f"Members: {', '.join(c['members'])}\n"
                else:
                    summary += f"Members: {', '.join(c['members'][:15])} (+{len(c['members']) - 15} more)\n"
                summary += "\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_diff":
            since_days = arguments.get("since_days", 7)
            results = vault.graph_diff(since_days=since_days)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            summary = f"Graph diff (last {results['period_days']} days):\n\n"
            summary += f"New notes: {results['new_note_count']} | "
            summary += f"Deleted: {results['deleted_count']} | "
            summary += f"Modified: {results['modified_count']}\n"
            summary += f"Links added: {results['summary']['total_links_added']} | "
            summary += f"Links removed: {results['summary']['total_links_removed']}\n\n"
            if results["new_notes"]:
                summary += "**New notes:**\n"
                for n in results["new_notes"][:20]:
                    tags = f" [{', '.join(n['tags'])}]" if n.get("tags") else ""
                    summary += f"- {n['note']} — {n['word_count']}w, {n['links_out']} links{tags}\n"
                summary += "\n"
            if results["deleted_notes"]:
                summary += "**Deleted notes:**\n"
                for n in results["deleted_notes"][:10]:
                    summary += f"- {n}\n"
                summary += "\n"
            if results["link_changes"]:
                summary += "**Link changes:**\n"
                for lc in results["link_changes"][:15]:
                    if lc["links_added"]:
                        summary += (
                            f"- {lc['note']}: +{', '.join(lc['links_added'][:5])}\n"
                        )
                    if lc["links_removed"]:
                        summary += (
                            f"- {lc['note']}: -{', '.join(lc['links_removed'][:5])}\n"
                        )
            return [TextContent(type="text", text=summary)]
        elif name == "vault_open_questions":
            limit = arguments.get("limit", 50)
            type_filter = arguments.get("type_filter", "all")
            results = vault.extract_open_questions(limit=limit)
            # NOTE(review): filtering happens AFTER the limited fetch, so a
            # type_filter can return far fewer than `limit` items — confirm
            # whether extract_open_questions can filter server-side.
            if type_filter != "all":
                results = [q for q in results if q["type"] == type_filter]
            if not results:
                return [TextContent(type="text", text="No open questions/TODOs found")]
            summary = f"Found {len(results)} open items:\n\n"
            for q in results:
                icon = {"question": "?", "todo": "!", "checkbox": "[ ]"}.get(
                    q["type"], "•"
                )
                summary += f"- [{icon}] **{q['note']}** ({q['days_ago']}d ago)\n"
                summary += f" {q['text']}\n\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_concept_gaps":
            limit = arguments.get("limit", 30)
            results = vault.concept_gaps(limit=limit)
            if not results:
                return [TextContent(type="text", text="No concept gaps found")]
            summary = f"Found {len(results)} concept gaps:\n\n"
            for g in results:
                icon = {
                    "missing_note": "NEW",
                    "stub_hub": "EXPAND",
                    "bridge_gap": "LINK",
                }.get(g["type"], "?")
                summary += f"- [{icon}] {g['suggestion']}\n"
                if g["type"] == "missing_note":
                    refs = ", ".join(g["referenced_by"][:5])
                    summary += f" Referenced by: {refs}\n"
                elif g["type"] == "bridge_gap":
                    summary += f" Jaccard similarity: {g['jaccard']}\n"
                summary += "\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_journal_write":
            entry = arguments["entry"]
            tags = arguments.get("tags", [])
            record = vault.journal_append(entry=entry, source="agent", tags=tags)
            return [
                TextContent(
                    type="text",
                    text=f"Recorded at {record['timestamp']}: {entry[:100]}{'...' if len(entry) > 100 else ''}",
                )
            ]
        elif name == "vault_journal_read":
            limit = arguments.get("limit", 20)
            tag = arguments.get("tag")
            since_days = arguments.get("since_days")
            entries = vault.journal_read(
                limit=limit,
                tag=tag,
                since_days=since_days,
            )
            if not entries:
                return [TextContent(type="text", text="No journal entries found")]
            summary = f"Journal ({len(entries)} entries):\n\n"
            for e in entries:
                tags_str = f" [{', '.join(e['tags'])}]" if e.get("tags") else ""
                summary += f"- **{e['timestamp'][:16]}** ({e['source']}){tags_str}\n"
                summary += f" {e['entry']}\n\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_topic_timeline":
            tag = arguments.get("tag")
            folder = arguments.get("folder")
            period = arguments.get("period", "week")
            days = arguments.get("days", 180)
            results = vault.topic_timeline(
                tag=tag, folder=folder, period=period, days=days
            )
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            timeline = results["timeline"]
            filter_str = (
                f"tag={results['filter']['tag']}"
                if results["filter"]["tag"]
                else (
                    f"folder={results['filter']['folder']}"
                    if results["filter"]["folder"]
                    else "all notes"
                )
            )
            summary = f"Topic timeline ({filter_str}, {results['period_type']}ly, last {results['days']}d):\n"
            summary += f"Total: {results['total_edits']} edits across {results['total_unique_notes']} notes\n\n"
            for t in timeline:
                # ASCII bar chart, capped at 40 characters per bucket.
                bar = "#" * min(t["edits"], 40)
                summary += f"{t['period']} {bar} {t['edits']} edits, {t['unique_notes']} notes\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_note_history":
            note = arguments["note"]
            limit = arguments.get("limit", 30)
            results = vault.note_history(note, limit=limit)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            history = results["history"]
            summary = f"History of **{results['note']}** ({results['path']})\n"
            summary += f"First seen: {results['first_seen']} | Last modified: {results['last_modified']}\n"
            summary += f"Total commits: {results['total_commits']}\n\n"
            for h in history:
                # Both columns key on 'word_count': presence of a word count
                # marks a computed snapshot, and link_count may still be
                # missing within one (hence the '?' fallback).
                # NOTE(review): confirm this keying is intentional.
                wc = f" | {h['word_count']}w" if "word_count" in h else ""
                lc = f" | {h.get('link_count', '?')} links" if "word_count" in h else ""
                delta = ""
                if "word_delta" in h:
                    wd = h["word_delta"]
                    ld = h["link_delta"]
                    delta = f" | delta: {'+' if wd >= 0 else ''}{wd}w, {'+' if ld >= 0 else ''}{ld} links"
                summary += f"- {h['date']} {h['message']}{wc}{lc}{delta}\n"
            return [TextContent(type="text", text=summary)]
        elif name == "vault_attention_flow":
            days = arguments.get("days", 14)
            group_by = arguments.get("group_by", "tag")
            limit = arguments.get("limit", 20)
            results = vault.attention_flow(days=days, group_by=group_by, limit=limit)
            if "error" in results:
                return [TextContent(type="text", text=f"Error: {results['error']}")]
            topics = results["topics"]
            summary = f"Attention flow (last {results['period_days']}d, by {results['group_by']}):\n"
            summary += f"Total edits: {results['total_edits']}\n\n"
            trend_icon = {"rising": "^", "falling": "v", "stable": "=", "new": "+"}
            for t in topics:
                icon = trend_icon.get(t["trend"], "?")
                bar = "#" * min(t["edits"], 30)
                prev = (
                    f" (prev: {t['prev_period_edits']})"
                    if t["prev_period_edits"] > 0
                    else ""
                )
                summary += f"[{icon}] {t['topic']} {bar} {t['edits']} edits, {t['unique_notes']} notes{prev}\n"
            return [TextContent(type="text", text=summary)]
        else:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]
    except Exception as e:
        # Server boundary: report the failure to the MCP client as text
        # rather than letting the exception kill the stdio transport.
        return [TextContent(type="text", text=f"Error executing {name}: {str(e)}")]
async def main():
    """Serve the vault tools over stdio until the client disconnects."""
    async with stdio_server() as (reader, writer):
        await app.run(reader, writer, app.create_initialization_options())
if __name__ == "__main__":
    # Script entry point: start the asyncio event loop running the MCP server.
    asyncio.run(main())