Notion API MCP Server

by pbohannon
Verified
""" Notion Databases API interactions. """ from typing import Any, Dict, List, Optional, Union import httpx import structlog from datetime import datetime logger = structlog.get_logger() class DatabasesAPI: """ Handles interactions with Notion's Databases API endpoints. Supports advanced querying, filtering, and database management. """ def __init__(self, client: httpx.AsyncClient): """ Initialize DatabasesAPI with an HTTP client. Args: client: Configured httpx AsyncClient for Notion API requests """ self._client = client self._log = logger.bind(module="databases_api") async def create_database( self, parent_page_id: str, title: str, properties: Dict[str, Any] ) -> Dict[str, Any]: """ Create a new database. Args: parent_page_id: ID of parent page title: Database title properties: Database property definitions Returns: Created database object Raises: httpx.HTTPError: On API request failure """ try: response = await self._client.post( "databases", json={ "parent": { "type": "page_id", "page_id": parent_page_id }, "title": [{ "type": "text", "text": {"content": title} }], "properties": properties } ) response.raise_for_status() return response.json() except httpx.HTTPError as e: self._log.error( "create_database_error", parent_id=parent_page_id, error=str(e) ) raise async def query_database( self, database_id: str, filter_conditions: Optional[Dict[str, Any]] = None, sorts: Optional[List[Dict[str, Any]]] = None, start_cursor: Optional[str] = None, page_size: int = 100 ) -> Dict[str, Any]: """ Query a database with advanced filtering and sorting. Args: database_id: Database to query filter_conditions: Filter criteria sorts: Sort specifications start_cursor: Pagination cursor page_size: Results per page Returns: Query results Raises: httpx.HTTPError: On API request failure """ try: body: Dict[str, Any] = {"page_size": page_size} if filter_conditions: body["filter"] = filter_conditions if sorts: body["sorts"] = sorts if start_cursor: body["start_cursor"] = start_cursor response = await self._client.post( f"databases/{database_id}/query", json=body ) response.raise_for_status() return response.json() except httpx.HTTPError as e: self._log.error( "query_database_error", database_id=database_id, error=str(e) ) raise def create_filter( self, conditions: List[Dict[str, Any]], operator: str = "and" ) -> Dict[str, Any]: """ Create a compound filter for database queries. Args: conditions: List of filter conditions operator: Logic operator ('and' or 'or') Returns: Filter object for API Raises: ValueError: If operator is invalid """ if operator not in ("and", "or"): raise ValueError("Operator must be 'and' or 'or'") return {operator: conditions} def create_date_filter( self, property_name: str, condition: str, value: Union[str, datetime, None] = None ) -> Dict[str, Any]: """ Create a date filter condition. Args: property_name: Property to filter on condition: Date condition (before, after, equals, etc.) value: Date value. If datetime, will be converted to ISO format. If None, used for conditions like 'is_empty' Returns: Date filter object formatted for Notion API """ if isinstance(value, datetime): value = value.isoformat() filter_obj = { "property": property_name, "date": {} } if value is None and condition in ["is_empty", "is_not_empty"]: filter_obj["date"][condition] = True else: filter_obj["date"][condition] = value return filter_obj def create_text_filter( self, property_name: str, condition: str, value: str ) -> Dict[str, Any]: """ Create a text filter condition. Args: property_name: Property to filter on condition: Text condition (equals, contains, etc.) value: Text value Returns: Text filter object """ return { "property": property_name, "text": {condition: value} } def create_number_filter( self, property_name: str, condition: str, value: Union[int, float] ) -> Dict[str, Any]: """ Create a number filter condition. Args: property_name: Property to filter on condition: Number condition (equals, greater_than, etc.) value: Numeric value Returns: Number filter object """ return { "property": property_name, "number": {condition: value} } def create_search_filter( self, query: str, property_name: Optional[str] = None ) -> Dict[str, Any]: """ Create a search filter for full-text or property-specific search. Args: query: Search query text property_name: Optional property to search within. If None, performs full-text search across all text content. Returns: Search filter object """ if property_name: return { "property": property_name, "rich_text": {"contains": query} } else: return {"title": {"contains": query}} def create_sort( self, property_name: str, direction: str = "ascending" ) -> Dict[str, Any]: """ Create a sort specification. Args: property_name: Property to sort by direction: Sort direction Returns: Sort object Raises: ValueError: If direction is invalid """ if direction not in ("ascending", "descending"): raise ValueError("Direction must be 'ascending' or 'descending'") return { "property": property_name, "direction": direction } async def update_database( self, database_id: str, title: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, archived: Optional[bool] = None ) -> Dict[str, Any]: """ Update database title, schema, or archive status. Args: database_id: Database to update title: New title properties: Updated property definitions archived: Archive status Returns: Updated database object Raises: httpx.HTTPError: On API request failure """ try: body: Dict[str, Any] = {} if title: body["title"] = [{ "type": "text", "text": {"content": title} }] if properties: body["properties"] = properties if archived is not None: body["archived"] = archived response = await self._client.patch( f"databases/{database_id}", json=body ) response.raise_for_status() return response.json() except httpx.HTTPError as e: self._log.error( "update_database_error", database_id=database_id, error=str(e) ) raise async def get_database(self, database_id: str) -> Dict[str, Any]: """ Retrieve database metadata. Args: database_id: Database to retrieve Returns: Database object Raises: httpx.HTTPError: On API request failure """ try: response = await self._client.get(f"databases/{database_id}") response.raise_for_status() return response.json() except httpx.HTTPError as e: self._log.error( "get_database_error", database_id=database_id, error=str(e) ) raise async def list_databases(self) -> Dict[str, Any]: """ List all databases the integration has access to. Returns: List of database objects Raises: httpx.HTTPError: On API request failure """ try: response = await self._client.post( "search", json={ "filter": { "value": "database", "property": "object" } } ) response.raise_for_status() return response.json() except httpx.HTTPError as e: self._log.error( "list_databases_error", error=str(e) ) raise async def search_database( self, database_id: str, query: str, property_name: Optional[str] = None, sorts: Optional[List[Dict[str, Any]]] = None, start_cursor: Optional[str] = None, page_size: int = 100 ) -> Dict[str, Any]: """ Search database content with optional property-specific search and sorting. Args: database_id: Database to search query: Search query text property_name: Optional property to search within sorts: Optional sort specifications start_cursor: Pagination cursor page_size: Results per page Returns: Search results from database query Raises: httpx.HTTPError: On API request failure """ try: # Create search filter filter_obj = self.create_search_filter(query, property_name) # Query database with filter return await self.query_database( database_id, filter_conditions=filter_obj, sorts=sorts, start_cursor=start_cursor, page_size=page_size ) except httpx.HTTPError as e: self._log.error( "search_database_error", database_id=database_id, query=query, error=str(e) ) raise