Roam Research MCP Server

api.py (34.9 kB)
"""Core API functions for interacting with Roam Research.""" import os import re import sys import logging from typing import Dict, List, Any, Optional, Union, Set, Tuple, Callable import requests from datetime import datetime import json import time from functools import wraps from roam_mcp.utils import ( format_roam_date, find_block_uid, find_page_by_title, process_nested_content, resolve_block_references ) # Set up logging logger = logging.getLogger("roam-mcp.api") # Get API credentials from environment variables API_TOKEN = os.environ.get("ROAM_API_TOKEN") GRAPH_NAME = os.environ.get("ROAM_GRAPH_NAME") MEMORIES_TAG = os.environ.get("MEMORIES_TAG", "#[[Memories]]") # Validate API credentials if not API_TOKEN: logger.warning("ROAM_API_TOKEN environment variable is not set") if not GRAPH_NAME: logger.warning("ROAM_GRAPH_NAME environment variable is not set") # Enhanced Error Hierarchy class RoamAPIError(Exception): """Base exception for all Roam API errors.""" def __init__(self, message: str, code: Optional[str] = None, details: Optional[Dict] = None, remediation: Optional[str] = None): self.message = message self.code = code or "UNKNOWN_ERROR" self.details = details or {} self.remediation = remediation super().__init__(self._format_message()) def _format_message(self) -> str: msg = f"{self.code}: {self.message}" if self.details: msg += f" - Details: {json.dumps(self.details)}" if self.remediation: msg += f" - Suggestion: {self.remediation}" return msg class AuthenticationError(RoamAPIError): """Exception raised for authentication errors.""" def __init__(self, message: str, details: Optional[Dict] = None): super().__init__( message=message, code="AUTH_ERROR", details=details, remediation="Check your API token and graph name in environment variables." ) class PageNotFoundError(RoamAPIError): """Exception raised when a page cannot be found.""" def __init__(self, title: str, details: Optional[Dict] = None): super().__init__( message=f"Page '{title}' not found", code="PAGE_NOT_FOUND", details=details, remediation="Check the page title for typos or create the page first." ) class BlockNotFoundError(RoamAPIError): """Exception raised when a block cannot be found.""" def __init__(self, uid: str, details: Optional[Dict] = None): super().__init__( message=f"Block with UID '{uid}' not found", code="BLOCK_NOT_FOUND", details=details, remediation="Check the block UID for accuracy." ) class ValidationError(RoamAPIError): """Exception raised for input validation errors.""" def __init__(self, message: str, param: str, details: Optional[Dict] = None): super().__init__( message=message, code="VALIDATION_ERROR", details={"parameter": param, **(details or {})}, remediation="Check the input parameters and correct the formatting." ) class QueryError(RoamAPIError): """Exception raised for query execution errors.""" def __init__(self, message: str, query: str, details: Optional[Dict] = None): super().__init__( message=message, code="QUERY_ERROR", details={"query": query, **(details or {})}, remediation="Check the query syntax or parameters." ) class RateLimitError(RoamAPIError): """Exception raised when rate limits are exceeded.""" def __init__(self, message: str, details: Optional[Dict] = None): super().__init__( message=message, code="RATE_LIMIT_ERROR", details=details, remediation="Retry after a delay or reduce the request frequency." 
) class TransactionError(RoamAPIError): """Exception raised for transaction failures.""" def __init__(self, message: str, action_type: str, details: Optional[Dict] = None): super().__init__( message=message, code="TRANSACTION_ERROR", details={"action_type": action_type, **(details or {})}, remediation="Check the action data or retry the operation." ) class PreserveAuthSession(requests.Session): """Session class that preserves authentication headers during redirects.""" def rebuild_auth(self, prepared_request, response): """Preserve the Authorization header on redirects.""" return # Retry decorator for API calls def retry_on_error(max_retries=3, base_delay=1, backoff_factor=2, retry_on=(RateLimitError, requests.exceptions.RequestException)): """ Decorator to retry API calls with exponential backoff. Args: max_retries: Maximum number of retry attempts base_delay: Initial delay in seconds backoff_factor: Multiplier for delay on each retry retry_on: Tuple of exception types to retry on """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): retries = 0 while True: try: return func(*args, **kwargs) except retry_on as e: retries += 1 if retries > max_retries: logger.error(f"Maximum retries ({max_retries}) exceeded: {str(e)}") raise delay = base_delay * (backoff_factor ** (retries - 1)) logger.warning(f"Retrying after error: {str(e)}. Attempt {retries}/{max_retries} in {delay:.2f}s") time.sleep(delay) return wrapper return decorator def validate_credentials(): """ Validate that required API credentials are set. Raises: AuthenticationError: If required credentials are missing """ if not API_TOKEN or not GRAPH_NAME: missing = [] if not API_TOKEN: missing.append("ROAM_API_TOKEN") if not GRAPH_NAME: missing.append("ROAM_GRAPH_NAME") raise AuthenticationError( f"Missing required credentials: {', '.join(missing)}", {"missing": missing} ) def get_session_and_headers() -> Tuple[requests.Session, Dict[str, str]]: """ Create a session with authentication headers. Returns: Tuple of (session, headers) Raises: AuthenticationError: If required environment variables are missing """ validate_credentials() session = PreserveAuthSession() headers = { "Accept": "application/json", "Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json", } return session, headers @retry_on_error() def execute_query(query: str, inputs: Optional[List[Any]] = None) -> Any: """ Execute a Datalog query against the Roam graph. 
Args: query: Datalog query string inputs: Optional list of query inputs Returns: Query results Raises: QueryError: If the query fails AuthenticationError: If authentication fails RateLimitError: If rate limits are exceeded """ validate_credentials() session, headers = get_session_and_headers() # Prepare query data data = { "query": query, } if inputs: data["inputs"] = inputs # Log query (without inputs for security) logger.debug(f"Executing query: {query}") # Execute query try: response = session.post( f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/q', headers=headers, json=data ) if response.status_code == 401: raise AuthenticationError("Authentication failed", {"status_code": response.status_code}) if response.status_code == 429: raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code}) response.raise_for_status() result = response.json().get('result') # Log result size if isinstance(result, list): logger.debug(f"Query returned {len(result)} results") return result except requests.RequestException as e: error_msg = f"Query failed: {str(e)}" error_details = {} if hasattr(e, 'response') and e.response: error_details["status_code"] = e.response.status_code try: error_details["response"] = e.response.json() except: error_details["response_text"] = e.response.text[:500] # Classify error based on status code if available if hasattr(e, 'response') and e.response: if e.response.status_code == 401: raise AuthenticationError("Authentication failed", error_details) from e elif e.response.status_code == 429: raise RateLimitError("Rate limit exceeded", error_details) from e logger.error(error_msg, extra={"details": error_details}) raise QueryError(error_msg, query, error_details) from e @retry_on_error() def execute_write_action(action_data: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]: """ Execute a write action or a batch of actions on the Roam graph. 
Args: action_data: The action data to write or a list of actions for batch operation Returns: Response data Raises: TransactionError: If the write action fails AuthenticationError: If authentication fails RateLimitError: If rate limits are exceeded """ validate_credentials() session, headers = get_session_and_headers() # Check if it's a batch operation or single action is_batch = isinstance(action_data, list) # If it's a batch operation, wrap it in a batch container if is_batch: # Log batch size logger.debug(f"Executing batch write action with {len(action_data)} operations") # Group operations by type for debugging action_types = {} for action in action_data: action_type = action.get("action", "unknown") if action_type in action_types: action_types[action_type] += 1 else: action_types[action_type] = 1 logger.debug(f"Batch operation types: {action_types}") # Prepare batch action batch_data = { "action": "batch-actions", "actions": action_data } action_type = "batch-actions" operation_data = batch_data else: # Log action type action_type = action_data.get("action", "unknown") logger.debug(f"Executing write action: {action_type}") operation_data = action_data # Debug log the operation data logger.debug(f"Sending data: {json.dumps(operation_data)[:100]}...") # Execute action try: response = session.post( f'https://api.roamresearch.com/api/graph/{GRAPH_NAME}/write', headers=headers, json=operation_data # Use json parameter for proper JSON encoding ) logger.debug(f"Status code: {response.status_code}") logger.debug(f"Response headers: {dict(response.headers)}") if response.status_code == 401: raise AuthenticationError("Authentication failed", {"status_code": response.status_code}) if response.status_code == 429: raise RateLimitError("Rate limit exceeded", {"status_code": response.status_code}) # Special handling for empty responses if response.status_code == 200 and not response.text: logger.debug("Received empty response with status 200 (success)") return {"success": True} response.raise_for_status() # Try to parse JSON response try: result = response.json() logger.debug(f"Response: {json.dumps(result)[:500]}") # Success even with error message for batch operations that partly succeed if "batch-error-message" in result and "num-actions-successfully-transacted-before-failure" in result: num_success = result.get("num-actions-successfully-transacted-before-failure", 0) logger.debug(f"Batch partially succeeded with {num_success} actions before failure") return result return result except json.JSONDecodeError: # Some successful operations return empty responses if 200 <= response.status_code < 300: logger.debug("Success with non-JSON response") return {"success": True} else: logger.debug(f"Failed to parse response as JSON: {response.text[:500]}") raise TransactionError( f"Failed to parse response as JSON", action_type, {"response_text": response.text[:500]} ) except requests.RequestException as e: error_details = {} if hasattr(e, 'response') and e.response: error_details["status_code"] = e.response.status_code try: error_details["response"] = e.response.json() except: error_details["response_text"] = e.response.text[:500] # Classify error based on status code if available if hasattr(e, 'response') and e.response: if e.response.status_code == 401: raise AuthenticationError("Authentication failed", error_details) from e elif e.response.status_code == 429: raise RateLimitError("Rate limit exceeded", error_details) from e error_msg = f"Write action failed: {str(e)}" logger.error(error_msg, 
extra={"details": error_details}) raise TransactionError(error_msg, action_type, error_details) from e def execute_batch_actions(actions: List[Dict[str, Any]], chunk_size: int = 50) -> Dict[str, Any]: """ Execute a batch of actions, optionally chunking into multiple requests. Args: actions: List of actions to execute chunk_size: Maximum number of actions per request Returns: Combined results of all batch operations Raises: TransactionError: If any batch fails """ if not actions: return {"success": True, "created_uids": []} # Single batch if under chunk size if len(actions) <= chunk_size: result = execute_write_action(actions) # Check for tempids-to-uids mapping in response if "tempids-to-uids" in result: return {"success": True, "created_uids": list(result["tempids-to-uids"].values())} elif "successful" in result and result["successful"]: return {"success": True, "created_uids": []} else: return result # Split into chunks for larger batches chunks = [actions[i:i + chunk_size] for i in range(0, len(actions), chunk_size)] logger.debug(f"Splitting batch operation into {len(chunks)} chunks of max {chunk_size} actions") # Track results across chunks combined_results = { "created_uids": [], "success": True } # Track temporary and real UIDs for parent-child relationships temp_uid_map = {} # Execute each chunk for i, chunk in enumerate(chunks): logger.debug(f"Executing batch chunk {i+1}/{len(chunks)} with {len(chunk)} actions") # Update parent UIDs with real UIDs from previous chunks if i > 0 and temp_uid_map: for action in chunk: if action["action"] == "create-block": parent_uid = action["location"]["parent-uid"] if parent_uid.startswith("temp_") and parent_uid in temp_uid_map: action["location"]["parent-uid"] = temp_uid_map[parent_uid] result = execute_write_action(chunk) # Collect UIDs from this chunk created_uids = [] if "tempids-to-uids" in result: created_uids = list(result["tempids-to-uids"].values()) if created_uids: # Map temp UIDs to real UIDs for next chunks if i < len(chunks) - 1: for j, uid in enumerate(created_uids): temp_key = f"temp_{i}_{j}" temp_uid_map[temp_key] = uid combined_results["created_uids"].extend(created_uids) # Add delay between batches to ensure ordering if i < len(chunks) - 1: time.sleep(0.5) return combined_results def find_or_create_page(title: str) -> str: """ Find a page by title or create it if it doesn't exist. Args: title: Page title Returns: Page UID Raises: TransactionError: If page creation fails ValidationError: If title is invalid AuthenticationError: If authentication fails """ validate_credentials() session, headers = get_session_and_headers() # Validate title if not title or not isinstance(title, str): raise ValidationError("Page title must be a non-empty string", "title") title = title.strip() if not title: raise ValidationError("Page title cannot be empty or just whitespace", "title") # Try to find the page first logger.debug(f"Looking for page: {title}") query = f'''[:find ?uid . 
:where [?e :node/title "{title}"] [?e :block/uid ?uid]]''' page_uid = execute_query(query) if page_uid: logger.debug(f"Found existing page: {title} (UID: {page_uid})") return page_uid # Create the page if it doesn't exist logger.debug(f"Creating new page: {title}") action_data = { "action": "create-page", "page": {"title": title} } try: response = execute_write_action(action_data) if response.get("success", False): # Wait a moment for the page to be created time.sleep(1) # Try to find the page again page_uid = execute_query(query) if page_uid: logger.debug(f"Created page: {title} (UID: {page_uid})") return page_uid # If still not found, try one more time with a longer delay time.sleep(2) page_uid = execute_query(query) if page_uid: logger.debug(f"Found newly created page: {title} (UID: {page_uid})") return page_uid # If we get here, something went wrong error_msg = f"Failed to create page: {title}" logger.error(error_msg) raise TransactionError(error_msg, "create-page", {"title": title, "response": response}) except TransactionError: # Rethrow existing TransactionError raise except Exception as e: error_msg = f"Failed to create page: {title}" logger.error(error_msg) raise TransactionError(error_msg, "create-page", {"title": title, "error": str(e)}) from e def get_daily_page() -> str: """ Get or create today's daily page. Returns: Daily page UID Raises: TransactionError: If page creation fails """ today = datetime.now() date_str = format_roam_date(today) logger.debug(f"Getting daily page for: {date_str}") return find_or_create_page(date_str) def add_block_to_page(page_uid: str, content: str, order: Union[int, str] = "last") -> Optional[str]: """ Add a block to a page. Args: page_uid: Parent page UID content: Block content order: Position ("first", "last", or integer index) Returns: New block UID or None if creation failed Raises: BlockNotFoundError: If page does not exist ValidationError: If parameters are invalid TransactionError: If block creation fails """ # Validate parameters if not page_uid: raise ValidationError("Parent page UID is required", "page_uid") if not content: raise ValidationError("Block content cannot be empty", "content") # Generate a unique block UID import uuid block_uid = str(uuid.uuid4())[:9] action_data = { "action": "create-block", "location": { "parent-uid": page_uid, "order": order }, "block": { "string": content, "uid": block_uid } } logger.debug(f"Adding block to page {page_uid}") try: result = execute_write_action(action_data) if result.get("success", False): # Add a brief delay to ensure the block is created time.sleep(1) # Verify the block exists session, headers = get_session_and_headers() found_uid = find_block_uid(session, headers, GRAPH_NAME, content) if found_uid: logger.debug(f"Created block with UID: {found_uid}") return found_uid # If we couldn't find the UID by content, return the one we generated logger.debug(f"Block created but couldn't verify, returning generated UID: {block_uid}") return block_uid else: logger.error(f"Failed to create block: {result.get('error', 'Unknown error')}") return None except Exception as e: if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)): raise error_msg = f"Failed to create block: {str(e)}" logger.error(error_msg) raise TransactionError(error_msg, "create-block", {"page_uid": page_uid}) from e def update_block(block_uid: str, content: str) -> bool: """ Update a block's content. 
Args: block_uid: Block UID content: New content Returns: Success flag Raises: BlockNotFoundError: If block does not exist ValidationError: If parameters are invalid TransactionError: If block update fails """ # Validate parameters if not block_uid: raise ValidationError("Block UID is required", "block_uid") if content is None: raise ValidationError("Block content cannot be None", "content") action_data = { "action": "update-block", "block": { "uid": block_uid, "string": content } } logger.debug(f"Updating block: {block_uid}") try: execute_write_action(action_data) return True except Exception as e: if isinstance(e, (BlockNotFoundError, ValidationError, TransactionError)): raise error_msg = f"Failed to update block: {str(e)}" logger.error(error_msg) raise TransactionError(error_msg, "update-block", {"block_uid": block_uid}) from e def transform_block(block_uid: str, find_pattern: str, replace_with: str, global_replace: bool = True) -> str: """ Transform a block's content using regex pattern replacement. Args: block_uid: Block UID find_pattern: Regex pattern to find replace_with: Text to replace with global_replace: Whether to replace all occurrences Returns: Updated content Raises: BlockNotFoundError: If block does not exist ValidationError: If parameters are invalid QueryError: If block retrieval fails TransactionError: If block update fails """ # Validate parameters if not block_uid: raise ValidationError("Block UID is required", "block_uid") if not find_pattern: raise ValidationError("Find pattern cannot be empty", "find_pattern") # First get the current content query = f'''[:find ?string . :where [?b :block/uid "{block_uid}"] [?b :block/string ?string]]''' logger.debug(f"Getting content for block: {block_uid}") try: current_content = execute_query(query) if not current_content: raise BlockNotFoundError(block_uid) # Apply transformation logger.debug(f"Transforming block {block_uid} with pattern: {find_pattern}") flags = re.MULTILINE count = 0 if global_replace else 1 try: new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags) except re.error as e: raise ValidationError(f"Invalid regex pattern: {str(e)}", "find_pattern", {"pattern": find_pattern}) # Update the block update_block(block_uid, new_content) return new_content except (BlockNotFoundError, ValidationError, QueryError, TransactionError): # Rethrow existing errors raise except Exception as e: error_msg = f"Failed to transform block: {str(e)}" logger.error(error_msg) raise TransactionError(error_msg, "transform-block", {"block_uid": block_uid}) from e def batch_update_blocks(updates: List[Dict[str, Any]], chunk_size: int = 50) -> List[Dict[str, Any]]: """ Update multiple blocks in a single operation. Args: updates: List of update operations chunk_size: Maximum number of actions per batch Returns: List of results Raises: ValidationError: If updates are not valid """ if not isinstance(updates, list): raise ValidationError("Updates must be a list", "updates") if not updates: return [] session, headers = get_session_and_headers() results = [] batch_actions = [] logger.debug(f"Batch updating {len(updates)} blocks") # Validate each update and prepare batch actions for i, update in enumerate(updates): try: block_uid = update.get("block_uid") if not block_uid: results.append({"success": False, "error": "Missing block_uid"}) continue # Check block exists query = f'''[:find ?string . 
:where [?b :block/uid "{block_uid}"] [?b :block/string ?string]]''' current_content = execute_query(query) if not current_content: results.append({ "success": False, "block_uid": block_uid, "error": f"Block with UID {block_uid} not found" }) continue # Handle direct content update if "content" in update: batch_actions.append({ "action": "update-block", "block": { "uid": block_uid, "string": update["content"] } }) results.append({ "success": True, "block_uid": block_uid, "content": update["content"] }) # Handle pattern transformation elif "transform" in update: transform = update["transform"] try: find_pattern = transform["find"] replace_with = transform["replace"] global_replace = transform.get("global", True) # Apply transformation flags = re.MULTILINE count = 0 if global_replace else 1 new_content = re.sub(find_pattern, replace_with, current_content, count=count, flags=flags) batch_actions.append({ "action": "update-block", "block": { "uid": block_uid, "string": new_content } }) results.append({ "success": True, "block_uid": block_uid, "content": new_content }) except re.error as e: results.append({ "success": False, "block_uid": block_uid, "error": f"Invalid regex pattern: {str(e)}" }) except KeyError as e: results.append({ "success": False, "block_uid": block_uid, "error": f"Missing required transform key: {str(e)}" }) else: results.append({ "success": False, "block_uid": block_uid, "error": "Neither content nor transform provided" }) except Exception as e: logger.error(f"Error preparing update for block {update.get('block_uid', 'unknown')}: {str(e)}") results.append({ "success": False, "block_uid": update.get("block_uid", "unknown"), "error": str(e) }) # Execute batch updates if we have any valid actions if batch_actions: try: execute_batch_actions(batch_actions, chunk_size) except Exception as e: logger.error(f"Error executing batch update: {str(e)}") # Mark all previously successful results as failed for result in results: if result.get("success"): result["success"] = False result["error"] = f"Batch update failed: {str(e)}" # Log success rate successful = sum(1 for r in results if r.get("success")) logger.debug(f"Batch update completed: {successful}/{len(updates)} successful") return results def get_page_content(title: str, resolve_refs: bool = True, max_depth: int = 5) -> str: """ Get the content of a page with optional block reference resolution. 
Args: title: Page title resolve_refs: Whether to resolve block references max_depth: Maximum depth of nested blocks to retrieve (default: 5) Returns: Page content as markdown Raises: PageNotFoundError: If page retrieval fails QueryError: If query execution fails """ session, headers = get_session_and_headers() # First find the page UID logger.debug(f"Getting content for page: {title}") page_uid = find_page_by_title(session, headers, GRAPH_NAME, title) if not page_uid: raise PageNotFoundError(title) # Build block hierarchy iteratively block_map = {} top_level_blocks = [] # Query to get immediate children of a parent (page or block) def get_children(parent_uid: str, depth: int = 0) -> None: if depth >= max_depth: return query = f"""[:find ?uid ?string ?order :where [?parent :block/uid "{parent_uid}"] [?parent :block/children ?child] [?child :block/uid ?uid] [?child :block/string ?string] [?child :block/order ?order]]""" try: results = execute_query(query) if not results: return for uid, content, order in results: # Resolve references if requested if resolve_refs: content = resolve_block_references(session, headers, GRAPH_NAME, content) # Create block object block = { "uid": uid, "content": content, "order": order, "children": [] } block_map[uid] = block # Add to top-level or parent's children if parent_uid == page_uid: top_level_blocks.append(block) elif parent_uid in block_map: block_map[parent_uid]["children"].append(block) # Recursively fetch children get_children(uid, depth + 1) except QueryError as e: logger.warning(f"Failed to fetch children for {parent_uid}: {str(e)}") raise try: # Start with the page's top-level blocks get_children(page_uid) if not top_level_blocks: logger.debug(f"No content found on page: {title}") return f"# {title}\n\nNo content found on this page." # Sort blocks by order def sort_blocks(blocks): blocks.sort(key=lambda b: b["order"]) for block in blocks: sort_blocks(block["children"]) sort_blocks(top_level_blocks) # Convert to markdown markdown = f"# {title}\n\n" def blocks_to_md(blocks, level=0): result = "" for block in blocks: indent = " " * level result += f"{indent}- {block['content']}\n" if block["children"]: result += blocks_to_md(block["children"], level + 1) return result markdown += blocks_to_md(top_level_blocks) logger.debug(f"Retrieved page content for: {title}") return markdown except QueryError: # Rethrow existing QueryError raise except Exception as e: error_msg = f"Failed to get page content: {str(e)}" logger.error(error_msg) raise QueryError(error_msg, "Iterative child fetch", {"page_title": title, "page_uid": page_uid}) from e
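Taken together, the functions above form the module's public surface. The following is a minimal usage sketch, not part of the file: it assumes the package is importable as roam_mcp, that ROAM_API_TOKEN and ROAM_GRAPH_NAME are exported before import (the module reads them at import time), and the page title and block text are illustrative.

# Usage sketch (assumptions: roam_mcp is installed; ROAM_API_TOKEN and
# ROAM_GRAPH_NAME are set in the environment before this import runs).
from roam_mcp.api import (
    find_or_create_page,
    add_block_to_page,
    update_block,
    get_page_content,
    execute_query,
)

page_uid = find_or_create_page("API Sandbox")  # illustrative page title
block_uid = add_block_to_page(page_uid, "Hello from the API", order="last")

if block_uid:
    update_block(block_uid, "Hello again from the API")

# Read the page back as markdown, resolving block references.
print(get_page_content("API Sandbox", resolve_refs=True))

# Raw Datalog, in the same style the module uses internally:
# count the entities that have a page title.
page_count = execute_query("[:find (count ?e) . :where [?e :node/title ?t]]")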

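For reference, the write payloads that execute_write_action and execute_batch_actions send are plain dictionaries; below is a sketch of the shapes built in this file (all UIDs and strings are illustrative placeholders).

# Single action, shaped like the one add_block_to_page builds (values illustrative).
single_action = {
    "action": "create-block",
    "location": {"parent-uid": "abCdEfGhI", "order": "last"},  # placeholder 9-char UID
    "block": {"string": "A new block", "uid": "jKlMnOpQr"},    # placeholder UID
}

# Batch container, as execute_write_action builds when given a list.
batch = {
    "action": "batch-actions",
    "actions": [
        {"action": "create-page", "page": {"title": "API Sandbox"}},
        single_action,
    ],
}

# execute_write_action accepts either a single action dict or a list;
# execute_batch_actions additionally splits lists larger than chunk_size
# (default 50) across multiple requests and remaps temp parent UIDs.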