Skip to main content
Glama

AnyDocs MCP Server

by funky1688
notion.py • 24.3 kB
#!/usr/bin/env python3
"""Notion Document Adapter

Adapter for integrating with Notion workspaces and databases.
"""

import asyncio
import json
from typing import Any, Dict, List, Optional
from datetime import datetime

import httpx
from mcp.types import Resource

from .base import BaseDocumentAdapter, DocumentContent, SearchResult
from ..utils import get_logger

logger = get_logger(__name__)


class NotionAdapter(BaseDocumentAdapter):
    """Adapter for Notion workspaces.

    Supports integration with Notion pages and databases through the
    official Notion API.

    Required configuration:
    - api_token: Notion integration token
    - database_id: Notion database ID (optional, for database-based docs)
    - page_id: Root page ID (optional, for page-based docs)
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.client: Optional[httpx.AsyncClient] = None
        # Caches of objects the integration token can access;
        # populated by _build_content_cache() during initialize().
        self.pages_cache: List[Dict[str, Any]] = []
        self.databases_cache: List[Dict[str, Any]] = []

    async def initialize(self) -> None:
        """Initialize the Notion adapter.

        Validates configuration, creates the async HTTP client,
        verifies the API token, and pre-caches accessible content.

        Raises:
            ConnectionError: If the Notion API connection test fails.
        """
        logger.info("Initializing Notion adapter")

        # Validate required configuration
        self._validate_config(["api_token"])

        # Setup HTTP client with Notion API headers
        self.client = httpx.AsyncClient(
            base_url="https://api.notion.com/v1",
            headers={
                "Authorization": f"Bearer {self.config['api_token']}",
                "Notion-Version": "2022-06-28",
                "Content-Type": "application/json",
                "User-Agent": "AnyDocs-MCP/0.1.0"
            },
            timeout=30.0
        )

        # Test connection
        await self._test_connection()

        # Build cache of accessible content
        await self._build_content_cache()

        self._initialized = True
        logger.info(f"Notion adapter initialized successfully with {len(self.pages_cache)} pages")

    async def list_resources(self) -> List[Resource]:
        """List all accessible pages and databases as MCP resources."""
        self._ensure_initialized()

        resources = []

        # Add pages as resources
        for page in self.pages_cache:
            resource = Resource(
                uri=self.get_resource_uri(page["id"]),
                name=self._get_page_title(page),
                description=self._get_page_description(page),
                mimeType="text/markdown"
            )
            resources.append(resource)

        # Add databases as resources. Databases are namespaced with a
        # "db_" prefix so get_content() can route them correctly.
        for database in self.databases_cache:
            resource = Resource(
                uri=self.get_resource_uri(f"db_{database['id']}"),
                # FIX: inline title[0]["plain_text"] raised IndexError when
                # a database's title rich-text array is empty.
                name=self._get_database_title(database),
                description=f"Notion Database with {len(database.get('properties', {}))} properties",
                mimeType="application/json"
            )
            resources.append(resource)

        logger.debug(f"Listed {len(resources)} resources from Notion")
        return resources

    async def get_content(self, resource_path: str) -> DocumentContent:
        """Get content for a specific Notion page or database.

        Args:
            resource_path: A page ID, or "db_<id>" for a database.

        Raises:
            FileNotFoundError: If the page/database is not in the cache.
        """
        self._ensure_initialized()

        # Handle database resources
        if resource_path.startswith("db_"):
            database_id = resource_path[3:]  # Remove 'db_' prefix
            return await self._get_database_content(database_id)

        # Handle page resources
        page = self._find_page_by_id(resource_path)
        if not page:
            raise FileNotFoundError(f"Page not found: {resource_path}")

        try:
            # Fetch page blocks (content)
            blocks = await self._fetch_page_blocks(resource_path)

            # Convert blocks to markdown
            content = self._blocks_to_markdown(blocks)

            return DocumentContent(
                title=self._get_page_title(page),
                content=content,
                url=page.get("url", ""),
                source=self.name,
                content_type="text/markdown",
                last_modified=page.get("last_edited_time"),
                metadata={
                    "page_id": page["id"],
                    "created_time": page.get("created_time"),
                    "created_by": page.get("created_by", {}).get("id"),
                    "last_edited_by": page.get("last_edited_by", {}).get("id"),
                    "archived": page.get("archived", False),
                    "parent": page.get("parent", {})
                }
            )

        except Exception as e:
            logger.error(f"Error fetching content for page {resource_path}: {e}")
            raise

    async def search(self, query: str, limit: int = 10) -> List[SearchResult]:
        """Search Notion pages using the Notion API search.

        Falls back to a title match against the local cache when the
        remote search call fails.

        Raises:
            ValueError: If the query is empty or whitespace-only.
        """
        self._ensure_initialized()

        if not query.strip():
            raise ValueError("Search query cannot be empty")

        try:
            # Use Notion's search API
            response = await self.client.post(
                "/search",
                json={
                    "query": query,
                    "filter": {
                        "value": "page",
                        "property": "object"
                    },
                    "page_size": limit
                }
            )
            response.raise_for_status()

            data = response.json()
            results = []

            for page in data.get("results", []):
                # Get page content for snippet (first few blocks only)
                try:
                    blocks = await self._fetch_page_blocks(page["id"], limit=3)
                    content_snippet = self._blocks_to_text(blocks)[:200]
                    if len(content_snippet) == 200:
                        content_snippet += "..."
                except Exception:
                    # Best-effort snippet; missing content is not fatal.
                    content_snippet = "Content not available"

                results.append(SearchResult(
                    title=self._get_page_title(page),
                    content=content_snippet,
                    url=page.get("url", ""),
                    source=self.name,
                    score=1.0,  # Notion API doesn't provide relevance scores
                    metadata={
                        "page_id": page["id"],
                        "created_time": page.get("created_time"),
                        "last_edited_time": page.get("last_edited_time")
                    }
                ))

            return results

        except Exception as e:
            logger.error(f"Error searching Notion: {e}")
            # Fallback to local cache search
            return await self._search_local_cache(query, limit)

    async def get_structure(self) -> str:
        """Get the structure of accessible Notion content as markdown."""
        self._ensure_initialized()

        structure_lines = ["# Notion Workspace Structure"]
        structure_lines.append("")

        # Group pages by parent so children can be nested under parents.
        root_pages = []
        child_pages = {}

        for page in self.pages_cache:
            parent = page.get("parent", {})
            if parent.get("type") == "workspace":
                root_pages.append(page)
            else:
                parent_id = parent.get("page_id") or parent.get("database_id")
                if parent_id:
                    if parent_id not in child_pages:
                        child_pages[parent_id] = []
                    child_pages[parent_id].append(page)
                else:
                    # Unknown parent type: treat as a root page.
                    root_pages.append(page)

        def add_page_to_structure(page: Dict[str, Any], indent: int = 0):
            prefix = "  " * indent + "- "
            title = self._get_page_title(page)
            url = page.get("url", "")
            structure_lines.append(f"{prefix}[{title}]({url})")

            # Add child pages recursively
            page_id = page["id"]
            if page_id in child_pages:
                for child in child_pages[page_id]:
                    add_page_to_structure(child, indent + 1)

        # Add root pages
        if root_pages:
            structure_lines.append("## Pages")
            structure_lines.append("")
            for page in root_pages:
                add_page_to_structure(page)

        # Add databases
        if self.databases_cache:
            structure_lines.append("")
            structure_lines.append("## Databases")
            structure_lines.append("")
            for database in self.databases_cache:
                # FIX: safe title extraction (empty title array previously
                # raised IndexError here).
                title = self._get_database_title(database)
                structure_lines.append(f"- {title}")

                # Add database properties
                properties = database.get("properties", {})
                for prop_name, prop_data in properties.items():
                    prop_type = prop_data.get("type", "unknown")
                    structure_lines.append(f"  - {prop_name} ({prop_type})")

        return "\n".join(structure_lines)

    async def _test_connection(self) -> None:
        """Test the Notion API connection.

        Raises:
            ConnectionError: If the /users/me probe fails.
        """
        try:
            response = await self.client.get("/users/me")
            response.raise_for_status()
            logger.debug("Notion API connection test successful")
        except Exception as e:
            logger.error(f"Notion API connection test failed: {e}")
            raise ConnectionError(f"Failed to connect to Notion API: {e}")

    async def _build_content_cache(self) -> None:
        """Build cache of accessible pages and databases.

        Failures are swallowed deliberately (empty caches) so that
        initialization can proceed in degraded mode.

        NOTE(review): only the first 100 results of each search are
        cached; the API's has_more/next_cursor pagination is not
        followed, so large workspaces are truncated.
        """
        try:
            # Search for all accessible pages
            response = await self.client.post(
                "/search",
                json={
                    "filter": {
                        "value": "page",
                        "property": "object"
                    },
                    "page_size": 100
                }
            )
            response.raise_for_status()

            data = response.json()
            self.pages_cache = data.get("results", [])

            # Search for all accessible databases
            response = await self.client.post(
                "/search",
                json={
                    "filter": {
                        "value": "database",
                        "property": "object"
                    },
                    "page_size": 100
                }
            )
            response.raise_for_status()

            data = response.json()
            self.databases_cache = data.get("results", [])

            logger.debug(f"Cached {len(self.pages_cache)} pages and {len(self.databases_cache)} databases")

        except Exception as e:
            logger.error(f"Error building content cache: {e}")
            self.pages_cache = []
            self.databases_cache = []

    async def _fetch_page_blocks(self, page_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch blocks (content) for a page.

        Recursively fetches children of blocks with has_children set.
        Returns [] on error rather than raising.
        """
        try:
            params = {}
            if limit:
                params["page_size"] = limit

            response = await self.client.get(f"/blocks/{page_id}/children", params=params)
            response.raise_for_status()

            data = response.json()
            blocks = data.get("results", [])

            # Recursively fetch child blocks for nested content
            for block in blocks:
                if block.get("has_children"):
                    try:
                        child_blocks = await self._fetch_page_blocks(block["id"])
                        block["children"] = child_blocks
                    except Exception as e:
                        logger.warning(f"Failed to fetch child blocks for {block['id']}: {e}")

            return blocks

        except Exception as e:
            logger.error(f"Error fetching blocks for page {page_id}: {e}")
            return []

    async def _get_database_content(self, database_id: str) -> DocumentContent:
        """Get content for a database (list of pages) as a markdown table.

        Raises:
            FileNotFoundError: If the database is not in the cache.
        """
        database = self._find_database_by_id(database_id)
        if not database:
            raise FileNotFoundError(f"Database not found: {database_id}")

        try:
            # Query database for pages
            response = await self.client.post(
                f"/databases/{database_id}/query",
                json={"page_size": 100}
            )
            response.raise_for_status()

            data = response.json()

            # Convert database pages to markdown table
            content = self._database_to_markdown(database, data.get("results", []))
            # FIX: safe title extraction (see _get_database_title).
            title = self._get_database_title(database)

            return DocumentContent(
                title=title,
                content=content,
                url=database.get("url", ""),
                source=self.name,
                content_type="text/markdown",
                last_modified=database.get("last_edited_time"),
                metadata={
                    "database_id": database["id"],
                    "created_time": database.get("created_time"),
                    "properties": list(database.get("properties", {}).keys())
                }
            )

        except Exception as e:
            logger.error(f"Error fetching database content: {e}")
            raise

    def _get_database_title(self, database: Dict[str, Any]) -> str:
        """Extract a plain-text title from a Notion database object.

        The API returns ``title`` as a rich-text array which may be
        empty; falls back to "Untitled Database" in that case.
        """
        title_rich_text = database.get("title") or []
        if title_rich_text:
            text = self._rich_text_to_text(title_rich_text)
            if text:
                return text
        return "Untitled Database"

    def _blocks_to_markdown(self, blocks: List[Dict[str, Any]]) -> str:
        """Convert Notion blocks to markdown.

        Unsupported block types are skipped; child blocks are rendered
        indented beneath their parent.
        """
        markdown_lines = []

        for block in blocks:
            block_type = block.get("type")

            if block_type == "paragraph":
                text = self._rich_text_to_markdown(block.get("paragraph", {}).get("rich_text", []))
                if text.strip():
                    markdown_lines.append(text)
                    markdown_lines.append("")

            elif block_type in ["heading_1", "heading_2", "heading_3"]:
                # heading_N -> N '#' characters
                level = int(block_type.split("_")[1])
                text = self._rich_text_to_markdown(block.get(block_type, {}).get("rich_text", []))
                if text.strip():
                    markdown_lines.append("#" * level + " " + text)
                    markdown_lines.append("")

            elif block_type == "bulleted_list_item":
                text = self._rich_text_to_markdown(block.get("bulleted_list_item", {}).get("rich_text", []))
                if text.strip():
                    markdown_lines.append("- " + text)

            elif block_type == "numbered_list_item":
                text = self._rich_text_to_markdown(block.get("numbered_list_item", {}).get("rich_text", []))
                if text.strip():
                    markdown_lines.append("1. " + text)

            elif block_type == "code":
                code_block = block.get("code", {})
                language = code_block.get("language", "")
                text = self._rich_text_to_markdown(code_block.get("rich_text", []))
                markdown_lines.append(f"```{language}")
                markdown_lines.append(text)
                markdown_lines.append("```")
                markdown_lines.append("")

            elif block_type == "quote":
                text = self._rich_text_to_markdown(block.get("quote", {}).get("rich_text", []))
                if text.strip():
                    markdown_lines.append("> " + text)
                    markdown_lines.append("")

            elif block_type == "divider":
                markdown_lines.append("---")
                markdown_lines.append("")

            # Handle child blocks
            if "children" in block:
                child_markdown = self._blocks_to_markdown(block["children"])
                if child_markdown.strip():
                    # Indent child content
                    indented_lines = ["  " + line for line in child_markdown.split("\n")]
                    markdown_lines.extend(indented_lines)

        return "\n".join(markdown_lines).strip()

    def _blocks_to_text(self, blocks: List[Dict[str, Any]]) -> str:
        """Convert Notion blocks to plain text (used for search snippets)."""
        text_parts = []

        for block in blocks:
            block_type = block.get("type")

            if block_type in ["paragraph", "heading_1", "heading_2", "heading_3",
                              "bulleted_list_item", "numbered_list_item", "quote"]:
                rich_text = block.get(block_type, {}).get("rich_text", [])
                text = self._rich_text_to_text(rich_text)
                if text.strip():
                    text_parts.append(text)

            elif block_type == "code":
                rich_text = block.get("code", {}).get("rich_text", [])
                text = self._rich_text_to_text(rich_text)
                if text.strip():
                    text_parts.append(text)

            # Handle child blocks
            if "children" in block:
                child_text = self._blocks_to_text(block["children"])
                if child_text.strip():
                    text_parts.append(child_text)

        return " ".join(text_parts)

    def _rich_text_to_markdown(self, rich_text: List[Dict[str, Any]]) -> str:
        """Convert Notion rich text to markdown (bold/italic/strike/code/links)."""
        result = []

        for text_obj in rich_text:
            text = text_obj.get("plain_text", "")
            annotations = text_obj.get("annotations", {})

            if annotations.get("bold"):
                text = f"**{text}**"
            if annotations.get("italic"):
                text = f"*{text}*"
            if annotations.get("strikethrough"):
                text = f"~~{text}~~"
            if annotations.get("code"):
                text = f"`{text}`"

            # Handle links
            if text_obj.get("href"):
                text = f"[{text}]({text_obj['href']})"

            result.append(text)

        return "".join(result)

    def _rich_text_to_text(self, rich_text: List[Dict[str, Any]]) -> str:
        """Convert Notion rich text to plain text."""
        return "".join([text_obj.get("plain_text", "") for text_obj in rich_text])

    def _database_to_markdown(self, database: Dict[str, Any], pages: List[Dict[str, Any]]) -> str:
        """Convert database pages to a markdown table keyed on the schema."""
        if not pages:
            return "No pages found in this database."

        # Get properties from database schema
        properties = database.get("properties", {})

        # Create table header
        headers = ["Title"] + list(properties.keys())
        markdown_lines = [
            "| " + " | ".join(headers) + " |",
            "| " + " | ".join(["---"] * len(headers)) + " |"
        ]

        # Add rows
        for page in pages:
            row = [self._get_page_title(page)]
            page_properties = page.get("properties", {})

            for prop_name in properties.keys():
                prop_value = page_properties.get(prop_name, {})
                cell_value = self._property_to_text(prop_value)
                row.append(cell_value)

            markdown_lines.append("| " + " | ".join(row) + " |")

        return "\n".join(markdown_lines)

    def _property_to_text(self, prop: Dict[str, Any]) -> str:
        """Convert a Notion page property value to display text.

        Unknown property types render as an empty string.
        """
        prop_type = prop.get("type")

        if prop_type == "title":
            return self._rich_text_to_text(prop.get("title", []))
        elif prop_type == "rich_text":
            return self._rich_text_to_text(prop.get("rich_text", []))
        elif prop_type == "number":
            # FIX: str(prop.get("number", "")) rendered "None" when the
            # property exists with a null value.
            number = prop.get("number")
            return "" if number is None else str(number)
        elif prop_type == "select":
            select = prop.get("select")
            return select.get("name", "") if select else ""
        elif prop_type == "multi_select":
            multi_select = prop.get("multi_select", [])
            return ", ".join([item.get("name", "") for item in multi_select])
        elif prop_type == "date":
            date = prop.get("date")
            return date.get("start", "") if date else ""
        elif prop_type == "checkbox":
            # FIX: restored check-mark glyphs that were mojibake-corrupted.
            return "✓" if prop.get("checkbox") else "✗"
        elif prop_type == "url":
            return prop.get("url", "")
        elif prop_type == "email":
            return prop.get("email", "")
        elif prop_type == "phone_number":
            return prop.get("phone_number", "")
        else:
            return ""

    def _get_page_title(self, page: Dict[str, Any]) -> str:
        """Extract a plain-text title from a Notion page object."""
        properties = page.get("properties", {})

        # Look for the title-typed property
        for prop_name, prop_data in properties.items():
            if prop_data.get("type") == "title":
                title_text = prop_data.get("title", [])
                if title_text:
                    return self._rich_text_to_text(title_text)

        # Fallback to top-level title from the API.
        # FIX: some objects carry "title" as a rich-text array; previously
        # the raw list leaked out of this str-returning method.
        if "title" in page:
            title = page["title"]
            if isinstance(title, list):
                return self._rich_text_to_text(title) or "Untitled"
            return title

        return "Untitled"

    def _get_page_description(self, page: Dict[str, Any]) -> str:
        """Extract a short description from a Notion page.

        Uses the first paragraph of pre-fetched blocks ("_blocks") when
        present; otherwise falls back to a last-edited summary.
        """
        # Try to get first paragraph as description
        try:
            blocks = page.get("_blocks", [])
            if blocks:
                first_block = blocks[0]
                if first_block.get("type") == "paragraph":
                    text = self._rich_text_to_text(first_block.get("paragraph", {}).get("rich_text", []))
                    return text[:100] + "..." if len(text) > 100 else text
        except Exception:
            pass

        return f"Notion page (last edited: {page.get('last_edited_time', 'unknown')})"

    def _find_page_by_id(self, page_id: str) -> Optional[Dict[str, Any]]:
        """Find a page in the cache by ID, or None."""
        for page in self.pages_cache:
            if page["id"] == page_id:
                return page
        return None

    def _find_database_by_id(self, database_id: str) -> Optional[Dict[str, Any]]:
        """Find a database in the cache by ID, or None."""
        for database in self.databases_cache:
            if database["id"] == database_id:
                return database
        return None

    async def _search_local_cache(self, query: str, limit: int) -> List[SearchResult]:
        """Fallback search using the local cache (title substring match)."""
        results = []
        query_lower = query.lower()

        for page in self.pages_cache:
            score = 0.0
            title = self._get_page_title(page)

            # Check title match
            if query_lower in title.lower():
                score += 10.0

            if score > 0:
                results.append(SearchResult(
                    title=title,
                    content=self._get_page_description(page),
                    url=page.get("url", ""),
                    source=self.name,
                    score=score,
                    metadata={
                        "page_id": page["id"],
                        "created_time": page.get("created_time"),
                        "last_edited_time": page.get("last_edited_time")
                    }
                ))

        # Sort by score and limit results
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:limit]

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Release the HTTP connection pool when used as a context manager.
        if self.client:
            await self.client.aclose()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/funky1688/AnyDocs-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server