#!/usr/bin/env python3
"""
markdown-frontmatter-mcp
A Model Context Protocol (MCP) server that queries Markdown files by front matter metadata.
Designed for Obsidian vaults and other Markdown-based knowledge bases.
Environment Variables:
KB_PATH: Path to the knowledge base root directory (required)
Usage:
KB_PATH=/path/to/vault python -m markdown_frontmatter_mcp
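
Illustrative front matter this server looks for (the tag values below are
placeholders; the query reads the created/date, updated, and tags keys):

    ---
    created: 2024-01-15
    updated: 2024-02-01
    tags: [ai-systems, project-notes]
    ---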
"""
import os
import re
import json
import sys
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional
import yaml
# MCP Protocol Implementation
# Using stdio transport (JSON-RPC over stdin/stdout)
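#
# A typical session, one JSON-RPC message per line on stdin/stdout (abridged; the
# payloads mirror the handlers implemented below):
#
#   -> {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {...}}
#   <- {"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2024-11-05", ...}}
#   -> {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
#   <- {"jsonrpc": "2.0", "id": 2, "result": {"tools": [{"name": "query_recent_notes", ...}]}}
#   -> {"jsonrpc": "2.0", "id": 3, "method": "tools/call",
#       "params": {"name": "query_recent_notes", "arguments": {"days": 14}}}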
def parse_frontmatter(content: str) -> dict:
"""Extract YAML front matter from markdown content."""
if not content.startswith('---'):
return {}
end_match = re.search(r'\n---\s*\n', content[3:])
if not end_match:
return {}
yaml_content = content[3:end_match.start() + 3]
    try:
        data = yaml.safe_load(yaml_content)
        # Front matter must be a mapping; ignore scalar or list YAML so callers can use .get().
        return data if isinstance(data, dict) else {}
    except yaml.YAMLError:
        return {}
def extract_title(content: str, filename: str) -> str:
"""Extract title from first H1 heading or filename."""
# Look for first # heading
match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
if match:
return match.group(1).strip()
# Fall back to filename without extension
return Path(filename).stem.replace('-', ' ').replace('_', ' ').title()
def extract_excerpt(content: str, max_length: int = 200) -> str:
"""Extract first paragraph after front matter as excerpt."""
# Remove front matter
if content.startswith('---'):
end_match = re.search(r'\n---\s*\n', content[3:])
if end_match:
content = content[end_match.end() + 3:]
# Remove headings and get first substantial text
lines = content.strip().split('\n')
text_lines = []
for line in lines:
line = line.strip()
# Skip empty lines, headings, and metadata
if not line or line.startswith('#') or line.startswith('---'):
if text_lines: # Stop at first break after finding text
break
continue
text_lines.append(line)
excerpt = ' '.join(text_lines)
if len(excerpt) > max_length:
excerpt = excerpt[:max_length].rsplit(' ', 1)[0] + '...'
return excerpt
def normalize_date(date_val) -> Optional[str]:
"""Normalize date value to YYYY-MM-DD string."""
if not date_val:
return None
if isinstance(date_val, datetime):
return date_val.strftime('%Y-%m-%d')
if hasattr(date_val, 'isoformat'):
return str(date_val)[:10]
# Already a string
date_str = str(date_val)[:10]
# Validate format
try:
datetime.strptime(date_str, '%Y-%m-%d')
return date_str
except ValueError:
return None
def query_by_frontmatter(
kb_path: str,
tags: Optional[list] = None,
days: int = 7,
folders: Optional[list] = None,
limit: int = 10
) -> list:
"""
Query markdown files by front matter metadata.
Args:
kb_path: Path to knowledge base root
tags: List of tags to filter by (matches ANY)
days: How far back to look (created OR updated within this window)
folders: Subdirectories to search (default: all)
limit: Maximum results to return
Returns:
List of matching files with metadata
"""
kb_path = Path(kb_path).expanduser()
if not kb_path.exists():
return {"error": f"Knowledge base path not found: {kb_path}"}
cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
results = []
# Determine which folders to search
if folders:
search_paths = [kb_path / folder for folder in folders if (kb_path / folder).exists()]
else:
search_paths = [kb_path]
# Skip these directories
skip_dirs = {'.obsidian', '.git', '.smart-env', '.versiondb', 'node_modules', '.claude'}
for search_path in search_paths:
for root, dirs, files in os.walk(search_path):
# Skip hidden and system directories
dirs[:] = [d for d in dirs if d not in skip_dirs and not d.startswith('.')]
for file in files:
if not file.endswith('.md'):
continue
file_path = Path(root) / file
rel_path = file_path.relative_to(kb_path)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except Exception:
continue
frontmatter = parse_frontmatter(content)
if not frontmatter:
continue
# Get dates
created = normalize_date(frontmatter.get('created') or frontmatter.get('date'))
updated = normalize_date(frontmatter.get('updated'))
# Determine most recent date
dates = [d for d in [created, updated] if d]
if not dates:
continue
most_recent = max(dates)
# Filter by date
if most_recent < cutoff_date:
continue
# Filter by tags
file_tags = frontmatter.get('tags') or []
if isinstance(file_tags, str):
file_tags = [file_tags]
if not isinstance(file_tags, list):
file_tags = []
# Normalize tags (lowercase, strip #)
file_tags = [str(t).lower().strip().lstrip('#') for t in file_tags if t]
                if tags:
                    # Match if ANY requested tag appears, normalized the same way as
                    # the file's tags (lowercase, leading '#' stripped).
                    search_tags = [str(t).lower().strip().lstrip('#') for t in tags]
                    if not any(t in file_tags for t in search_tags):
                        continue
# Build result
results.append({
'path': str(rel_path),
'title': extract_title(content, file),
'tags': file_tags,
'created': created,
'updated': updated,
'last_touched': most_recent,
'excerpt': extract_excerpt(content)
})
# Sort by most recent first
results.sort(key=lambda x: x['last_touched'], reverse=True)
# Apply limit
return results[:limit]
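
# Ad-hoc local check outside the MCP flow (vault path and tag are placeholders; assumes
# the module is importable as markdown_frontmatter_mcp, per the Usage line above):
#
#   python -c "from markdown_frontmatter_mcp import query_by_frontmatter; \
#              print(query_by_frontmatter('~/vault', tags=['ai-systems'], days=30))"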
# ============================================================================
# MCP Server Implementation
# ============================================================================
def send_response(response_id, result):
"""Send a JSON-RPC response."""
response = {
"jsonrpc": "2.0",
"id": response_id,
"result": result
}
sys.stdout.write(json.dumps(response) + '\n')
sys.stdout.flush()
def send_error(response_id, code, message):
"""Send a JSON-RPC error response."""
response = {
"jsonrpc": "2.0",
"id": response_id,
"error": {
"code": code,
"message": message
}
}
sys.stdout.write(json.dumps(response) + '\n')
sys.stdout.flush()
def handle_initialize(request_id, params):
"""Handle the initialize request."""
send_response(request_id, {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "markdown-frontmatter-mcp",
"version": "0.1.0"
}
})
def handle_tools_list(request_id):
"""Handle tools/list request."""
send_response(request_id, {
"tools": [
{
"name": "query_recent_notes",
"description": "Query markdown files by front matter metadata (tags, dates). Returns recently created or updated notes matching the criteria. Use this to catch up on recent thinking about specific topics.",
"inputSchema": {
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by tags (matches ANY). Examples: ['ai-systems'], ['personal-growth', 'therapy']"
},
"days": {
"type": "integer",
"default": 7,
"description": "How far back to look (notes created OR updated within this window)"
},
"folders": {
"type": "array",
"items": {"type": "string"},
"description": "Subdirectories to search. Default: all. Examples: ['thoughts', 'business']"
},
"limit": {
"type": "integer",
"default": 10,
"description": "Maximum number of results to return"
}
}
}
}
]
})
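
# Example "arguments" payload for query_recent_notes (values are illustrative and
# mirror the input schema above):
#   {"tags": ["personal-growth", "therapy"], "days": 14, "folders": ["thoughts"], "limit": 5}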
def handle_tools_call(request_id, params):
"""Handle tools/call request."""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name != "query_recent_notes":
send_error(request_id, -32602, f"Unknown tool: {tool_name}")
return
kb_path = os.environ.get("KB_PATH")
if not kb_path:
send_error(request_id, -32602, "KB_PATH environment variable not set")
return
try:
# Expand ~ in path
kb_path = os.path.expanduser(kb_path)
results = query_by_frontmatter(
kb_path=kb_path,
tags=arguments.get("tags"),
days=arguments.get("days", 7),
folders=arguments.get("folders"),
limit=arguments.get("limit", 10)
)
# Handle error dict
if isinstance(results, dict) and "error" in results:
text_content = f"Error: {results['error']}"
elif results is None or len(results) == 0:
text_content = "No matching notes found."
else:
lines = [f"Found {len(results)} recent note(s):\n"]
for r in results:
lines.append(f"### {r['title']}")
lines.append(f"**Path:** `{r['path']}`")
tags_str = ', '.join(r['tags']) if r.get('tags') else 'none'
lines.append(f"**Tags:** {tags_str}")
created = r.get('created') or 'unknown'
updated = r.get('updated') or 'unknown'
lines.append(f"**Created:** {created} | **Updated:** {updated}")
excerpt = r.get('excerpt', '')
if excerpt:
lines.append(f"\n{excerpt}\n")
lines.append("---\n")
text_content = '\n'.join(lines)
send_response(request_id, {
"content": [
{
"type": "text",
"text": text_content
}
]
})
except Exception as e:
import traceback
error_detail = f"{str(e)}\n{traceback.format_exc()}"
send_error(request_id, -32603, f"Internal error: {error_detail}")
def main():
"""Main entry point - runs the MCP server."""
# Check for KB_PATH
kb_path = os.environ.get("KB_PATH")
if not kb_path:
sys.stderr.write("Warning: KB_PATH environment variable not set\n")
# Read JSON-RPC messages from stdin
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
except json.JSONDecodeError as e:
sys.stderr.write(f"JSON parse error: {e}\n")
continue
request_id = request.get("id")
method = request.get("method")
params = request.get("params", {})
if method == "initialize":
handle_initialize(request_id, params)
elif method == "notifications/initialized":
# Client acknowledgment, no response needed
pass
elif method == "tools/list":
handle_tools_list(request_id)
elif method == "tools/call":
handle_tools_call(request_id, params)
        else:
            # Notifications carry no id and get no reply; only answer actual requests.
            # (Use an explicit None check: a JSON-RPC id of 0 is valid.)
            if request_id is not None:
                send_error(request_id, -32601, f"Method not found: {method}")
if __name__ == "__main__":
main()