server.py•14.3 kB
"""MCP server for the knowledge base."""
import os
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from .search import KnowledgeBaseSearch
from .storage import (
KnowledgeBaseStorage,
DuplicateNoteError,
NoteNotFoundError,
StorageError,
)
# Load environment variables
load_dotenv()
# Configuration
KNOWLEDGE_BASE_PATH = os.getenv("KNOWLEDGE_BASE_PATH", "~/knowledge-base")
CATEGORIES = os.getenv("CATEGORIES", "people,recipes,meetings,procedures,tasks").split(",")
SERVER_NAME = os.getenv("SERVER_NAME", "Knowledge Base")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# Initialize storage and search
storage = KnowledgeBaseStorage(KNOWLEDGE_BASE_PATH, CATEGORIES)
search_engine = KnowledgeBaseSearch(storage)
# Create MCP server
app = Server(SERVER_NAME)
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List all available tools."""
return [
Tool(
name="add_note",
description="Create a new note in the knowledge base",
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": f"Category folder (options: {', '.join(CATEGORIES)})",
"enum": CATEGORIES,
},
"title": {
"type": "string",
"description": "Note title (becomes filename)",
},
"content": {
"type": "string",
"description": "Markdown content of the note",
},
"tags": {
"type": "string",
"description": "Comma-separated tags (optional)",
"default": "",
},
},
"required": ["category", "title", "content"],
},
),
Tool(
name="search_notes",
description="Search through all notes by query, category, or tags",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search term (searches title, content, tags) - case insensitive",
"default": "",
},
"category": {
"type": "string",
"description": f"Optional category filter (options: {', '.join(CATEGORIES)})",
},
"tags": {
"type": "string",
"description": "Optional comma-separated tags to filter by (matches any)",
},
},
},
),
Tool(
name="get_note",
description="Retrieve the full content of a specific note",
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": f"Category folder (options: {', '.join(CATEGORIES)})",
"enum": CATEGORIES,
},
"title": {
"type": "string",
"description": "Note title (can use full filename or friendly title)",
},
},
"required": ["category", "title"],
},
),
Tool(
name="update_note",
description="Update an existing note's content or tags",
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": f"Category folder (options: {', '.join(CATEGORIES)})",
"enum": CATEGORIES,
},
"title": {
"type": "string",
"description": "Note title",
},
"content": {
"type": "string",
"description": "New content (optional)",
},
"tags": {
"type": "string",
"description": "New comma-separated tags (optional)",
},
"append": {
"type": "boolean",
"description": "If true, append content instead of replacing (default: false)",
"default": False,
},
},
"required": ["category", "title"],
},
),
Tool(
name="list_notes",
description="List all notes, optionally filtered by category or tag",
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": f"Optional category filter (options: {', '.join(CATEGORIES)})",
},
"tag": {
"type": "string",
"description": "Optional tag filter",
},
},
},
),
Tool(
name="delete_note",
description="Delete a note from the knowledge base",
inputSchema={
"type": "object",
"properties": {
"category": {
"type": "string",
"description": f"Category folder (options: {', '.join(CATEGORIES)})",
"enum": CATEGORIES,
},
"title": {
"type": "string",
"description": "Note title",
},
},
"required": ["category", "title"],
},
),
Tool(
name="list_categories",
description="List all available categories with note counts",
inputSchema={
"type": "object",
"properties": {},
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls."""
try:
if name == "add_note":
return await handle_add_note(arguments)
elif name == "search_notes":
return await handle_search_notes(arguments)
elif name == "get_note":
return await handle_get_note(arguments)
elif name == "update_note":
return await handle_update_note(arguments)
elif name == "list_notes":
return await handle_list_notes(arguments)
elif name == "delete_note":
return await handle_delete_note(arguments)
elif name == "list_categories":
return await handle_list_categories(arguments)
else:
raise ValueError(f"Unknown tool: {name}")
except (DuplicateNoteError, NoteNotFoundError, StorageError) as e:
return [TextContent(type="text", text=str(e))]
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
return [TextContent(type="text", text=error_msg)]
async def handle_add_note(arguments: dict) -> list[TextContent]:
"""Handle add_note tool call."""
try:
category = arguments["category"]
title = arguments["title"]
content = arguments["content"]
tags_str = arguments.get("tags", "")
# Parse tags
tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()]
# Create note
note = storage.create_note(
category=category,
title=title,
content=content,
tags=tags
)
filename = storage.sanitize_filename(title)
result = f"✓ Note '{title}' created in {category}/\n"
result += f" File: {filename}.md\n"
if tags:
result += f" Tags: {', '.join(tags)}"
return [TextContent(type="text", text=result)]
except (DuplicateNoteError, NoteNotFoundError, StorageError) as e:
return [TextContent(type="text", text=str(e))]
except Exception as e:
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def handle_search_notes(arguments: dict) -> list[TextContent]:
"""Handle search_notes tool call."""
query = arguments.get("query", "")
category = arguments.get("category")
tags_str = arguments.get("tags")
# Parse tags
tags = None
if tags_str:
tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()]
# Search
result = search_engine.search_formatted(
query=query,
category=category,
tags=tags
)
return [TextContent(type="text", text=result)]
async def handle_get_note(arguments: dict) -> list[TextContent]:
"""Handle get_note tool call."""
try:
category = arguments["category"]
title = arguments["title"]
# Get note
note = storage.get_note(category, title)
# Format output with frontmatter
output = f"# {note.title}\n\n"
output += f"**Category:** {note.category}\n"
output += f"**Tags:** {', '.join(note.frontmatter.tags) if note.frontmatter.tags else 'none'}\n"
output += f"**Date:** {note.frontmatter.date}\n"
if note.frontmatter.updated:
output += f"**Updated:** {note.frontmatter.updated}\n"
# Add metadata if any
if note.frontmatter.metadata:
output += "\n**Additional Info:**\n"
for key, value in note.frontmatter.metadata.items():
output += f"- {key}: {value}\n"
output += f"\n---\n\n{note.content}"
return [TextContent(type="text", text=output)]
except (DuplicateNoteError, NoteNotFoundError, StorageError) as e:
return [TextContent(type="text", text=str(e))]
except Exception as e:
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def handle_update_note(arguments: dict) -> list[TextContent]:
"""Handle update_note tool call."""
try:
category = arguments["category"]
title = arguments["title"]
content = arguments.get("content")
tags_str = arguments.get("tags")
append = arguments.get("append", False)
# Parse tags
tags = None
if tags_str:
tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()]
# Update note
note = storage.update_note(
category=category,
title=title,
content=content,
tags=tags,
append=append
)
result = f"✓ Note '{title}' updated successfully\n"
result += f" Category: {category}\n"
if tags:
result += f" Tags: {', '.join(note.frontmatter.tags)}\n"
result += f" Last updated: {note.frontmatter.updated}"
return [TextContent(type="text", text=result)]
except (DuplicateNoteError, NoteNotFoundError, StorageError) as e:
return [TextContent(type="text", text=str(e))]
except Exception as e:
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def handle_list_notes(arguments: dict) -> list[TextContent]:
"""Handle list_notes tool call."""
category = arguments.get("category")
tag = arguments.get("tag")
# Get notes
notes = storage.list_notes(category=category, tag=tag)
if not notes:
return [TextContent(type="text", text="No notes found.")]
# Group by category
by_category = {}
for note in notes:
if note.category not in by_category:
by_category[note.category] = []
by_category[note.category].append(note)
# Format output
output_lines = []
if category:
output_lines.append(f"Notes in {category}/ ({len(notes)} total):\n")
else:
output_lines.append(f"All notes ({len(notes)} total):\n")
for cat in sorted(by_category.keys()):
cat_notes = by_category[cat]
output_lines.append(f"\n{cat}/ ({len(cat_notes)} notes):")
for note in sorted(cat_notes, key=lambda n: n.title):
tags_str = ', '.join(note.frontmatter.tags) if note.frontmatter.tags else 'no tags'
output_lines.append(f" - {note.title} [{tags_str}]")
return [TextContent(type="text", text="\n".join(output_lines))]
async def handle_delete_note(arguments: dict) -> list[TextContent]:
"""Handle delete_note tool call."""
try:
category = arguments["category"]
title = arguments["title"]
# Delete note
result = storage.delete_note(category, title)
return [TextContent(type="text", text=result)]
except (DuplicateNoteError, NoteNotFoundError, StorageError) as e:
return [TextContent(type="text", text=str(e))]
except Exception as e:
return [TextContent(type="text", text=f"❌ Error: {str(e)}")]
async def handle_list_categories(arguments: dict) -> list[TextContent]:
"""Handle list_categories tool call."""
stats = storage.get_category_stats()
output_lines = ["Available categories:\n"]
for category, count in sorted(stats.items()):
plural = "note" if count == 1 else "notes"
output_lines.append(f" - {category}/ ({count} {plural})")
return [TextContent(type="text", text="\n".join(output_lines))]
def main():
"""Run the MCP server."""
import asyncio
async def run():
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
asyncio.run(run())
if __name__ == "__main__":
main()