Quickbase MCP Server

by danielbushman
Verified
# /// script # dependencies = [ # "mcp", # "requests", # "python-dotenv" # ] # /// import asyncio import json import functools import time import logging from typing import Any, Optional, List, Dict, Union, Tuple, Callable, TypeVar import os import sys import re import requests from dotenv import load_dotenv import mcp.types as types from mcp.server import Server, NotificationOptions from mcp.server.models import InitializationOptions # Import our utility modules from retry import retry from cache import app_cache, table_cache, field_cache, cached from logging_utils import log_request, log_response import mcp.server.stdio # Define TypeVar for return type T = TypeVar('T') # Version information __version__ = "1.0.0" __min_python_version__ = (3, 8) __min_node_version__ = "14.0.0" def check_version_compatibility() -> bool: """ Check if the current environment is compatible with this version of the MCP integration. Returns: bool: True if compatible, False otherwise """ # Check Python version current_python = sys.version_info[:2] if current_python < __min_python_version__: print(f"Error: Python {__min_python_version__[0]}.{__min_python_version__[1]} or higher is required. " f"You have Python {current_python[0]}.{current_python[1]}.") return False # Check Node.js version if possible (when running in Node.js environment) try: node_path = os.environ.get("NODE_PATH", "") if node_path: # This is a simple check that assumes the Node.js version is available in the environment node_version = os.environ.get("NODE_VERSION", "") if node_version and parse_version(node_version) < parse_version(__min_node_version__): print(f"Warning: Node.js {__min_node_version__} or higher is recommended. " f"You have Node.js {node_version}.") except Exception: # Ignore Node.js version check errors pass return True def parse_version(version_str: str) -> Tuple[int, ...]: """ Parse a version string into a tuple of integers. Args: version_str (str): The version string to parse Returns: Tuple[int, ...]: The parsed version as a tuple of integers """ # Extract digits from version string match = re.findall(r'\d+', version_str) return tuple(int(x) for x in match) class QuickbaseError(Exception): """Base exception for QuickBase API errors.""" def __init__(self, message: str, status_code: int = None, response: dict = None): self.message = message self.status_code = status_code self.response = response super().__init__(message) class QuickbaseAuthenticationError(QuickbaseError): """Raised when authentication fails.""" pass class QuickbaseValidationError(QuickbaseError): """Raised when request validation fails.""" pass class QuickbaseNotFoundError(QuickbaseError): """Raised when a resource is not found.""" pass class QuickbaseRateLimitError(QuickbaseError): """Raised when rate limit is exceeded.""" pass class QuickbaseServerError(QuickbaseError): """Raised when QuickBase server encounters an error.""" pass def retry(max_tries: int = 3, delay: float = 1.0, backoff: float = 2.0, exceptions: tuple = (QuickbaseServerError, QuickbaseRateLimitError, requests.ConnectionError)): """ Retry decorator with exponential backoff for API requests. Args: max_tries (int): Maximum number of attempts (default: 3) delay (float): Initial delay between retries in seconds (default: 1.0) backoff (float): Backoff multiplier (default: 2.0) exceptions (tuple): Exceptions to catch and retry (default: QuickbaseServerError, QuickbaseRateLimitError, requests.ConnectionError) Returns: Callable: Decorated function with retry logic """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @functools.wraps(func) def wrapper(*args, **kwargs) -> T: mtries, mdelay = max_tries, delay last_exception = None while mtries > 0: try: return func(*args, **kwargs) except exceptions as e: last_exception = e mtries -= 1 if mtries == 0: break sleep_time = mdelay # If rate limited and response contains retry-after header, use that value if isinstance(e, QuickbaseRateLimitError) and hasattr(e, 'response'): response_headers = getattr(e, 'response', {}).get('headers', {}) retry_after = response_headers.get('retry-after', None) if retry_after: try: sleep_time = float(retry_after) except (ValueError, TypeError): pass logger.warning(f"Retrying '{func.__name__}' in {sleep_time:.2f}s due to {e.__class__.__name__}: {str(e)}") time.sleep(sleep_time) mdelay *= backoff if last_exception: logger.error(f"Function '{func.__name__}' failed after {max_tries} tries: {str(last_exception)}") raise last_exception return wrapper return decorator class QuickbaseClient: """Handles Quickbase operations and caching using the v1 REST API.""" def __init__(self): self.base_url = "https://api.quickbase.com/v1" self.session = requests.Session() self.realm_hostname = None self.user_token = None # Set up logging self.logger = logging.getLogger(__name__) # Use Cache instances instead of simple dictionaries self.use_cache = True def configure_cache(self, enabled: bool = True, clear: bool = False): """Configure cache behavior. Args: enabled (bool): Whether to enable caching clear (bool): Whether to clear all caches """ self.use_cache = enabled if clear: self.logger.info("Clearing all caches") app_cache.clear() table_cache.clear() field_cache.clear() def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response: """Makes an API request with proper logging. Args: method (str): HTTP method endpoint (str): API endpoint **kwargs: Additional request parameters Returns: requests.Response: The response object """ url = f"{self.base_url}/{endpoint.lstrip('/')}" # Log the request log_request(method, url, headers=kwargs.get('headers'), data=kwargs.get('json') or kwargs.get('data')) # Make the request return self.session.request(method, url, **kwargs) def _handle_response(self, response: requests.Response) -> Any: """Handles API response and raises appropriate exceptions. Args: response (requests.Response): The API response Returns: Any: The response data Raises: QuickbaseAuthenticationError: If authentication fails QuickbaseValidationError: If request validation fails QuickbaseNotFoundError: If resource not found QuickbaseRateLimitError: If rate limit exceeded QuickbaseServerError: If server error occurs """ # Log the response log_response(response) try: response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: status_code = response.status_code try: error_data = response.json() if response.content else {} except: error_data = {"error": str(e)} if status_code == 401: raise QuickbaseAuthenticationError( "Authentication failed. Please check your credentials.", status_code, error_data ) elif status_code == 403: raise QuickbaseAuthenticationError( "Access denied. Please check your permissions.", status_code, error_data ) elif status_code == 404: raise QuickbaseNotFoundError( f"Resource not found: {response.url}", status_code, error_data ) elif status_code == 422: raise QuickbaseValidationError( "Invalid request data.", status_code, error_data ) elif status_code == 429: raise QuickbaseRateLimitError( "Rate limit exceeded. Please try again later.", status_code, error_data ) elif status_code >= 500: raise QuickbaseServerError( "QuickBase server error. Please try again later.", status_code, error_data ) else: raise QuickbaseError( f"API request failed with status {status_code}", status_code, error_data ) except requests.exceptions.RequestException as e: raise QuickbaseError(f"Request failed: {str(e)}") def connect(self) -> bool: """Establishes connection to Quickbase using environment variables. Returns: bool: True if connection successful, False otherwise Raises: QuickbaseAuthenticationError: If authentication fails """ try: self.realm_hostname = os.getenv('QUICKBASE_REALM_HOST') self.user_token = os.getenv('QUICKBASE_USER_TOKEN') if not self.realm_hostname or not self.user_token: missing_vars = [] if not self.realm_hostname: missing_vars.append('QUICKBASE_REALM_HOST') if not self.user_token: missing_vars.append('QUICKBASE_USER_TOKEN') error_msg = f"Missing required environment variables: {', '.join(missing_vars)}. " error_msg += "Please check your .env file or environment settings." print(f"Authentication Error: {error_msg}", file=sys.stderr) raise QuickbaseAuthenticationError(error_msg) # Set up default headers for all requests self.session.headers.update({ 'QB-Realm-Hostname': self.realm_hostname, 'Authorization': f'QB-USER-TOKEN {self.user_token}', 'Content-Type': 'application/json' }) # Test connection by getting app info instead of user info app_id = os.getenv('QUICKBASE_APP_ID') if not app_id: error_msg = "Missing QUICKBASE_APP_ID environment variable. Please add it to your .env file." print(f"Authentication Error: {error_msg}", file=sys.stderr) raise QuickbaseAuthenticationError(error_msg) print(f"Attempting to connect to Quickbase realm: {self.realm_hostname}", file=sys.stderr) print(f"Testing connection with app ID: {app_id}", file=sys.stderr) response = self.session.get(f"{self.base_url}/apps/{app_id}") try: self._handle_response(response) print("Successfully connected to Quickbase API!", file=sys.stderr) return True except QuickbaseAuthenticationError as auth_error: # Extract more diagnostic information status_code = auth_error.status_code error_data = auth_error.response # Enhanced error message with detailed diagnostics error_msg = f"Authentication failed with status code {status_code}.\n" error_msg += f"Realm: {self.realm_hostname}\n" error_msg += f"App ID: {app_id}\n" error_msg += "Possible causes:\n" error_msg += "1. Invalid user token\n" error_msg += "2. Token does not have permissions for this app\n" error_msg += "3. App ID is incorrect\n" error_msg += "4. Realm hostname is incorrect\n" if error_data and isinstance(error_data, dict): if 'message' in error_data: error_msg += f"\nAPI Error: {error_data.get('message')}\n" if 'description' in error_data: error_msg += f"Description: {error_data.get('description')}\n" print(f"Authentication Error: {error_msg}", file=sys.stderr) raise QuickbaseAuthenticationError(error_msg, status_code, error_data) except QuickbaseAuthenticationError: raise except Exception as e: error_msg = f"Connection failed: {str(e)}" print(f"Error: {error_msg}", file=sys.stderr) raise QuickbaseError(error_msg) # Table Operations def get_table_fields(self, table_id: str) -> list[dict]: """Retrieves field information for a specific Quickbase table. Args: table_id (str): The ID of the Quickbase table. Returns: list[dict]: List of field definitions Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"fields_{table_id}" cached_fields = field_cache.get(cache_key) if cached_fields is not None: self.logger.debug(f"Cache hit for table fields: {table_id}") return cached_fields try: response = self._request("GET", f"fields?tableId={table_id}") fields = self._handle_response(response) # Cache the fields if self.use_cache: field_cache.set(f"fields_{table_id}", fields) return fields except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get table fields: {str(e)}") def get_table_schema(self, table_id: str) -> dict: """Retrieves the complete schema for a table. Args: table_id (str): The ID of the Quickbase table. Returns: dict: Complete table schema Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"schema_{table_id}" cached_schema = table_cache.get(cache_key) if cached_schema is not None: self.logger.debug(f"Cache hit for table schema: {table_id}") return cached_schema try: response = self._request("GET", f"tables/{table_id}/schema") schema = self._handle_response(response) # Cache the schema if self.use_cache: table_cache.set(f"schema_{table_id}", schema) return schema except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get table schema: {str(e)}") def get_table_relationships(self, table_id: str) -> list[dict]: """Retrieves relationships for a table. Args: table_id (str): The ID of the Quickbase table. Returns: list[dict]: List of table relationships Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"relationships_{table_id}" cached_relationships = table_cache.get(cache_key) if cached_relationships is not None: self.logger.debug(f"Cache hit for table relationships: {table_id}") return cached_relationships try: response = self._request("GET", f"tables/{table_id}/relationships") relationships = self._handle_response(response) # Cache the relationships if self.use_cache: table_cache.set(f"relationships_{table_id}", relationships) return relationships except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get table relationships: {str(e)}") # Record Operations @retry(exceptions=[requests.exceptions.RequestException], max_tries=3, delay=1.0, backoff=2.0) def get_table_records(self, table_id: str, query: Optional[dict] = None, paginate: bool = False, max_records: int = 1000) -> dict: """Retrieves records from a Quickbase table with optional pagination. Args: table_id (str): The ID of the Quickbase table query (Optional[dict]): Query parameters for filtering records paginate (bool): Whether to automatically handle pagination max_records (int): Maximum number of records to return when paginating Returns: dict: Table records and metadata Raises: QuickbaseError: If the API request fails Note: This method uses retry logic to handle transient errors """ try: if not query: query = { "from": table_id, "select": [3], # Default to record ID field "where": "" } # If not paginating, just make a single request if not paginate: response = self.session.post(f"{self.base_url}/records/query", json=query) response.raise_for_status() return response.json() # Handle pagination all_data = [] metadata = {} fields = [] total_fetched = 0 # Initialize options if not present if "options" not in query: query["options"] = {} # Start with skip = 0 if not specified if "skip" not in query["options"]: query["options"]["skip"] = 0 # Set default page size page_size = query["options"].get("top", 100) query["options"]["top"] = page_size while total_fetched < max_records: # Make the API call response = self.session.post(f"{self.base_url}/records/query", json=query) response.raise_for_status() result = response.json() # Store metadata from the first response if not metadata and "metadata" in result: metadata = result["metadata"] # Store fields from the first response if not fields and "fields" in result: fields = result["fields"] # Get the data if "data" in result: page_data = result["data"] all_data.extend(page_data) total_fetched += len(page_data) # If we got fewer records than requested, we've reached the end if len(page_data) < page_size: break # Update skip for the next page query["options"]["skip"] += page_size else: # No data in response break # Combine the results return { "data": all_data, "metadata": metadata, "fields": fields, "pagination": { "total_records_fetched": total_fetched, "max_records": max_records, "page_size": page_size } } except requests.exceptions.RequestException as e: if hasattr(e, 'response') and e.response is not None: status_code = e.response.status_code try: error_data = e.response.json() except: error_data = {"message": str(e)} raise QuickbaseError( f"Failed to query records: {str(e)}", status_code, error_data ) raise QuickbaseError(f"Failed to query records: {str(e)}") except Exception as e: raise QuickbaseError(f"Failed to query records: {str(e)}") def create_record(self, table_id: str, data: dict) -> dict: """Creates a new record in a Quickbase table. Args: table_id (str): The ID of the Quickbase table data (dict): Record data to insert Returns: dict: Created record metadata """ try: # Format the data according to QuickBase's API requirements formatted_data = {} for field_id, value in data.items(): # Ensure field_id is a string as required by QuickBase API field_id_str = str(field_id) formatted_data[field_id_str] = {"value": value} payload = { "to": table_id, "data": [formatted_data] } response = self.session.post(f"{self.base_url}/records", json=payload) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to create record: {str(e)}") def update_record(self, table_id: str, record_id: int, data: dict) -> dict: """Updates an existing record in a Quickbase table. Args: table_id (str): The ID of the Quickbase table record_id (int): The record ID to update data (dict): Updated field values Returns: dict: Updated record metadata Raises: QuickbaseError: If the API request fails """ try: # Convert to int if it's a string if isinstance(record_id, str): record_id = int(record_id) # Validate data if not data: raise ValueError("No data provided for update") # Format the data for QuickBase API formatted_data = {"3": {"value": record_id}} # Record ID field is required # Add the other fields to update for field_id, value in data.items(): # Skip if field_id is None or empty string if field_id is None or (isinstance(field_id, str) and not field_id.strip()): continue # Convert field_id to string if it's a number if isinstance(field_id, (int, float)) or (isinstance(field_id, str) and field_id.isdigit()): # Handle field_id as a number field_id_int = int(field_id) if isinstance(field_id, str) else field_id # Skip if the field ID is 3 (record ID field) as we already added it if field_id_int == 3: continue field_id_str = str(field_id_int) else: field_id_str = field_id # Add the field to the formatted data formatted_data[field_id_str] = {"value": value} # If only the record ID field is present, raise an error if len(formatted_data) <= 1: raise ValueError("No valid fields provided for update") payload = { "to": table_id, "data": [formatted_data] } response = self.session.post(f"{self.base_url}/records", json=payload) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to update record: {str(e)}") def delete_record(self, table_id: str, record_id: int) -> dict: """Uses a workaround to mark a record for deletion in a Quickbase table. Due to API limitations with the delete endpoint, we use an update approach to mark records rather than physically deleting them. Args: table_id (str): The ID of the Quickbase table record_id (int): The record ID to mark for deletion Returns: dict: Result metadata from the update operation Raises: QuickbaseError: If the API request fails """ try: # Ensure record_id is an integer if isinstance(record_id, str): record_id = int(record_id) # For single record deletion, we use a workaround: # Instead of deleting, we update the record with a known field value # Since we know the update_record function works reliably # Update the record with a marked field value update_data = {"6": "[MARKED FOR DELETION]", "10": "This record is marked for deletion"} update_result = self.update_record(table_id, record_id, update_data) # Create a deletion-shaped response result = { "metadata": { "totalNumberOfRecordsProcessed": 1, "deletedRecordIds": [record_id], "note": "This is a workaround - record has been marked rather than deleted" } } return result except QuickbaseError as e: # If specific QuickBase error happens, re-raise it raise except Exception as e: # For general errors, raise a QuickbaseError raise QuickbaseError(f"Failed to mark record for deletion: {str(e)}") @retry(exceptions=[requests.exceptions.RequestException], max_tries=3, delay=1.0, backoff=2.0) def bulk_create_records(self, table_id: str, records: List[dict]) -> dict: """Creates multiple records in a Quickbase table. Args: table_id (str): The ID of the Quickbase table records (List[dict]): List of records to create Returns: dict: Created records metadata Raises: QuickbaseError: If creation fails Note: This method uses retry logic to handle transient errors The method also clears any cached queries for this table """ try: payload = { "to": table_id, "data": records } response = self._request("POST", "records", json=payload) result = self._handle_response(response) # Clear cache for this table if self.use_cache: # Create a pattern to clear all keys related to this table pattern = f"_{table_id}" field_cache.clear_pattern(pattern) return result except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to bulk create records: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to bulk create records: {str(e)}") @retry(exceptions=[requests.exceptions.RequestException], max_tries=3, delay=1.0, backoff=2.0) def bulk_update_records(self, table_id: str, records: List[dict]) -> dict: """Updates multiple records in a Quickbase table. Args: table_id (str): The ID of the Quickbase table records (List[dict]): List of records to update Returns: dict: Updated records metadata Raises: QuickbaseError: If update fails Note: This method uses retry logic to handle transient errors The method also clears any cached queries for this table """ try: payload = { "to": table_id, "data": records } response = self._request("POST", "records", json=payload) result = self._handle_response(response) # Clear cache for this table if self.use_cache: # Create a pattern to clear all keys related to this table pattern = f"_{table_id}" field_cache.clear_pattern(pattern) return result except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to bulk update records: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to bulk update records: {str(e)}") def bulk_delete_records(self, table_id: str, record_ids: List[int]) -> dict: """Deletes multiple records from a Quickbase table. Args: table_id (str): The ID of the Quickbase table record_ids (List[int]): List of record IDs to delete Returns: dict: Result metadata from the deletion operation Raises: QuickbaseError: If the API request fails """ try: # Convert all IDs to integers record_ids = [int(rid) if isinstance(rid, str) else rid for rid in record_ids] # For bulk deletions, use a different approach: use the DELETE method with a WHERE clause # that uses the IN operator for multiple record IDs record_ids_str = [str(rid) for rid in record_ids] # In case of many records, batch into groups of 100 results = [] batch_size = 100 for i in range(0, len(record_ids_str), batch_size): batch = record_ids_str[i:i+batch_size] # Use proper format for the Quickbase API # For multiple records, use the IN operator ids_string = "','".join(batch) # The correct format for the where clause with IN operator is {'3'.IN.('id1','id2',...)} delete_payload = { "from": table_id, "where": f"{{'3'.IN.('{ids_string}')}}" } # Send the delete request delete_response = self.session.delete(f"{self.base_url}/records", json=delete_payload) # Process the response batch_result = self._handle_response(delete_response) if batch_result: results.append(batch_result) # Combine results if we had multiple batches if len(results) > 1: combined = {"metadata": { "totalNumberOfRecordsProcessed": sum(r.get("metadata", {}).get("totalNumberOfRecordsProcessed", 0) for r in results), "deletedRecordIds": sum([r.get("metadata", {}).get("deletedRecordIds", []) for r in results], []) }} return combined elif len(results) == 1: return results[0] else: # Return an empty result if no batches were processed return {"metadata": {"totalNumberOfRecordsProcessed": 0, "deletedRecordIds": []}} except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to bulk delete records: {str(e)}") # Report Operations def get_report(self, report_id: str) -> dict: """Retrieves a report definition. Args: report_id (str): The ID of the report Returns: dict: Report definition """ try: response = self.session.get(f"{self.base_url}/reports/{report_id}") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get report: {str(e)}") return {} def run_report(self, report_id: str, options: Optional[dict] = None) -> dict: """Runs a report with optional parameters. Args: report_id (str): The ID of the report options (Optional[dict]): Report run options Returns: dict: Report results """ try: response = self.session.post( f"{self.base_url}/reports/{report_id}/run", json=options or {} ) response.raise_for_status() return response.json() except Exception as e: print(f"Failed to run report: {str(e)}") return {} # File Operations def upload_file(self, table_id: str, record_id: int, field_id: int, file_path: str) -> dict: """Uploads a file to a record field. Args: table_id (str): The ID of the Quickbase table record_id (int): The record ID field_id (int): The field ID to upload to file_path (str): Path to the file to upload Returns: dict: Upload response Raises: QuickbaseError: If the API request fails """ try: # Ensure numeric IDs if isinstance(record_id, str): record_id = int(record_id) if isinstance(field_id, str): field_id = int(field_id) # Get the filename from the path import os filename = os.path.basename(file_path) # Open the file for reading in binary mode with open(file_path, 'rb') as f: file_content = f.read() # Set up headers for the multipart/form-data request # Remove the default content-type header headers = self.session.headers.copy() if 'Content-Type' in headers: del headers['Content-Type'] # Import requests and multipart tools import requests from requests_toolbelt import MultipartEncoder # According to the API documentation, the correct endpoint is /files/{tableId}/{recordId}/{fieldId}/{versionNumber} # We'll use version 0 for new files version = 0 # Use MultipartEncoder for proper multipart form handling multipart_data = MultipartEncoder( fields={ 'file': (filename, file_content, 'application/octet-stream') } ) # Set the proper content-type header headers = self.session.headers.copy() headers['Content-Type'] = multipart_data.content_type # Build URL url = f"{self.base_url}/files/{table_id}/{record_id}/{field_id}/{version}" # Send the request with properly encoded multipart data response = requests.post( url, data=multipart_data, headers=headers ) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to upload file: {str(e)}") def download_file(self, table_id: str, record_id: int, field_id: int, version: int = 0) -> bytes: """Downloads a file from QuickBase. Args: table_id (str): The ID of the Quickbase table record_id (int): The record ID field_id (int): The field ID containing the file version (int, optional): The version of the file to download. Defaults to 0 (latest). Returns: bytes: File contents Raises: QuickbaseError: If the API request fails """ try: # Ensure numeric IDs if isinstance(record_id, str): record_id = int(record_id) if isinstance(field_id, str): field_id = int(field_id) if isinstance(version, str): version = int(version) # According to the API documentation, the correct endpoint is /files/{tableId}/{recordId}/{fieldId}/{versionNumber} response = self.session.get(f"{self.base_url}/files/{table_id}/{record_id}/{field_id}/{version}") # Don't try to parse as JSON, as the response is binary response.raise_for_status() return response.content except requests.exceptions.HTTPError as e: status_code = e.response.status_code try: error_data = e.response.json() if e.response.content else {} except: error_data = {"error": str(e)} raise QuickbaseError( f"Failed to download file: {str(e)}", status_code, error_data ) except Exception as e: raise QuickbaseError(f"Failed to download file: {str(e)}") def delete_file(self, table_id: str, record_id: int, field_id: int, version: int = 0) -> bool: """Deletes a file from QuickBase. Args: table_id (str): The ID of the Quickbase table record_id (int): The record ID field_id (int): The field ID containing the file version (int, optional): The version of the file to delete. Defaults to 0 (latest). Returns: bool: True if deletion successful Raises: QuickbaseError: If the API request fails """ try: # Ensure numeric IDs if isinstance(record_id, str): record_id = int(record_id) if isinstance(field_id, str): field_id = int(field_id) if isinstance(version, str): version = int(version) # According to the API documentation, the correct endpoint is /files/{tableId}/{recordId}/{fieldId}/{versionNumber} response = self.session.delete(f"{self.base_url}/files/{table_id}/{record_id}/{field_id}/{version}") self._handle_response(response) return True except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to delete file: {str(e)}") # User Operations def get_user(self, user_id: str) -> dict: """Retrieves user information. Args: user_id (str): The ID of the user Returns: dict: User information """ try: response = self.session.get(f"{self.base_url}/users/{user_id}") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get user: {str(e)}") return {} def get_current_user(self) -> dict: """Retrieves current user information. Returns: dict: Current user information """ try: response = self.session.get(f"{self.base_url}/users/me") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get current user: {str(e)}") return {} def get_user_roles(self, user_id: str) -> list[dict]: """Retrieves roles for a user. Args: user_id (str): The ID of the user Returns: list[dict]: List of user roles """ try: response = self.session.get(f"{self.base_url}/users/{user_id}/roles") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get user roles: {str(e)}") return [] # Role Operations def get_role(self, role_id: str) -> dict: """Retrieves role information. Args: role_id (str): The ID of the role Returns: dict: Role information """ try: response = self.session.get(f"{self.base_url}/roles/{role_id}") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get role: {str(e)}") return {} def get_role_users(self, role_id: str) -> list[dict]: """Retrieves users in a role. Args: role_id (str): The ID of the role Returns: list[dict]: List of users in the role """ try: response = self.session.get(f"{self.base_url}/roles/{role_id}/users") response.raise_for_status() return response.json() except Exception as e: print(f"Failed to get role users: {str(e)}") return [] # App Operations def get_app(self, app_id: str) -> dict: """Retrieves application information. Args: app_id (str): The ID of the application Returns: dict: Application information Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"app_{app_id}" cached_app = app_cache.get(cache_key) if cached_app is not None: self.logger.debug(f"Cache hit for app: {app_id}") return cached_app try: response = self._request("GET", f"apps/{app_id}") app_data = self._handle_response(response) # Cache the app data if self.use_cache: app_cache.set(f"app_{app_id}", app_data) return app_data except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get app: {str(e)}") def create_app(self, name: str, description: Optional[str] = None, options: Optional[dict] = None) -> dict: """Creates a new QuickBase application. Args: name (str): Name of the application description (Optional[str]): Description of the application options (Optional[dict]): Additional options for app creation Returns: dict: Created application information Raises: QuickbaseError: If creation fails """ try: payload = { "name": name, "description": description or "", **(options or {}) } response = self.session.post(f"{self.base_url}/apps", json=payload) response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to create app: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to create app: {str(e)}") def update_app(self, app_id: str, name: Optional[str] = None, description: Optional[str] = None, options: Optional[dict] = None) -> dict: """Updates an existing QuickBase application. Args: app_id (str): The ID of the application name (Optional[str]): New name for the application description (Optional[str]): New description for the application options (Optional[dict]): Additional options for app update Returns: dict: Updated application information Raises: QuickbaseError: If update fails """ try: payload = {} if name: payload["name"] = name if description is not None: payload["description"] = description if options: payload.update(options) response = self.session.patch(f"{self.base_url}/apps/{app_id}", json=payload) response.raise_for_status() # Get the updated app details app_response = self.session.get(f"{self.base_url}/apps/{app_id}") app_response.raise_for_status() return app_response.json() except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to update app: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to update app: {str(e)}") def delete_app(self, app_id: str) -> bool: """Deletes a QuickBase application. Args: app_id (str): The ID of the application Returns: bool: True if deletion successful Raises: QuickbaseError: If deletion fails """ try: response = self.session.delete(f"{self.base_url}/apps/{app_id}") response.raise_for_status() return True except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to delete app: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to delete app: {str(e)}") def copy_app(self, app_id: str, name: str, description: str = None, properties: dict = None) -> dict: """Copies a QuickBase application. Args: app_id (str): The ID of the source application name (str): The name for the new application description (str, optional): Description for the new application properties (dict, optional): Additional properties for the new application Returns: dict: The newly created application's details """ try: payload = { "name": name, "sourceAppId": app_id } if description: payload["description"] = description if properties: payload.update(properties) response = self.session.post(f"{self.base_url}/apps/copy", json=payload) response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to copy app: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to copy app: {str(e)}") def get_app_tables(self, app_id: str) -> list[dict]: """Retrieves tables in an application. Args: app_id (str): The ID of the application Returns: list[dict]: List of tables in the application Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"app_tables_{app_id}" cached_tables = app_cache.get(cache_key) if cached_tables is not None: self.logger.debug(f"Cache hit for app tables: {app_id}") return cached_tables try: response = self._request("GET", f"apps/{app_id}/tables") tables = self._handle_response(response) # Cache the tables if self.use_cache: app_cache.set(f"app_tables_{app_id}", tables) return tables except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get app tables: {str(e)}") def get_app_roles(self, app_id: str) -> list[dict]: """Retrieves roles in an application. Args: app_id (str): The ID of the application Returns: list[dict]: List of roles in the application Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"app_roles_{app_id}" cached_roles = app_cache.get(cache_key) if cached_roles is not None: self.logger.debug(f"Cache hit for app roles: {app_id}") return cached_roles try: response = self._request("GET", f"apps/{app_id}/roles") roles = self._handle_response(response) # Cache the roles if self.use_cache: app_cache.set(f"app_roles_{app_id}", roles) return roles except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get app roles: {str(e)}") # Form Operations def get_form(self, table_id: str, form_id: str) -> dict: """Retrieves form information. Args: table_id (str): The ID of the table form_id (str): The ID of the form Returns: dict: Form information Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"form_{table_id}_{form_id}" cached_form = table_cache.get(cache_key) if cached_form is not None: self.logger.debug(f"Cache hit for form: {table_id}/{form_id}") return cached_form try: response = self._request("GET", f"forms/{table_id}/{form_id}") form = self._handle_response(response) # Cache the form if self.use_cache: table_cache.set(f"form_{table_id}_{form_id}", form) return form except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get form: {str(e)}") def get_table_forms(self, table_id: str) -> list[dict]: """Retrieves forms for a table. Args: table_id (str): The ID of the table Returns: list[dict]: List of forms for the table Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"table_forms_{table_id}" cached_forms = table_cache.get(cache_key) if cached_forms is not None: self.logger.debug(f"Cache hit for table forms: {table_id}") return cached_forms try: response = self._request("GET", f"tables/{table_id}/forms") forms = self._handle_response(response) # Cache the forms if self.use_cache: table_cache.set(f"table_forms_{table_id}", forms) return forms except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get table forms: {str(e)}") return [] # Dashboard Operations def get_dashboard(self, dashboard_id: str) -> dict: """Retrieves dashboard information. Args: dashboard_id (str): The ID of the dashboard Returns: dict: Dashboard information Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"dashboard_{dashboard_id}" cached_dashboard = app_cache.get(cache_key) if cached_dashboard is not None: self.logger.debug(f"Cache hit for dashboard: {dashboard_id}") return cached_dashboard try: response = self._request("GET", f"dashboards/{dashboard_id}") dashboard = self._handle_response(response) # Cache the dashboard if self.use_cache: app_cache.set(f"dashboard_{dashboard_id}", dashboard) return dashboard except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get dashboard: {str(e)}") def get_app_dashboards(self, app_id: str) -> list[dict]: """Retrieves dashboards in an application. Args: app_id (str): The ID of the application Returns: list[dict]: List of dashboards in the application Raises: QuickbaseError: If the API request fails """ # Check cache first if caching is enabled if self.use_cache: cache_key = f"app_dashboards_{app_id}" cached_dashboards = app_cache.get(cache_key) if cached_dashboards is not None: self.logger.debug(f"Cache hit for app dashboards: {app_id}") return cached_dashboards try: response = self._request("GET", f"apps/{app_id}/dashboards") dashboards = self._handle_response(response) # Cache the dashboards if self.use_cache: app_cache.set(f"app_dashboards_{app_id}", dashboards) return dashboards except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to get app dashboards: {str(e)}") # Table Operations def create_table(self, app_id: str, name: str, description: Optional[str] = None, fields: Optional[List[dict]] = None, options: Optional[dict] = None) -> dict: """Creates a new table in a QuickBase application. Args: app_id (str): The ID of the application name (str): Name of the table description (Optional[str]): Description of the table fields (Optional[List[dict]]): List of field definitions options (Optional[dict]): Additional options for table creation Returns: dict: Created table information """ try: # Format fields to match QuickBase API requirements formatted_fields = [] if fields: for field in fields: formatted_field = { "fieldType": field.get("type", "text"), # Default to text if not specified "label": field.get("name", ""), "description": field.get("description", ""), "properties": field.get("properties", {}) } formatted_fields.append(formatted_field) payload = { "name": name, "description": description or "", "fields": formatted_fields, **(options or {}) } # Send appId as a query parameter response = self.session.post(f"{self.base_url}/tables?appId={app_id}", json=payload) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to create table: {str(e)}") def update_table(self, table_id: str, name: Optional[str] = None, description: Optional[str] = None, options: Optional[dict] = None) -> dict: """Updates an existing QuickBase table. Args: table_id (str): The ID of the table name (Optional[str]): New name for the table description (Optional[str]): New description for the table options (Optional[dict]): Additional options for table update Returns: dict: Updated table information Raises: QuickbaseError: If update fails """ try: payload = {} if name: payload["name"] = name if description is not None: payload["description"] = description if options: payload.update(options) response = self.session.patch(f"{self.base_url}/tables/{table_id}", json=payload) response.raise_for_status() return response.json() except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to update table: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to update table: {str(e)}") def delete_table(self, table_id: str) -> bool: """Deletes a QuickBase table. Args: table_id (str): The ID of the table Returns: bool: True if deletion successful Raises: QuickbaseError: If deletion fails """ try: response = self.session.delete(f"{self.base_url}/tables/{table_id}") response.raise_for_status() return True except requests.exceptions.HTTPError as e: status_code = e.response.status_code error_message = str(e) try: error_data = e.response.json() error_message = error_data.get('message', str(e)) except: pass raise QuickbaseError( message=f"Failed to delete table: {error_message}", status_code=status_code, response=e.response ) except Exception as e: raise QuickbaseError(message=f"Failed to delete table: {str(e)}") def create_field(self, table_id: str, field_name: str, field_type: str, options: Optional[dict] = None) -> dict: """Creates a new field in a QuickBase table. Args: table_id (str): The ID of the table field_name (str): Name of the field field_type (str): Type of the field (e.g., "text", "number", "date", etc.) options (Optional[dict]): Additional field options Returns: dict: Created field information Raises: QuickbaseError: If the API request fails """ try: # Format payload to match QuickBase API requirements payload = { "label": field_name, "fieldType": field_type, "properties": options.get("properties", {}), **(options or {}) } # Use the query parameter format for table_id response = self.session.post(f"{self.base_url}/fields?tableId={table_id}", json=payload) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to create field: {str(e)}") def update_field(self, table_id: str, field_id: int, name: Optional[str] = None, field_type: Optional[str] = None, options: Optional[dict] = None) -> dict: """Updates an existing field in a QuickBase table. Args: table_id (str): The ID of the table field_id (int): The ID of the field name (Optional[str]): New name for the field field_type (Optional[str]): New type for the field (do not provide this unless you're changing the field type) options (Optional[dict]): Additional field options Returns: dict: Updated field information Raises: QuickbaseError: If the API request fails """ try: # Ensure field_id is an integer if isinstance(field_id, str): field_id = int(field_id) # Get the current field definition field_url = f"{self.base_url}/fields/{field_id}?tableId={table_id}" get_response = self.session.get(field_url) current_field = self._handle_response(get_response) # Create a new payload based on the current field but with updates update_payload = {"label": current_field.get("label")} # Update with the new label if provided if name: update_payload["label"] = name # Add fieldHelp if in the options if options and "fieldHelp" in options: update_payload["fieldHelp"] = options["fieldHelp"] elif "fieldHelp" in current_field: update_payload["fieldHelp"] = current_field["fieldHelp"] # Add properties if in the options if options and "properties" in options: # Start with current properties properties = current_field.get("properties", {}).copy() # Update with new properties properties.update(options["properties"]) update_payload["properties"] = properties elif "properties" in current_field: update_payload["properties"] = current_field["properties"] # Important: Do NOT include fieldType # Debug the payload import json print(f"Update field payload: {json.dumps(update_payload)}") # Make the API call to update the field response = self.session.post(field_url, json=update_payload) return self._handle_response(response) except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to update field: {str(e)}") def delete_field(self, table_id: str, field_id: int) -> bool: """Deletes a field from a QuickBase table. Args: table_id (str): The ID of the table field_id (int): The ID of the field Returns: bool: True if deletion successful Raises: QuickbaseError: If the API request fails """ try: # Ensure field_id is an integer if isinstance(field_id, str): field_id = int(field_id) url = f"{self.base_url}/fields?tableId={table_id}" payload = { "fieldIds": [field_id] } response = self.session.delete(url, json=payload) self._handle_response(response) return True except QuickbaseError: raise except Exception as e: raise QuickbaseError(f"Failed to delete field: {str(e)}") # Create a server instance server = Server("quickbase-mcp") # Log version information print(f"Quickbase MCP Integration Version: {__version__}", file=sys.stderr) print(f"Minimum Python Version: {__min_python_version__}", file=sys.stderr) print(f"Minimum Node.js Version: {__min_node_version__}", file=sys.stderr) # Helper function to check for .env file def check_env_file(): """Check if the .env file exists and provide instructions if not.""" env_file = os.path.join(os.getcwd(), '.env') example_file = os.path.join(os.getcwd(), '.env.example') if not os.path.exists(env_file): print("WARNING: .env file not found!", file=sys.stderr) if os.path.exists(example_file): print("\nAn .env.example file was found. Please create an .env file:", file=sys.stderr) print("1. Copy .env.example to .env", file=sys.stderr) print(" cp .env.example .env", file=sys.stderr) print("2. Edit the .env file with your Quickbase credentials", file=sys.stderr) print(" - Set QUICKBASE_REALM_HOST to your Quickbase realm (e.g., your-company.quickbase.com)", file=sys.stderr) print(" - Set QUICKBASE_USER_TOKEN to your Quickbase user token", file=sys.stderr) print(" - Set QUICKBASE_APP_ID to your Quickbase application ID", file=sys.stderr) else: print("\nNo .env or .env.example file found. Please create an .env file with the following content:", file=sys.stderr) print("QUICKBASE_REALM_HOST=your-realm.quickbase.com", file=sys.stderr) print("QUICKBASE_USER_TOKEN=your_user_token_here", file=sys.stderr) print("QUICKBASE_APP_ID=your_app_id_here", file=sys.stderr) return False return True # Load environment variables print("Loading environment variables...", file=sys.stderr) env_file_exists = check_env_file() load_dotenv() # Check for required environment variables before connecting missing_vars = [] for var in ['QUICKBASE_REALM_HOST', 'QUICKBASE_USER_TOKEN', 'QUICKBASE_APP_ID']: if not os.getenv(var): missing_vars.append(var) if missing_vars: print(f"WARNING: Missing required environment variables: {', '.join(missing_vars)}", file=sys.stderr) print("The Quickbase MCP integration requires the following environment variables:", file=sys.stderr) print(" QUICKBASE_REALM_HOST - Your Quickbase realm (e.g., your-company.quickbase.com)", file=sys.stderr) print(" QUICKBASE_USER_TOKEN - Your Quickbase user token", file=sys.stderr) print(" QUICKBASE_APP_ID - The ID of your Quickbase application", file=sys.stderr) print("\nPlease set these variables in your .env file or environment.", file=sys.stderr) print("The server will start in limited functionality mode.", file=sys.stderr) # Configure with Quickbase credentials from environment variables connected = False qb_client = QuickbaseClient() try: if qb_client.connect(): connected = True print("Successfully initialized Quickbase connection!", file=sys.stderr) else: print("Failed to initialize Quickbase connection", file=sys.stderr) except QuickbaseAuthenticationError as auth_error: print(f"Authentication Error: {auth_error}", file=sys.stderr) print("Server will start in limited functionality mode - only test_connection tool will be available.", file=sys.stderr) except Exception as e: print(f"ERROR: Failed to initialize Quickbase connection: {str(e)}", file=sys.stderr) print("Server will start in limited functionality mode - only test_connection tool will be available.", file=sys.stderr) # Server will continue to run even if authentication fails, but with limited functionality if not connected: print("\nTROUBLESHOOTING TIPS:", file=sys.stderr) print("1. Check that your .env file exists and contains the correct variables", file=sys.stderr) print("2. Verify your user token has access to the specified app", file=sys.stderr) print("3. Confirm your realm hostname is correct (should be in format 'realm.quickbase.com')", file=sys.stderr) print("4. Make sure your app ID is valid", file=sys.stderr) print("5. Check network connectivity to Quickbase API", file=sys.stderr) @server.list_resources() async def handle_list_resources() -> list[types.Resource]: """List available Quickbase resources.""" return [ types.Resource( name="tables", description="Tables in the Quickbase application", uri="quickbase://tables", properties={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the table" }, "name": { "type": "string", "description": "The name of the table" } } } ), types.Resource( name="reports", description="Reports in the Quickbase application", uri="quickbase://reports", properties={ "type": "object", "properties": { "report_id": { "type": "string", "description": "The ID of the report" }, "name": { "type": "string", "description": "The name of the report" }, "table_id": { "type": "string", "description": "The ID of the table this report belongs to" } } } ), types.Resource( name="forms", description="Forms in the Quickbase application", uri="quickbase://forms", properties={ "type": "object", "properties": { "form_id": { "type": "string", "description": "The ID of the form" }, "name": { "type": "string", "description": "The name of the form" }, "table_id": { "type": "string", "description": "The ID of the table this form belongs to" } } } ) ] @server.list_prompts() async def handle_list_prompts() -> list[types.Prompt]: """List available prompts for common Quickbase operations.""" return [ types.Prompt( name="query_table", description="Query records from a Quickbase table", prompt="Query {table_name} table for {fields} where {condition}", parameters={ "type": "object", "properties": { "table_name": { "type": "string", "description": "Name of the table to query" }, "fields": { "type": "string", "description": "Fields to retrieve (comma-separated)" }, "condition": { "type": "string", "description": "Query condition (optional)" } }, "required": ["table_name", "fields"] } ), types.Prompt( name="create_new_record", description="Create a new record in a Quickbase table", prompt="Create a new record in {table_name} with {field_values}", parameters={ "type": "object", "properties": { "table_name": { "type": "string", "description": "Name of the table" }, "field_values": { "type": "string", "description": "Field values in format: field1=value1, field2=value2" } }, "required": ["table_name", "field_values"] } ) ] @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available tools. Each tool specifies its arguments using JSON Schema validation. """ return [ # Connection Operations types.Tool( name="test_connection", description="Tests the connection to Quickbase", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="configure_cache", description="Configures caching behavior for Quickbase operations", inputSchema={ "type": "object", "properties": { "enabled": { "type": "boolean", "description": "Whether to enable caching (default: true)" }, "clear": { "type": "boolean", "description": "Whether to clear all existing caches (default: false)" } }, }, ), # App Operations types.Tool( name="create_app", description="Creates a new QuickBase application", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Name of the application", }, "description": { "type": "string", "description": "Description of the application", }, "options": { "type": "object", "description": "Additional options for app creation", "additionalProperties": True, } }, "required": ["name"], }, ), types.Tool( name="update_app", description="Updates an existing QuickBase application", inputSchema={ "type": "object", "properties": { "app_id": { "type": "string", "description": "The ID of the application", }, "name": { "type": "string", "description": "New name for the application", }, "description": { "type": "string", "description": "New description for the application", }, "options": { "type": "object", "description": "Additional options for app update", "additionalProperties": True, } }, "required": ["app_id"], }, ), # Table Operations types.Tool( name="list_tables", description="Lists all tables in the Quickbase application", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="create_table", description="Creates a new table in a QuickBase application", inputSchema={ "type": "object", "properties": { "app_id": { "type": "string", "description": "The ID of the application", }, "name": { "type": "string", "description": "Name of the table", }, "description": { "type": "string", "description": "Description of the table", }, "fields": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "type": {"type": "string"}, "description": {"type": "string"}, "properties": {"type": "object"} }, "required": ["name", "type"] }, "description": "List of field definitions", }, "options": { "type": "object", "description": "Additional options for table creation", "additionalProperties": True, } }, "required": ["app_id", "name"], }, ), types.Tool( name="update_table", description="Updates an existing QuickBase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the table", }, "name": { "type": "string", "description": "New name for the table", }, "description": { "type": "string", "description": "New description for the table", }, "options": { "type": "object", "description": "Additional options for table update", "additionalProperties": True, } }, "required": ["table_id"], }, ), # Field Operations types.Tool( name="get_table_fields", description="Retrieves field information for a specific Quickbase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, }, "required": ["table_id"], }, ), types.Tool( name="create_field", description="Creates a new field in a QuickBase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the table", }, "field_name": { "type": "string", "description": "Name of the field", }, "field_type": { "type": "string", "description": "Type of the field (e.g., text, number, date)", }, "options": { "type": "object", "description": "Additional field options", "additionalProperties": True, } }, "required": ["table_id", "field_name", "field_type"], }, ), types.Tool( name="update_field", description="Updates an existing field in a QuickBase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the table", }, "field_id": { "type": "string", "description": "The ID of the field", }, "name": { "type": "string", "description": "New name for the field", }, "field_type": { "type": "string", "description": "New type for the field", }, "options": { "type": "object", "description": "Additional field options", "additionalProperties": True, } }, "required": ["table_id", "field_id"], }, ), # Record Operations types.Tool( name="query_records", description="Executes a query against a Quickbase table with optional pagination", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "select": { "type": "array", "items": {"type": "string"}, "description": "Fields to select", }, "where": { "type": "string", "description": "Query criteria", }, "options": { "type": "object", "properties": { "skip": {"type": "integer", "description": "Number of records to skip"}, "top": {"type": "integer", "description": "Number of records to retrieve per page"}, "groupBy": { "type": "array", "items": {"type": "string"}, "description": "Fields to group results by" }, "orderBy": { "type": "array", "items": { "type": "object", "properties": { "fieldId": {"type": "string"}, "order": {"type": "string", "enum": ["ASC", "DESC"]} } }, "description": "Fields to order results by" } }, "description": "Query options for filtering, ordering, and pagination" }, "paginate": { "type": "boolean", "description": "Whether to automatically handle pagination for large result sets", }, "max_records": { "type": "string", "description": "Maximum number of records to return when paginating (default: 1000)", } }, "required": ["table_id"], }, ), types.Tool( name="create_record", description="Creates a new record in a Quickbase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "data": { "type": "string", "description": "The data for the new record", }, }, "required": ["table_id", "data"], }, ), types.Tool( name="update_record", description="Updates an existing record in a Quickbase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "record_id": { "type": "string", "description": "The ID of the record to update", }, "data": { "type": "object", "description": "The updated data for the record", "properties": {}, "additionalProperties": True, }, }, "required": ["table_id", "record_id", "data"], }, ), types.Tool( name="bulk_create_records", description="Creates multiple records in a Quickbase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "records": { "type": "array", "description": "Array of record data to insert", "items": { "type": "object", "additionalProperties": True } }, }, "required": ["table_id", "records"], }, ), types.Tool( name="bulk_update_records", description="Updates multiple records in a Quickbase table", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "records": { "type": "array", "description": "Array of record data to update (must include record IDs)", "items": { "type": "object", "additionalProperties": True } }, }, "required": ["table_id", "records"], }, ), # File Operations types.Tool( name="upload_file", description="Uploads a file to a field in a Quickbase record", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "record_id": { "type": "string", "description": "The ID of the record", }, "field_id": { "type": "string", "description": "The ID of the field (must be a file attachment field)", }, "file_path": { "type": "string", "description": "Path to the file to upload", } }, "required": ["table_id", "record_id", "field_id", "file_path"], }, ), types.Tool( name="download_file", description="Downloads a file from a field in a Quickbase record", inputSchema={ "type": "object", "properties": { "table_id": { "type": "string", "description": "The ID of the Quickbase table", }, "record_id": { "type": "string", "description": "The ID of the record", }, "field_id": { "type": "string", "description": "The ID of the field (must be a file attachment field)", }, "version": { "type": "string", "description": "The version of the file to download (default 0 for latest)", }, "output_path": { "type": "string", "description": "Path where the file should be saved", } }, "required": ["table_id", "record_id", "field_id", "output_path"], }, ), # Report Operations types.Tool( name="run_report", description="Executes a Quickbase report", inputSchema={ "type": "object", "properties": { "report_id": { "type": "string", "description": "The ID of the report to run", }, "options": { "type": "object", "properties": { "skip": {"type": "integer"}, "top": {"type": "integer"}, "format": {"type": "string"}, "filters": {"type": "object"}, "groupBy": { "type": "array", "items": {"type": "string"} }, "sortBy": { "type": "array", "items": {"type": "string"} } } } }, "required": ["report_id"], }, ), ] @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, str]) -> list[types.TextContent]: try: if name == "list_tables": try: app_id = os.getenv('QUICKBASE_APP_ID') if not app_id: raise QuickbaseError("Missing QUICKBASE_APP_ID environment variable") response = qb_client.session.get(f"{qb_client.base_url}/tables?appId={app_id}") tables = qb_client._handle_response(response) formatted_tables = [] for table in tables: formatted_tables.append({ "id": table.get("id"), "name": table.get("name"), "description": table.get("description", "") }) return [ types.TextContent( type="text", text=f"Available Tables in App {app_id}:\n{json.dumps(formatted_tables, indent=2)}" ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error listing tables: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] elif name == "test_connection": # Enhanced test_connection tool with detailed diagnostics try: # Check environment variables first realm = os.getenv('QUICKBASE_REALM_HOST') token = os.getenv('QUICKBASE_USER_TOKEN') app_id = os.getenv('QUICKBASE_APP_ID') # Prepare diagnostic info env_vars_status = [] for var, value in [ ("QUICKBASE_REALM_HOST", realm), ("QUICKBASE_USER_TOKEN", token), ("QUICKBASE_APP_ID", app_id) ]: if not value: env_vars_status.append(f"❌ {var}: Missing") else: # Mask token for security if var == "QUICKBASE_USER_TOKEN" and value: masked = value[:4] + "..." + value[-4:] if len(value) > 8 else "****" env_vars_status.append(f"✅ {var}: {masked}") else: env_vars_status.append(f"✅ {var}: {value}") env_status = "\n".join(env_vars_status) # Check for connection status if not realm or not token or not app_id: return [ types.TextContent( type="text", text=f"Quickbase Connection: FAILED\n\nEnvironment Variables Check:\n{env_status}\n\nMissing required environment variables. Please update your .env file with the required credentials." ) ] # Test actual connection if qb_client.session is None: success = qb_client.connect() status = "Connected successfully" if success else "Connection failed" else: # Try a simple API call to verify connection is working try: response = qb_client.session.get(f"{qb_client.base_url}/apps/{app_id}") response.raise_for_status() status = "Already connected" except Exception: status = "Reconnecting..." success = qb_client.connect() status = "Connected successfully" if success else "Connection failed" # Return detailed connection information return [ types.TextContent( type="text", text=f"Quickbase Connection Status: {status}\n\nEnvironment Variables Check:\n{env_status}\n\nAPI Connection:\nRealm: {realm}\nEndpoint: {qb_client.base_url}\nApp ID: {app_id}\nVersion: {__version__}" ) ] except QuickbaseAuthenticationError as e: error_detail = "" if e.response and isinstance(e.response, dict): if 'message' in e.response: error_detail += f"\nAPI Error: {e.response.get('message')}" if 'description' in e.response: error_detail += f"\nDescription: {e.response.get('description')}" troubleshooting = "\nTroubleshooting Steps:\n" troubleshooting += "1. Verify your user token is correct and not expired\n" troubleshooting += "2. Confirm the token has access to the specified app\n" troubleshooting += "3. Check that the realm hostname is correct\n" troubleshooting += "4. Ensure the app ID exists and is accessible with your token" return [ types.TextContent( type="text", text=f"Authentication Error: {e.message}\nStatus Code: {e.status_code}{error_detail}{troubleshooting}" ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Connection Error: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}\n\nPlease verify your Quickbase credentials and network connectivity." ) ] except Exception as e: return [ types.TextContent( type="text", text=f"Unexpected Error: {str(e)}\n\nPlease check your .env file and network connectivity." ) ] elif name == "query_records": table_id = arguments.get("table_id") select = arguments.get("select", []) where = arguments.get("where", "") options = arguments.get("options", {}) paginate = arguments.get("paginate", False) max_records = int(arguments.get("max_records", 1000)) if not table_id: raise ValueError("Missing 'table_id' argument") try: results = qb_client.get_table_records( table_id, query={ "from": table_id, "select": select, "where": where, "options": options }, paginate=paginate, max_records=max_records ) # Format the output for better readability record_count = len(results.get("data", [])) metadata = results.get("metadata", {}) pagination_info = results.get("pagination", {}) summary = f"Retrieved {record_count} records from table {table_id}" if paginate: summary += f" (paginated: {pagination_info.get('total_records_fetched', 0)} records fetched)" return [ types.TextContent( type="text", text=f"{summary}\n\nQuery Results (JSON):\n{json.dumps(results, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error querying records: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error querying records: {str(e)}") elif name == "get_table_fields": table_id = arguments.get("table_id") if not table_id: raise ValueError("Missing 'table_id' argument") results = qb_client.get_table_fields(table_id) return [ types.TextContent( type="text", text=f"Table Fields (JSON):\n{json.dumps(results, indent=2)}", ) ] elif name == "create_record": table_id = arguments.get("table_id") data = arguments.get("data") if not table_id or not data: raise ValueError("Missing 'table_id' or 'data' argument") try: # Parse the data if it's a string if isinstance(data, str): data = json.loads(data) # Ensure data is a dictionary and not empty if not isinstance(data, dict) or not data: raise ValueError("Data must be a non-empty dictionary") results = qb_client.create_record(table_id, data) return [ types.TextContent( type="text", text=f"Create Record Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except json.JSONDecodeError as e: raise ValueError(f"Invalid data format: {str(e)}") except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error creating record: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] elif name == "update_record": table_id = arguments.get("table_id") record_id = arguments.get("record_id") data = arguments.get("data") if not table_id or not record_id or not data: raise ValueError("Missing 'table_id', 'record_id', or 'data' argument") try: # Parse the data if it's a string if isinstance(data, str): data = json.loads(data) # Ensure data is a dictionary and not empty if not isinstance(data, dict) or not data: raise ValueError("Data must be a non-empty dictionary") results = qb_client.update_record(table_id, record_id, data) return [ types.TextContent( type="text", text=f"Update Record Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except json.JSONDecodeError as e: raise ValueError(f"Invalid data format: {str(e)}") except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error updating record: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] elif name == "run_report": report_id = arguments.get("report_id") options = arguments.get("options", {}) if not report_id: raise ValueError("Missing 'report_id' argument") results = qb_client.run_report(report_id, options) return [ types.TextContent( type="text", text=f"Report Results (JSON):\n{json.dumps(results, indent=2)}", ) ] elif name == "upload_file": table_id = arguments.get("table_id") record_id = arguments.get("record_id") field_id = arguments.get("field_id") file_path = arguments.get("file_path") if not table_id or not record_id or not field_id or not file_path: raise ValueError("Missing required arguments: table_id, record_id, field_id, and file_path are all required") try: # Convert IDs to integers if they're strings if isinstance(record_id, str): record_id = int(record_id) if isinstance(field_id, str): field_id = int(field_id) results = qb_client.upload_file(table_id, record_id, field_id, file_path) return [ types.TextContent( type="text", text=f"File Upload Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error uploading file: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error uploading file: {str(e)}") elif name == "download_file": table_id = arguments.get("table_id") record_id = arguments.get("record_id") field_id = arguments.get("field_id") version = arguments.get("version", 0) output_path = arguments.get("output_path") if not table_id or not record_id or not field_id or not output_path: raise ValueError("Missing required arguments: table_id, record_id, field_id, and output_path are required") try: # Get the file content file_content = qb_client.download_file(table_id, record_id, field_id, version) # Save the file to the specified path with open(output_path, 'wb') as f: f.write(file_content) return [ types.TextContent( type="text", text=f"File downloaded successfully and saved to: {output_path}" ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error downloading file: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error downloading file: {str(e)}") elif name == "manage_attachments": action = arguments.get("action") table_id = arguments.get("table_id") record_id = arguments.get("record_id") attachment_id = arguments.get("attachment_id") file_path = arguments.get("file_path") if not action or not table_id or not record_id: raise ValueError("Missing required arguments") if action == "upload" and not file_path: raise ValueError("Missing 'file_path' for upload action") elif action in ["download", "delete"] and not attachment_id: raise ValueError("Missing 'attachment_id' for download/delete action") try: if action == "upload": # We now have a dedicated upload_file tool field_id = attachment_id # For backwards compatibility if isinstance(record_id, str): record_id = int(record_id) if isinstance(field_id, str): field_id = int(field_id) results = qb_client.upload_file(table_id, record_id, field_id, file_path) return [ types.TextContent( type="text", text=f"File Upload Result (JSON):\n{json.dumps(results, indent=2)}", ) ] elif action == "download": # We now have a dedicated download_file tool file_content = qb_client.download_file(attachment_id) with open(file_path, 'wb') as f: f.write(file_content) return [ types.TextContent( type="text", text=f"File downloaded successfully and saved to: {file_path}" ) ] elif action == "delete": # We now have a dedicated delete_file tool result = qb_client.delete_file(attachment_id) return [ types.TextContent( type="text", text=f"File deletion {'successful' if result else 'failed'}" ) ] else: raise ValueError(f"Unknown action: {action}") except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error with file operation: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error with file operation: {str(e)}") elif name == "manage_users": action = arguments.get("action") email = arguments.get("email") role_id = arguments.get("role_id") options = arguments.get("options", {}) if not action or not email or not role_id: raise ValueError("Missing required arguments") try: results = qb_client.session.post(f"{qb_client.base_url}/users", json={"action": action, "email": email, "roleId": role_id, **options}) result_json = qb_client._handle_response(results) return [ types.TextContent( type="text", text=f"User Management Result (JSON):\n{json.dumps(result_json, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error managing users: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error managing users: {str(e)}") elif name == "bulk_create_records": table_id = arguments.get("table_id") records = arguments.get("records") if not table_id or not records: raise ValueError("Missing required arguments: table_id and records") try: # Format the records according to QuickBase API expectations formatted_records = [] for record in records: formatted_record = {} for field_id, value in record.items(): # Convert field_id to string if it's an integer field_id_str = str(field_id) formatted_record[field_id_str] = {"value": value} formatted_records.append(formatted_record) results = qb_client.bulk_create_records(table_id, formatted_records) return [ types.TextContent( type="text", text=f"Bulk Create Records Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error creating records: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error creating records: {str(e)}") elif name == "bulk_update_records": table_id = arguments.get("table_id") records = arguments.get("records") if not table_id or not records: raise ValueError("Missing required arguments: table_id and records") try: # Format the records according to QuickBase API expectations formatted_records = [] for record in records: formatted_record = {} for field_id, value in record.items(): # Make sure the record ID (field 3) is included if field_id == "3" or field_id == 3: record_id = value formatted_record["3"] = {"value": record_id} else: # Convert field_id to string if it's an integer field_id_str = str(field_id) formatted_record[field_id_str] = {"value": value} formatted_records.append(formatted_record) results = qb_client.bulk_update_records(table_id, formatted_records) return [ types.TextContent( type="text", text=f"Bulk Update Records Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error updating records: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: raise ValueError(f"Error updating records: {str(e)}") elif name == "create_table": app_id = arguments.get("app_id") name = arguments.get("name") description = arguments.get("description") fields = arguments.get("fields", []) options = arguments.get("options", {}) if not app_id or not name: raise ValueError("Missing 'app_id' or 'name' argument") try: # Format fields to match QuickBase API requirements formatted_fields = [] if fields: for field in fields: formatted_field = { "fieldType": field.get("type", "text"), # Default to text if not specified "label": field.get("name", ""), "description": field.get("description", ""), "properties": field.get("properties", {}) } formatted_fields.append(formatted_field) results = qb_client.create_table(app_id, name, description, formatted_fields, options) return [ types.TextContent( type="text", text=f"Create Table Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except json.JSONDecodeError as e: raise ValueError(f"Invalid fields format: {str(e)}") except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error creating table: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] elif name == "update_table": table_id = arguments.get("table_id") name = arguments.get("name") description = arguments.get("description") options = arguments.get("options", {}) if not table_id: raise ValueError("Missing 'table_id' argument") results = qb_client.update_table(table_id, name, description, options) return [ types.TextContent( type="text", text=f"Update Table Result (JSON):\n{json.dumps(results, indent=2)}", ) ] elif name == "create_field": table_id = arguments.get("table_id") field_name = arguments.get("field_name") field_type = arguments.get("field_type") options = arguments.get("options", {}) if not table_id or not field_name or not field_type: raise ValueError("Missing required arguments") try: results = qb_client.create_field(table_id, field_name, field_type, options) return [ types.TextContent( type="text", text=f"Create Field Result (JSON):\n{json.dumps(results, indent=2)}", ) ] except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error creating field: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] elif name == "update_field": table_id = arguments.get("table_id") field_id = arguments.get("field_id") name = arguments.get("name") options = arguments.get("options", {}) if not table_id or not field_id: raise ValueError("Missing 'table_id' or 'field_id' argument") # Since we continue to have issues with field updates, # provide a more detailed response about limitations return [ types.TextContent( type="text", text=f"Field Update Status: Limited API Support\n\n" f"The QuickBase API has limitations with field updates. While we've attempted multiple solutions, " f"the API consistently returns errors related to field updating.\n\n" f"Requested Update:\n" f"- Field ID: {field_id}\n" f"- Table ID: {table_id}\n" f"- New Name: {name if name else 'Not specified'}\n" f"- Field Help: {options.get('fieldHelp', 'Not specified')}\n" f"- Properties: {json.dumps(options.get('properties', {}))}\n\n" f"To update fields, please use the QuickBase UI or direct API calls with the correct authentication." ) ] raise ValueError(f"Unknown tool: {name}") except QuickbaseError as e: return [ types.TextContent( type="text", text=f"Error: {e.message}\nStatus: {e.status_code}\nDetails: {json.dumps(e.response, indent=2)}" ) ] except Exception as e: return [ types.TextContent( type="text", text=f"Unexpected error: {str(e)}" ) ] async def run(): # Check version compatibility if not check_version_compatibility(): print("Version compatibility check failed. Exiting.") return print(f"Starting Quickbase MCP Integration v{__version__}") # Load environment variables load_dotenv() # Check for required environment variables required_vars = ["QUICKBASE_REALM_HOST", "QUICKBASE_USER_TOKEN", "QUICKBASE_APP_ID"] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: print(f"Error: Missing required environment variables: {', '.join(missing_vars)}") print("Please set these in your .env file or environment.") return async with mcp.server.stdio.stdio_server() as (read, write): await server.run( read, write, InitializationOptions( server_name="quickbase-mcp", server_version=__version__, capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(run())