Skip to main content
Glama
onimsha

Airtable OAuth MCP Server

by onimsha
client.py (16.1 kB)
"""Airtable API client with OAuth 2.0 authentication.""" import asyncio import logging import time from typing import Any, Protocol import httpx from pydantic import ValidationError from .exceptions import ( AirtableAPIError, AirtableAuthError, AirtableNotFoundError, AirtableRateLimitError, AirtableValidationError, ) from .models import ( AirtableRecord, BaseSchemaResponse, CreateRecordsRequest, CreateRecordsResponse, DeleteRecordsResponse, ListBasesResponse, ListRecordsOptions, ListRecordsResponse, UpdateRecordsRequest, UpdateRecordsResponse, ) logger = logging.getLogger(__name__) class OAuthHandler(Protocol): """Protocol for OAuth handlers used by AirtableClient.""" async def ensure_valid_token(self) -> bool: """Ensure the access token is valid, refreshing if necessary.""" ... @property def access_token(self) -> str | None: """Get the current access token.""" ... # Airtable API configuration AIRTABLE_BASE_URL = "https://api.airtable.com" RATE_LIMIT_REQUESTS_PER_SECOND = 5 RATE_LIMIT_WINDOW = 1.0 # 1 second class AirtableClient: """Airtable API client with OAuth 2.0 authentication and rate limiting. This client provides async methods for interacting with the Airtable API: - Base operations (list bases, get schema) - Record operations (CRUD operations) - Built-in rate limiting and retry logic - Comprehensive error handling """ def __init__( self, oauth_handler: OAuthHandler, base_url: str = AIRTABLE_BASE_URL, timeout: float = 30.0, max_retries: int = 3, ): """Initialize the Airtable client. 
Args: oauth_handler: OAuth handler for authentication base_url: Airtable API base URL timeout: Request timeout in seconds max_retries: Maximum number of retries for failed requests """ self.oauth_handler = oauth_handler self.base_url = base_url self.timeout = timeout self.max_retries = max_retries # Rate limiting state self._last_request_time = 0.0 self._request_count = 0 self._rate_limit_window_start = 0.0 async def _ensure_rate_limit(self) -> None: """Ensure we don't exceed rate limits.""" current_time = time.time() # Reset window if needed if current_time - self._rate_limit_window_start >= RATE_LIMIT_WINDOW: self._rate_limit_window_start = current_time self._request_count = 0 # If we've hit the limit, wait if self._request_count >= RATE_LIMIT_REQUESTS_PER_SECOND: wait_time = RATE_LIMIT_WINDOW - ( current_time - self._rate_limit_window_start ) if wait_time > 0: logger.debug(f"Rate limit reached, waiting {wait_time:.2f} seconds") await asyncio.sleep(wait_time) # Reset for new window self._rate_limit_window_start = time.time() self._request_count = 0 self._request_count += 1 self._last_request_time = current_time async def _make_request( self, method: str, endpoint: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None, response_model: Any | None = None, ) -> dict[str, Any] | Any: """Make an authenticated request to the Airtable API. 
Args: method: HTTP method endpoint: API endpoint data: Request body data params: Query parameters response_model: Pydantic model to parse response Returns: Parsed response data Raises: AirtableAPIError: For API errors AirtableAuthError: For authentication errors AirtableRateLimitError: For rate limit errors """ # Ensure valid authentication if not await self.oauth_handler.ensure_valid_token(): raise AirtableAuthError("Failed to obtain valid access token") # Apply rate limiting await self._ensure_rate_limit() url = f"{self.base_url}{endpoint}" # Prepare headers headers = self.oauth_handler.get_auth_headers() headers.update( { "Accept": "application/json", "Content-Type": "application/json", } ) # Prepare request kwargs request_kwargs = { "method": method, "url": url, "headers": headers, "timeout": self.timeout, } if data: request_kwargs["json"] = data if params: request_kwargs["params"] = params # Retry logic last_exception = None for attempt in range(self.max_retries + 1): try: async with httpx.AsyncClient() as client: response = await client.request(**request_kwargs) # Handle different status codes if response.status_code == 401: raise AirtableAuthError( "Authentication failed - invalid or expired token" ) elif response.status_code == 404: raise AirtableNotFoundError(f"Resource not found: {url}") elif response.status_code == 422: error_text = response.text raise AirtableValidationError(f"Validation error: {error_text}") elif response.status_code == 429: retry_after = int(response.headers.get("Retry-After", 30)) raise AirtableRateLimitError( "Rate limit exceeded", retry_after=retry_after ) elif not response.is_success: error_text = response.text raise AirtableAPIError( f"API request failed: {response.status_code} {response.reason_phrase}", status_code=response.status_code, response_text=error_text, ) # Parse response try: response_data = response.json() except Exception as e: raise AirtableAPIError( f"Failed to parse JSON response: {e}" ) from e # Validate response 
with Pydantic model if provided if response_model: try: return response_model.model_validate(response_data) except ValidationError as e: raise AirtableAPIError( f"Response validation error: {e}" ) from e return response_data except AirtableRateLimitError as e: # Handle rate limit with backoff if attempt < self.max_retries: wait_time = e.retry_after or (2**attempt) logger.warning(f"Rate limited, retrying in {wait_time} seconds") await asyncio.sleep(wait_time) continue raise except (httpx.RequestError, httpx.TimeoutException) as e: last_exception = e if attempt < self.max_retries: wait_time = 2**attempt # Exponential backoff logger.warning( f"Request failed (attempt {attempt + 1}), retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) continue break # If we get here, all retries failed raise AirtableAPIError( f"Request failed after {self.max_retries + 1} attempts: {last_exception}" ) async def list_bases(self) -> ListBasesResponse: """List all accessible Airtable bases. Returns: Response containing list of bases """ logger.info("Listing Airtable bases") return await self._make_request( "GET", "/v0/meta/bases", response_model=ListBasesResponse, ) async def get_base_schema(self, base_id: str) -> BaseSchemaResponse: """Get schema information for a specific base. Args: base_id: The Airtable base ID Returns: Response containing base schema (tables and fields) """ logger.info(f"Getting schema for base {base_id}") return await self._make_request( "GET", f"/v0/meta/bases/{base_id}/tables", response_model=BaseSchemaResponse, ) async def list_records( self, base_id: str, table_id: str, options: ListRecordsOptions | None = None, ) -> list[AirtableRecord]: """List records from a table with pagination support. 
Args: base_id: The Airtable base ID table_id: The table ID or name options: Options for filtering and pagination Returns: List of records """ logger.info(f"Listing records from {base_id}/{table_id}") all_records = [] offset = None while True: # Build query parameters params = {} if options: # Convert Pydantic model to dict and filter None values option_dict = options.model_dump(by_alias=True, exclude_none=True) # Handle array parameters specially for Airtable API for key, value in option_dict.items(): if key == "fields" and isinstance(value, list): # Airtable expects fields as multiple parameters: fields[]=field1&fields[]=field2 params[key] = value elif key == "sort" and isinstance(value, list): # Airtable expects sort as indexed parameters: sort[0][field]=Name&sort[0][direction]=asc for i, sort_item in enumerate(value): if isinstance(sort_item, dict): for sort_key, sort_value in sort_item.items(): params[f"sort[{i}][{sort_key}]"] = sort_value else: params[key] = value if offset: params["offset"] = offset # Debug: Log the parameters being sent to Airtable logger.debug(f"Sending request to Airtable with params: {params}") response = await self._make_request( "GET", f"/v0/{base_id}/{table_id}", params=params, response_model=ListRecordsResponse, ) all_records.extend(response.records) # Check if there are more records if not response.offset: break offset = response.offset # Respect maxRecords if specified if ( options and options.max_records and len(all_records) >= options.max_records ): all_records = all_records[: options.max_records] break return all_records async def get_record( self, base_id: str, table_id: str, record_id: str, ) -> AirtableRecord: """Get a specific record by ID. 
Args: base_id: The Airtable base ID table_id: The table ID or name record_id: The record ID Returns: The requested record """ logger.info(f"Getting record {record_id} from {base_id}/{table_id}") response = await self._make_request( "GET", f"/v0/{base_id}/{table_id}/{record_id}", response_model=AirtableRecord, ) return response async def create_records( self, base_id: str, table_id: str, records: list[dict[str, Any]], typecast: bool = False, ) -> list[AirtableRecord]: """Create new records in a table. Args: base_id: The Airtable base ID table_id: The table ID or name records: List of record data (each should have 'fields' key) typecast: Whether to enable automatic data conversion Returns: List of created records """ logger.info(f"Creating {len(records)} records in {base_id}/{table_id}") request_data = CreateRecordsRequest( records=records, typecast=typecast, ) response = await self._make_request( "POST", f"/v0/{base_id}/{table_id}", data=request_data.model_dump(by_alias=True, exclude_none=True), response_model=CreateRecordsResponse, ) return response.records async def update_records( self, base_id: str, table_id: str, records: list[dict[str, Any]], typecast: bool = False, ) -> list[AirtableRecord]: """Update existing records in a table. Args: base_id: The Airtable base ID table_id: The table ID or name records: List of record updates (each should have 'id' and 'fields' keys) typecast: Whether to enable automatic data conversion Returns: List of updated records """ logger.info(f"Updating {len(records)} records in {base_id}/{table_id}") request_data = UpdateRecordsRequest( records=records, typecast=typecast, ) response = await self._make_request( "PATCH", f"/v0/{base_id}/{table_id}", data=request_data.model_dump(by_alias=True, exclude_none=True), response_model=UpdateRecordsResponse, ) return response.records async def delete_records( self, base_id: str, table_id: str, record_ids: list[str], ) -> list[str]: """Delete records from a table. 
Args: base_id: The Airtable base ID table_id: The table ID or name record_ids: List of record IDs to delete Returns: List of deleted record IDs """ logger.info(f"Deleting {len(record_ids)} records from {base_id}/{table_id}") params = {"records[]": record_ids} response = await self._make_request( "DELETE", f"/v0/{base_id}/{table_id}", params=params, response_model=DeleteRecordsResponse, ) return [record["id"] for record in response.records] async def search_records( self, base_id: str, table_id: str, filter_by_formula: str, options: ListRecordsOptions | None = None, ) -> list[AirtableRecord]: """Search records using a formula filter. Args: base_id: The Airtable base ID table_id: The table ID or name filter_by_formula: Airtable formula for filtering options: Additional options for the search Returns: List of matching records """ logger.info( f"Searching records in {base_id}/{table_id} with formula: {filter_by_formula}" ) # Merge filter with existing options search_options = options or ListRecordsOptions() search_options.filter_by_formula = filter_by_formula return await self.list_records(base_id, table_id, search_options)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/onimsha/airtable-mcp-server-oauth'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.