Skip to main content
Glama
server.py18.5 kB
"""Main MCP server implementation for org-roam integration.""" import asyncio import logging from typing import Any, Dict, List, Optional import json from mcp.server import Server, NotificationOptions from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types from .config import OrgRoamConfig from .database import OrgRoamDatabase, OrgRoamNode from .file_manager import OrgRoamFileManager # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize server server = Server("org-roam-mcp") # Global variables for database and config config: Optional[OrgRoamConfig] = None db: Optional[OrgRoamDatabase] = None file_manager: Optional[OrgRoamFileManager] = None @server.list_resources() async def handle_list_resources() -> List[types.Resource]: """List available org-roam resources.""" return [ types.Resource( uri="org-roam://nodes", name="All Nodes", description="All nodes in the org-roam database", mimeType="application/json", ), types.Resource( uri="org-roam://stats", name="Database Statistics", description="Statistics about the org-roam database", mimeType="application/json", ), ] @server.read_resource() async def handle_read_resource(uri: str) -> str: """Read org-roam resources.""" if not db: raise RuntimeError("Database not initialized") if uri == "org-roam://nodes": nodes = db.get_all_nodes(limit=config.max_search_results) nodes_data = [ { "id": node.id, "title": node.title, "file": node.file, "level": node.level, "tags": db.get_node_tags(node.id), "aliases": db.get_node_aliases(node.id), } for node in nodes ] return json.dumps(nodes_data, indent=2) elif uri == "org-roam://stats": stats = db.get_database_stats() return json.dumps(stats, indent=2) elif uri.startswith("org-roam://node/"): node_id = uri.replace("org-roam://node/", "") node = db.get_node_by_id(node_id) if not node: raise ValueError(f"Node not found: {node_id}") # Get additional node information tags = 
db.get_node_tags(node_id) aliases = db.get_node_aliases(node_id) backlinks = db.get_backlinks(node_id) forward_links = db.get_forward_links(node_id) # Read file content content = file_manager.read_node_content(node) node_data = { "id": node.id, "title": node.title, "file": node.file, "level": node.level, "content": content, "tags": tags, "aliases": aliases, "backlinks": [{"source": link.source, "type": link.type} for link in backlinks], "forward_links": [{"dest": link.dest, "type": link.type} for link in forward_links], } return json.dumps(node_data, indent=2) else: raise ValueError(f"Unknown resource: {uri}") @server.list_tools() async def handle_list_tools() -> List[types.Tool]: """List available tools.""" return [ types.Tool( name="search_nodes", description="Search for nodes by title, tags, or aliases", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "Search query string" }, "limit": { "type": "integer", "description": "Maximum number of results (default: 50)", "default": 50 } }, "required": ["query"] }, ), types.Tool( name="get_node", description="Get detailed information about a specific node", inputSchema={ "type": "object", "properties": { "node_id": { "type": "string", "description": "The ID of the node to retrieve" } }, "required": ["node_id"] }, ), types.Tool( name="get_backlinks", description="Get all nodes that link to a specific node", inputSchema={ "type": "object", "properties": { "node_id": { "type": "string", "description": "The ID of the target node" } }, "required": ["node_id"] }, ), types.Tool( name="create_node", description="Create a new org-roam node", inputSchema={ "type": "object", "properties": { "title": { "type": "string", "description": "Title of the new node" }, "content": { "type": "string", "description": "Content of the new node", "default": "" }, "tags": { "type": "array", "items": {"type": "string"}, "description": "Tags for the new node", "default": [] } }, "required": ["title"] }, ), 
types.Tool( name="update_node", description="Update content of an existing node", inputSchema={ "type": "object", "properties": { "node_id": { "type": "string", "description": "ID of the node to update" }, "content": { "type": "string", "description": "New content for the node" } }, "required": ["node_id", "content"] }, ), types.Tool( name="add_link", description="Add a link from one node to another", inputSchema={ "type": "object", "properties": { "source_node_id": { "type": "string", "description": "ID of the source node" }, "target_node_id": { "type": "string", "description": "ID of the target node" } }, "required": ["source_node_id", "target_node_id"] }, ), types.Tool( name="list_files", description="List all org files in the org-roam directory", inputSchema={ "type": "object", "properties": {}, "additionalProperties": False }, ), ] @server.call_tool() async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]: """Handle tool calls.""" if not db: raise RuntimeError("Database not initialized") if name == "search_nodes": query = arguments["query"] limit = arguments.get("limit", 50) # Input validation if not query or not isinstance(query, str): return [types.TextContent( type="text", text=json.dumps({ "error": "Query must be a non-empty string" }) )] if not isinstance(limit, int) or limit < 1 or limit > 1000: return [types.TextContent( type="text", text=json.dumps({ "error": "Limit must be an integer between 1 and 1000" }) )] nodes = db.search_nodes(query.strip(), limit=limit) results = [] for node in nodes: tags = db.get_node_tags(node.id) aliases = db.get_node_aliases(node.id) results.append({ "id": node.id, "title": node.title, "file": node.file, "tags": tags, "aliases": aliases, }) return [types.TextContent( type="text", text=json.dumps({ "query": query, "results": results, "count": len(results) }, indent=2) )] elif name == "get_node": node_id = arguments["node_id"] # Input validation if not node_id or not isinstance(node_id, 
str): return [types.TextContent( type="text", text=json.dumps({ "error": "Node ID must be a non-empty string" }) )] node = db.get_node_by_id(node_id) if not node: return [types.TextContent( type="text", text=f"Node not found: {node_id}" )] tags = db.get_node_tags(node_id) aliases = db.get_node_aliases(node_id) backlinks = db.get_backlinks(node_id) forward_links = db.get_forward_links(node_id) # Read actual file content content = file_manager.read_node_content(node) result = { "id": node.id, "title": node.title, "file": node.file, "level": node.level, "content": content, "tags": tags, "aliases": aliases, "backlinks_count": len(backlinks), "forward_links_count": len(forward_links), } return [types.TextContent( type="text", text=json.dumps(result, indent=2) )] elif name == "get_backlinks": node_id = arguments["node_id"] backlinks = db.get_backlinks(node_id) # Get details about source nodes results = [] for link in backlinks: source_node = db.get_node_by_id(link.source) if source_node: results.append({ "source_id": link.source, "source_title": source_node.title, "source_file": source_node.file, "link_type": link.type, }) return [types.TextContent( type="text", text=json.dumps({ "target_node": node_id, "backlinks": results, "count": len(results) }, indent=2) )] elif name == "create_node": title = arguments["title"] content = arguments.get("content", "") tags = arguments.get("tags", []) # Input validation if not title or not isinstance(title, str): return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": "Title must be a non-empty string" }) )] if not isinstance(content, str): return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": "Content must be a string" }) )] if not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags): return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": "Tags must be a list of strings" }) )] try: # Create the node using file manager node_id = 
file_manager.create_node(title.strip(), content, tags) # Refresh database connection to pick up the new node db.refresh_connection() return [types.TextContent( type="text", text=json.dumps({ "success": True, "node_id": node_id, "title": title, "message": f"Created new node: {title}" }, indent=2) )] except Exception as e: logger.error(f"Error creating node '{title}': {e}") return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": f"Failed to create node: {str(e)}" }) )] elif name == "update_node": node_id = arguments["node_id"] new_content = arguments["content"] # Get the node first node = db.get_node_by_id(node_id) if not node: return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": f"Node not found: {node_id}" }) )] try: file_manager.update_node_content(node, new_content) # Refresh database connection to pick up changes db.refresh_connection() return [types.TextContent( type="text", text=json.dumps({ "success": True, "node_id": node_id, "message": f"Updated node: {node.title}" }, indent=2) )] except Exception as e: return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": str(e) }) )] elif name == "add_link": source_node_id = arguments["source_node_id"] target_node_id = arguments["target_node_id"] # Get both nodes source_node = db.get_node_by_id(source_node_id) target_node = db.get_node_by_id(target_node_id) if not source_node: return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": f"Source node not found: {source_node_id}" }) )] if not target_node: return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": f"Target node not found: {target_node_id}" }) )] try: file_manager.add_link_to_node( source_node.file, target_node_id, target_node.title or "Untitled" ) # Refresh database connection to pick up new links db.refresh_connection() return [types.TextContent( type="text", text=json.dumps({ "success": True, "source_node": 
source_node.title, "target_node": target_node.title, "message": f"Added link from '{source_node.title}' to '{target_node.title}'" }, indent=2) )] except Exception as e: return [types.TextContent( type="text", text=json.dumps({ "success": False, "error": str(e) }) )] elif name == "list_files": org_files = file_manager.list_org_files() # Get metadata for each file files_info = [] for file_path in org_files[:50]: # Limit to avoid too much data try: metadata = file_manager.get_file_metadata(file_path) files_info.append({ "file": file_path, "title": metadata.get("title", ""), "id": metadata.get("ID", ""), "tags": metadata.get("tags", []) }) except Exception as e: logger.warning(f"Error reading metadata for {file_path}: {e}") return [types.TextContent( type="text", text=json.dumps({ "files": files_info, "total_count": len(org_files), "displayed_count": len(files_info) }, indent=2) )] else: raise ValueError(f"Unknown tool: {name}") async def main(): """Main server entry point.""" global config, db, file_manager # Initialize configuration try: config = OrgRoamConfig.from_environment() config.validate() except Exception as e: logger.error(f"Configuration error: {e}") raise # Initialize database try: db = OrgRoamDatabase(config.db_path) logger.info("Database connection established") except Exception as e: logger.error(f"Database initialization error: {e}") raise # Initialize file manager try: file_manager = OrgRoamFileManager(config) logger.info("File manager initialized") except Exception as e: logger.error(f"File manager initialization error: {e}") raise # Run the server async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="org-roam-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aserranoni/org-roam-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.