Skip to main content
Glama

Notion MCP Server

by ccabanillas
client.py (9.72 kB)
"""Notion API client implementation.""" import os from typing import Any, Dict, List, Optional, Union import httpx import logging import sys import rich from rich.logging import RichHandler from .models.notion import Database, Page, SearchResults, PropertyValue class NotionClient: """Client for interacting with the Notion API.""" def __init__(self, api_key: str): """Initialize the Notion client. Args: api_key: Notion API key """ self.api_key = api_key self.base_url = "https://api.notion.com/v1" # Updated to the latest stable Notion API version self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Notion-Version": "2022-02-22" } # Set up logging to stderr to avoid breaking MCP self.logger = logging.getLogger("notion_client") self.logger.setLevel(logging.INFO) # Make sure handler outputs to stderr if not self.logger.handlers: handler = RichHandler(rich_tracebacks=True, console=rich.console.Console(file=sys.stderr)) self.logger.addHandler(handler) async def list_databases(self) -> List[Database]: """List all databases the integration has access to.""" try: async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/search", headers=self.headers, json={ "filter": { "property": "object", "value": "database" }, "page_size": 100, "sort": { "direction": "descending", "timestamp": "last_edited_time" } } ) response.raise_for_status() data = response.json() if not data.get("results"): return [] return [Database.model_validate(db) for db in data["results"]] except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error listing databases: {str(e)}") raise async def query_database( self, database_id: str, filter: Optional[Dict[str, Any]] = None, sorts: Optional[List[Dict[str, Any]]] = None, start_cursor: Optional[str] = None, page_size: int = 100 ) -> Dict[str, Any]: """Query a database.""" try: 
body = { "page_size": page_size } if filter: body["filter"] = filter if sorts: body["sorts"] = sorts if start_cursor: body["start_cursor"] = start_cursor async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/databases/{database_id}/query", headers=self.headers, json=body ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error querying database {database_id}: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error querying database {database_id}: {str(e)}") raise async def create_page( self, parent_id: str, properties: Dict[str, Any], children: Optional[List[Dict[str, Any]]] = None ) -> Page: """Create a new page.""" try: body = { "parent": {"database_id": parent_id}, "properties": properties } if children: body["children"] = children async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/pages", headers=self.headers, json=body ) response.raise_for_status() return Page.model_validate(response.json()) except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error creating page: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error creating page: {str(e)}") raise async def update_page( self, page_id: str, properties: Dict[str, Any], archived: Optional[bool] = None ) -> Page: """Update a page.""" try: body = {"properties": properties} if archived is not None: body["archived"] = archived async with httpx.AsyncClient() as client: response = await client.patch( f"{self.base_url}/pages/{page_id}", headers=self.headers, json=body ) response.raise_for_status() return Page.model_validate(response.json()) except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error updating page {page_id}: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error updating page {page_id}: {str(e)}") raise async def 
search( self, query: str = "", filter: Optional[Dict[str, Any]] = None, sort: Optional[Dict[str, Any]] = None, start_cursor: Optional[str] = None, page_size: int = 100 ) -> SearchResults: """Search Notion.""" try: body = { "query": query, "page_size": page_size } if filter: body["filter"] = filter if sort: body["sort"] = sort if start_cursor: body["start_cursor"] = start_cursor async with httpx.AsyncClient() as client: response = await client.post( f"{self.base_url}/search", headers=self.headers, json=body ) response.raise_for_status() data = response.json() # Convert results based on their object type results = [] for item in data.get("results", []): try: if item["object"] == "database": results.append(Database.model_validate(item)) elif item["object"] == "page": results.append(Page.model_validate(item)) except Exception as e: self.logger.warning(f"Error processing search result: {str(e)}") continue return SearchResults( object="list", results=results, next_cursor=data.get("next_cursor"), has_more=data.get("has_more", False) ) except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error during search: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error during search: {str(e)}") raise async def get_block_children( self, block_id: str, start_cursor: Optional[str] = None, page_size: int = 100 ) -> Dict[str, Any]: """Get children blocks of a block.""" try: params = {"page_size": page_size} if start_cursor: params["start_cursor"] = start_cursor async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/blocks/{block_id}/children", headers=self.headers, params=params ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error getting block children: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error getting block children: {str(e)}") raise async def get_database( self, 
database_id: str ) -> Database: """Get database metadata.""" try: async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/databases/{database_id}", headers=self.headers ) response.raise_for_status() return Database.model_validate(response.json()) except httpx.HTTPStatusError as e: self.logger.error(f"HTTP error getting database: {e.response.status_code} - {e.response.text}") raise except Exception as e: self.logger.error(f"Error getting database: {str(e)}") raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ccabanillas/notion-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.