# notion.py • 24.3 kB
#!/usr/bin/env python3
"""
Notion Document Adapter
Adapter for integrating with Notion workspaces and databases.
"""
import asyncio
import json
from typing import Any, Dict, List, Optional
from datetime import datetime
import httpx
from mcp.types import Resource
from .base import BaseDocumentAdapter, DocumentContent, SearchResult
from ..utils import get_logger
# Module-level logger, obtained from the project's get_logger helper and keyed by this module's name.
logger = get_logger(__name__)
class NotionAdapter(BaseDocumentAdapter):
    """Adapter for Notion workspaces.

    Supports integration with Notion pages and databases through the official
    Notion API.

    Required configuration:
        - api_token: Notion integration token
        - database_id: Notion database ID (optional, for database-based docs)
        - page_id: Root page ID (optional, for page-based docs)

    Pages and databases returned by the Notion search endpoint are cached at
    initialization time; ``list_resources``, ``get_structure`` and the local
    search fallback work from that cache.
    """

    # Largest ``page_size`` requested from Notion list endpoints.
    _MAX_PAGE_SIZE = 100
    # Maximum length of the content snippet attached to a search result.
    _SNIPPET_LENGTH = 200
    # Maximum length of a page description derived from its first paragraph.
    _DESCRIPTION_LENGTH = 100
    # Block types rendered as "<prefix><rich text>"; the flag says whether a
    # blank line follows the block (list items stay tightly packed).
    _TEXT_BLOCK_FORMATS = {
        "paragraph": ("", True),
        "heading_1": ("# ", True),
        "heading_2": ("## ", True),
        "heading_3": ("### ", True),
        "bulleted_list_item": ("- ", False),
        "numbered_list_item": ("1. ", False),
        "quote": ("> ", True),
    }

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and create empty caches.

        Args:
            config: Adapter configuration; ``api_token`` is required by
                ``initialize``.
        """
        super().__init__(config)
        self.client: Optional[httpx.AsyncClient] = None
        self.pages_cache: List[Dict[str, Any]] = []
        self.databases_cache: List[Dict[str, Any]] = []

    async def initialize(self) -> None:
        """Initialize the Notion adapter.

        Validates the configuration, creates the HTTP client, verifies the
        connection and builds the page/database cache.

        Raises:
            ConnectionError: If the connection test against the API fails.
        """
        logger.info("Initializing Notion adapter")

        # Validate required configuration
        self._validate_config(["api_token"])

        # Re-initialization must not leak the previously opened client.
        if self.client is not None:
            await self.client.aclose()

        # Setup HTTP client with Notion API headers
        self.client = httpx.AsyncClient(
            base_url="https://api.notion.com/v1",
            headers={
                "Authorization": f"Bearer {self.config['api_token']}",
                "Notion-Version": "2022-06-28",
                "Content-Type": "application/json",
                "User-Agent": "AnyDocs-MCP/0.1.0",
            },
            timeout=30.0,
        )

        try:
            # Test connection
            await self._test_connection()
            # Build cache of accessible content
            await self._build_content_cache()
        except Exception:
            # Do not leave an open connection pool behind a failed start-up.
            await self.client.aclose()
            self.client = None
            raise

        self._initialized = True
        logger.info(f"Notion adapter initialized successfully with {len(self.pages_cache)} pages")

    async def list_resources(self) -> List[Resource]:
        """List all cached pages and databases as MCP resources.

        Returns:
            One ``Resource`` per cached page (markdown) followed by one per
            cached database (JSON mime type, URI path prefixed with ``db_``).
        """
        self._ensure_initialized()
        resources = []

        # Add pages as resources
        for page in self.pages_cache:
            resources.append(
                Resource(
                    uri=self.get_resource_uri(page["id"]),
                    name=self._get_page_title(page),
                    description=self._get_page_description(page),
                    mimeType="text/markdown",
                )
            )

        # Add databases as resources
        for database in self.databases_cache:
            resources.append(
                Resource(
                    uri=self.get_resource_uri(f"db_{database['id']}"),
                    name=self._get_database_title(database),
                    description=f"Notion Database with {len(database.get('properties', {}))} properties",
                    mimeType="application/json",
                )
            )

        logger.debug(f"Listed {len(resources)} resources from Notion")
        return resources

    async def get_content(self, resource_path: str) -> DocumentContent:
        """Get content for a specific Notion page or database.

        Args:
            resource_path: A page ID, or ``db_<database id>`` for a database.

        Returns:
            The page rendered as markdown, or the database rendered as a
            markdown table.

        Raises:
            FileNotFoundError: If the page or database is not in the cache.
        """
        self._ensure_initialized()

        # Handle database resources
        if resource_path.startswith("db_"):
            return await self._get_database_content(resource_path[3:])

        # Handle page resources
        page = self._find_page_by_id(resource_path)
        if not page:
            raise FileNotFoundError(f"Page not found: {resource_path}")

        try:
            # Use the cached id so dashed and undashed inputs hit the same URL.
            blocks = await self._fetch_page_blocks(page["id"])
            content = self._blocks_to_markdown(blocks)
            return DocumentContent(
                title=self._get_page_title(page),
                content=content,
                url=page.get("url", ""),
                source=self.name,
                content_type="text/markdown",
                last_modified=page.get("last_edited_time"),
                metadata={
                    "page_id": page["id"],
                    "created_time": page.get("created_time"),
                    "created_by": page.get("created_by", {}).get("id"),
                    "last_edited_by": page.get("last_edited_by", {}).get("id"),
                    "archived": page.get("archived", False),
                    "parent": page.get("parent", {}),
                },
            )
        except Exception as e:
            logger.error(f"Error fetching content for page {resource_path}: {e}")
            raise

    async def search(self, query: str, limit: int = 10) -> List[SearchResult]:
        """Search Notion pages using the Notion API search.

        Falls back to a title search over the local cache when the API call
        fails.

        Args:
            query: Text to search for; must not be blank.
            limit: Maximum number of results to return.

        Returns:
            Up to ``limit`` results; an empty list when ``limit`` is not
            positive.

        Raises:
            ValueError: If ``query`` is empty or whitespace only.
        """
        self._ensure_initialized()
        if not query.strip():
            raise ValueError("Search query cannot be empty")
        if limit <= 0:
            return []

        try:
            pages = await self._collect_paginated(
                "POST",
                "/search",
                {
                    "query": query,
                    "filter": {"value": "page", "property": "object"},
                    # page_size is capped at _MAX_PAGE_SIZE per request;
                    # larger limits are served by following the cursor.
                    "page_size": min(limit, self._MAX_PAGE_SIZE),
                },
                max_items=limit,
            )

            results = []
            for page in pages:
                # Get page content for snippet
                try:
                    blocks = await self._fetch_page_blocks(page["id"], limit=3)
                    text = self._blocks_to_text(blocks)
                    # Only mark the snippet as truncated when text was cut.
                    if len(text) > self._SNIPPET_LENGTH:
                        content_snippet = text[: self._SNIPPET_LENGTH] + "..."
                    else:
                        content_snippet = text
                except Exception:
                    content_snippet = "Content not available"

                results.append(
                    SearchResult(
                        title=self._get_page_title(page),
                        content=content_snippet,
                        url=page.get("url", ""),
                        source=self.name,
                        score=1.0,  # Notion API doesn't provide relevance scores
                        metadata={
                            "page_id": page["id"],
                            "created_time": page.get("created_time"),
                            "last_edited_time": page.get("last_edited_time"),
                        },
                    )
                )
            return results
        except Exception as e:
            logger.error(f"Error searching Notion: {e}")
            # Fallback to local cache search
            return await self._search_local_cache(query, limit)

    async def get_structure(self) -> str:
        """Get the structure of accessible Notion content.

        Returns:
            A markdown outline: cached pages nested under their cached parent
            page, followed by cached databases with their properties.
        """
        self._ensure_initialized()
        structure_lines = ["# Notion Workspace Structure", ""]

        # Group pages by parent. A page is nested only when its parent is
        # itself a cached page; otherwise (workspace root, database row,
        # parent not shared with the integration) it is listed at root level
        # so that no cached page is silently omitted from the outline.
        known_page_ids = {page["id"] for page in self.pages_cache}
        root_pages: List[Dict[str, Any]] = []
        child_pages: Dict[str, List[Dict[str, Any]]] = {}
        for page in self.pages_cache:
            parent = page.get("parent", {})
            parent_id = parent.get("page_id") or parent.get("database_id")
            is_nested = (
                parent.get("type") != "workspace"
                and parent_id in known_page_ids
                and parent_id != page["id"]
            )
            if is_nested:
                child_pages.setdefault(parent_id, []).append(page)
            else:
                root_pages.append(page)

        def add_page_to_structure(page: Dict[str, Any], indent: int = 0) -> None:
            # Two spaces per level: the minimum markdown needs to nest under "- ".
            prefix = "  " * indent + "- "
            title = self._get_page_title(page)
            url = page.get("url", "")
            structure_lines.append(f"{prefix}[{title}]({url})")
            # Add child pages
            for child in child_pages.get(page["id"], []):
                add_page_to_structure(child, indent + 1)

        # Add root pages
        if root_pages:
            structure_lines.append("## Pages")
            structure_lines.append("")
            for page in root_pages:
                add_page_to_structure(page)

        # Add databases
        if self.databases_cache:
            structure_lines.append("")
            structure_lines.append("## Databases")
            structure_lines.append("")
            for database in self.databases_cache:
                structure_lines.append(f"- {self._get_database_title(database)}")
                # Add database properties
                for prop_name, prop_data in database.get("properties", {}).items():
                    prop_type = prop_data.get("type", "unknown")
                    structure_lines.append(f"  - {prop_name} ({prop_type})")

        return "\n".join(structure_lines)

    async def _test_connection(self) -> None:
        """Test the Notion API connection.

        Raises:
            ConnectionError: If the ``/users/me`` request fails; the original
                exception is attached as the cause.
        """
        try:
            response = await self.client.get("/users/me")
            response.raise_for_status()
            logger.debug("Notion API connection test successful")
        except Exception as e:
            logger.error(f"Notion API connection test failed: {e}")
            raise ConnectionError(f"Failed to connect to Notion API: {e}") from e

    async def _collect_paginated(
        self,
        method: str,
        url: str,
        payload: Optional[Dict[str, Any]] = None,
        max_items: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Collect ``results`` from a cursor-paginated Notion list endpoint.

        Args:
            method: ``"GET"`` (payload sent as query parameters) or anything
                else for POST (payload sent as the JSON body).
            url: Endpoint path relative to the client's base URL.
            payload: Request parameters; not modified.
            max_items: Stop once this many items were collected; ``None``
                follows the cursor until the API reports no more results.

        Returns:
            The concatenated ``results`` lists, truncated to ``max_items``.

        Raises:
            httpx.HTTPStatusError: If any request returns an error status.
        """
        collected: List[Dict[str, Any]] = []
        request_args = dict(payload or {})
        while True:
            if method == "GET":
                response = await self.client.get(url, params=request_args)
            else:
                response = await self.client.post(url, json=request_args)
            response.raise_for_status()
            data = response.json()
            collected.extend(data.get("results", []))

            if max_items is not None and len(collected) >= max_items:
                return collected[:max_items]

            cursor = data.get("next_cursor")
            if not data.get("has_more") or not cursor:
                return collected
            request_args["start_cursor"] = cursor

    async def _build_content_cache(self) -> None:
        """Build cache of accessible pages and databases.

        Best effort: on any failure the error is logged and both caches are
        left empty rather than aborting initialization.
        """
        try:
            # Search for all accessible pages
            self.pages_cache = await self._collect_paginated(
                "POST",
                "/search",
                {
                    "filter": {"value": "page", "property": "object"},
                    "page_size": self._MAX_PAGE_SIZE,
                },
            )
            # Search for all accessible databases
            self.databases_cache = await self._collect_paginated(
                "POST",
                "/search",
                {
                    "filter": {"value": "database", "property": "object"},
                    "page_size": self._MAX_PAGE_SIZE,
                },
            )
            logger.debug(f"Cached {len(self.pages_cache)} pages and {len(self.databases_cache)} databases")
        except Exception as e:
            logger.error(f"Error building content cache: {e}")
            self.pages_cache = []
            self.databases_cache = []

    async def _fetch_page_blocks(self, page_id: str, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch blocks (content) for a page or block, including nested children.

        Args:
            page_id: ID of the page or parent block.
            limit: Maximum number of top-level blocks to fetch; ``None`` (or
                0) fetches all of them. Nested children are always fetched in
                full.

        Returns:
            The blocks, with fetched children stored under ``"children"``.
            Best effort: returns an empty list when the request fails.
        """
        try:
            params: Dict[str, Any] = {}
            if limit:
                params["page_size"] = min(limit, self._MAX_PAGE_SIZE)
            blocks = await self._collect_paginated(
                "GET",
                f"/blocks/{page_id}/children",
                params,
                max_items=limit if limit else None,
            )
        except Exception as e:
            logger.error(f"Error fetching blocks for page {page_id}: {e}")
            return []

        # Recursively fetch child blocks for nested content
        for block in blocks:
            if block.get("has_children"):
                try:
                    block["children"] = await self._fetch_page_blocks(block["id"])
                except Exception as e:
                    logger.warning(f"Failed to fetch child blocks for {block.get('id')}: {e}")
        return blocks

    async def _get_database_content(self, database_id: str) -> DocumentContent:
        """Get content for a database (list of pages).

        Args:
            database_id: ID of a cached database.

        Returns:
            The database rows rendered as a markdown table.

        Raises:
            FileNotFoundError: If the database is not in the cache.
        """
        database = self._find_database_by_id(database_id)
        if not database:
            raise FileNotFoundError(f"Database not found: {database_id}")

        try:
            # Query database for pages
            rows = await self._collect_paginated(
                "POST",
                f"/databases/{database['id']}/query",
                {"page_size": self._MAX_PAGE_SIZE},
            )
            # Convert database pages to markdown table
            content = self._database_to_markdown(database, rows)
            return DocumentContent(
                title=self._get_database_title(database),
                content=content,
                url=database.get("url", ""),
                source=self.name,
                content_type="text/markdown",
                last_modified=database.get("last_edited_time"),
                metadata={
                    "database_id": database["id"],
                    "created_time": database.get("created_time"),
                    "properties": list(database.get("properties", {}).keys()),
                },
            )
        except Exception as e:
            logger.error(f"Error fetching database content: {e}")
            raise

    def _blocks_to_markdown(self, blocks: List[Dict[str, Any]]) -> str:
        """Convert Notion blocks to markdown.

        Handles paragraphs, headings 1-3, bulleted/numbered list items, code,
        quotes and dividers; other block types contribute only their children.

        Args:
            blocks: Block objects, optionally carrying a ``"children"`` list.

        Returns:
            The markdown text with surrounding whitespace stripped.
        """
        markdown_lines: List[str] = []
        for block in blocks:
            block_type = block.get("type")
            payload = block.get(block_type) or {}

            if block_type == "code":
                language = payload.get("language", "")
                # Plain text: inline markup would show up literally in a fence.
                code = self._rich_text_to_text(payload.get("rich_text", []))
                markdown_lines.extend([f"```{language}", code, "```", ""])
            elif block_type == "divider":
                markdown_lines.extend(["---", ""])
            elif block_type in self._TEXT_BLOCK_FORMATS:
                prefix, blank_after = self._TEXT_BLOCK_FORMATS[block_type]
                text = self._rich_text_to_markdown(payload.get("rich_text", []))
                if text.strip():
                    if block_type == "quote":
                        # Keep every line of a multi-line quote inside the quote.
                        text = text.replace("\n", "\n> ")
                    markdown_lines.append(prefix + text)
                    if blank_after:
                        markdown_lines.append("")

            # Handle child blocks
            children = block.get("children")
            if children:
                child_markdown = self._blocks_to_markdown(children)
                if child_markdown.strip():
                    # Indent child content (two spaces nests it under a list
                    # item); blank lines stay empty to avoid trailing spaces.
                    markdown_lines.extend(
                        ("  " + line) if line else line
                        for line in child_markdown.split("\n")
                    )

        return "\n".join(markdown_lines).strip()

    def _blocks_to_text(self, blocks: List[Dict[str, Any]]) -> str:
        """Convert Notion blocks to plain text.

        Args:
            blocks: Block objects, optionally carrying a ``"children"`` list.

        Returns:
            The text of all supported blocks and their children joined by
            single spaces.
        """
        text_parts: List[str] = []
        for block in blocks:
            block_type = block.get("type")
            if block_type == "code" or block_type in self._TEXT_BLOCK_FORMATS:
                rich_text = (block.get(block_type) or {}).get("rich_text", [])
                text = self._rich_text_to_text(rich_text)
                if text.strip():
                    text_parts.append(text)

            # Handle child blocks
            children = block.get("children")
            if children:
                child_text = self._blocks_to_text(children)
                if child_text.strip():
                    text_parts.append(child_text)

        return " ".join(text_parts)

    def _rich_text_to_markdown(self, rich_text: List[Dict[str, Any]]) -> str:
        """Convert Notion rich text to markdown.

        Args:
            rich_text: Rich text objects with ``plain_text``, optional
                ``annotations`` and optional ``href``.

        Returns:
            The concatenated text with bold, italic, strikethrough, inline
            code and link markup applied.
        """
        result: List[str] = []
        for text_obj in rich_text:
            text = text_obj.get("plain_text", "")
            annotations = text_obj.get("annotations") or {}

            # Wrapping pure whitespace in emphasis markers renders literally.
            if text.strip():
                # The code span goes innermost: emphasis markers placed inside
                # backticks would be shown verbatim instead of being applied.
                if annotations.get("code"):
                    text = f"`{text}`"
                if annotations.get("bold"):
                    text = f"**{text}**"
                if annotations.get("italic"):
                    text = f"*{text}*"
                if annotations.get("strikethrough"):
                    text = f"~~{text}~~"

            # Handle links
            if text_obj.get("href"):
                text = f"[{text}]({text_obj['href']})"
            result.append(text)

        return "".join(result)

    def _rich_text_to_text(self, rich_text: List[Dict[str, Any]]) -> str:
        """Convert Notion rich text to plain text.

        Args:
            rich_text: Rich text objects; ``None`` is treated as empty.

        Returns:
            The concatenated ``plain_text`` values.
        """
        return "".join(text_obj.get("plain_text", "") for text_obj in rich_text or [])

    @staticmethod
    def _escape_table_cell(value: Any) -> str:
        """Make a value safe for a single markdown table cell."""
        text = "" if value is None else str(value)
        return text.replace("|", "\\|").replace("\r\n", " ").replace("\n", " ")

    def _database_to_markdown(self, database: Dict[str, Any], pages: List[Dict[str, Any]]) -> str:
        """Convert database pages to markdown table.

        The first column is the page title; the remaining columns are the
        database's non-title properties, in schema order.

        Args:
            database: Database object whose ``properties`` define the columns.
            pages: Page objects returned by the database query.

        Returns:
            A markdown table, or a fixed message when ``pages`` is empty.
        """
        if not pages:
            return "No pages found in this database."

        # Get properties from database schema. The title property is already
        # shown in the leading "Title" column, so it is not repeated.
        schema = database.get("properties", {})
        columns = [name for name, spec in schema.items() if spec.get("type") != "title"]

        # Create table header
        headers = ["Title"] + columns
        markdown_lines = [
            "| " + " | ".join(self._escape_table_cell(h) for h in headers) + " |",
            "| " + " | ".join(["---"] * len(headers)) + " |",
        ]

        # Add rows
        for page in pages:
            page_properties = page.get("properties", {})
            row = [self._get_page_title(page)]
            row.extend(
                self._property_to_text(page_properties.get(name) or {})
                for name in columns
            )
            markdown_lines.append(
                "| " + " | ".join(self._escape_table_cell(cell) for cell in row) + " |"
            )

        return "\n".join(markdown_lines)

    def _property_to_text(self, prop: Dict[str, Any]) -> str:
        """Convert Notion property to text.

        Args:
            prop: A page property value object (``{"type": ..., <type>: ...}``).

        Returns:
            A string for the value; an empty string for null values and for
            property types that are not handled here.
        """
        prop_type = prop.get("type")

        if prop_type in ("title", "rich_text"):
            return self._rich_text_to_text(prop.get(prop_type) or [])
        if prop_type == "number":
            number = prop.get("number")
            # A present-but-null number must not render as "None".
            return "" if number is None else str(number)
        if prop_type in ("select", "status"):
            # Both carry an option object with a "name", or null when unset.
            return (prop.get(prop_type) or {}).get("name") or ""
        if prop_type == "multi_select":
            return ", ".join(item.get("name") or "" for item in prop.get("multi_select") or [])
        if prop_type == "date":
            return (prop.get("date") or {}).get("start") or ""
        if prop_type == "checkbox":
            # NOTE(review): the original glyphs were lost to an encoding error
            # that made both branches identical; check/cross marks assumed.
            return "✓" if prop.get("checkbox") else "✗"
        if prop_type in ("url", "email", "phone_number"):
            # The key may be present with a null value; never return None.
            return prop.get(prop_type) or ""
        return ""

    def _get_database_title(self, database: Dict[str, Any]) -> str:
        """Extract the title of a Notion database.

        Returns:
            The joined plain text of the ``title`` rich text, or
            ``"Untitled Database"`` when it is missing or empty.
        """
        return self._rich_text_to_text(database.get("title") or []) or "Untitled Database"

    def _get_page_title(self, page: Dict[str, Any]) -> str:
        """Extract title from a Notion page.

        Returns:
            The text of the first non-empty ``title``-typed property, else the
            object's top-level ``title`` (string or rich text list), else
            ``"Untitled"``.
        """
        # Look for title property
        for prop_data in page.get("properties", {}).values():
            if prop_data.get("type") == "title":
                title_text = prop_data.get("title", [])
                if title_text:
                    return self._rich_text_to_text(title_text)

        # Fallback to page title from API; always return a string.
        fallback = page.get("title")
        if isinstance(fallback, str) and fallback:
            return fallback
        if isinstance(fallback, list):
            text = self._rich_text_to_text(fallback)
            if text:
                return text
        return "Untitled"

    def _get_page_description(self, page: Dict[str, Any]) -> str:
        """Extract description from a Notion page.

        Returns:
            The first paragraph from the page's ``_blocks`` entry (truncated
            to 100 characters) when available, otherwise a line stating the
            last edit time.
        """
        # Try to get first paragraph as description
        blocks = page.get("_blocks") or []
        if blocks and blocks[0].get("type") == "paragraph":
            rich_text = (blocks[0].get("paragraph") or {}).get("rich_text", [])
            text = self._rich_text_to_text(rich_text)
            if len(text) > self._DESCRIPTION_LENGTH:
                return text[: self._DESCRIPTION_LENGTH] + "..."
            return text
        return f"Notion page (last edited: {page.get('last_edited_time', 'unknown')})"

    @staticmethod
    def _normalize_id(value: Any) -> str:
        """Return an ID without dashes and in lower case, for comparison."""
        return str(value).replace("-", "").lower()

    def _find_page_by_id(self, page_id: str) -> Optional[Dict[str, Any]]:
        """Find a page in the cache by ID (dashes and case are ignored)."""
        wanted = self._normalize_id(page_id)
        for page in self.pages_cache:
            if self._normalize_id(page.get("id", "")) == wanted:
                return page
        return None

    def _find_database_by_id(self, database_id: str) -> Optional[Dict[str, Any]]:
        """Find a database in the cache by ID (dashes and case are ignored)."""
        wanted = self._normalize_id(database_id)
        for database in self.databases_cache:
            if self._normalize_id(database.get("id", "")) == wanted:
                return database
        return None

    async def _search_local_cache(self, query: str, limit: int) -> List[SearchResult]:
        """Fallback search using local cache.

        Matches the query case-insensitively against cached page titles.

        Args:
            query: Text to look for.
            limit: Maximum number of results to return.

        Returns:
            Matching pages sorted by descending score.
        """
        results = []
        query_lower = query.lower()
        for page in self.pages_cache:
            title = self._get_page_title(page)
            # Check title match
            if query_lower not in title.lower():
                continue
            results.append(
                SearchResult(
                    title=title,
                    content=self._get_page_description(page),
                    url=page.get("url", ""),
                    source=self.name,
                    score=10.0,
                    metadata={
                        "page_id": page["id"],
                        "created_time": page.get("created_time"),
                        "last_edited_time": page.get("last_edited_time"),
                    },
                )
            )

        # Sort by score and limit results
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:limit]

    async def __aenter__(self):
        """Enter the async context; returns the adapter itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client when leaving the async context."""
        if self.client:
            await self.client.aclose()
            # A closed client is unusable; require initialize() before reuse.
            self.client = None
            self._initialized = False