Skip to main content
Glama

mcp-server-motherduck

Official
by motherduckdb
database.py (10.3 kB)
import io
import logging
import os
import threading
from contextlib import redirect_stderr, redirect_stdout
from typing import Literal, Optional

import duckdb
from tabulate import tabulate

from .configs import SERVER_VERSION

logger = logging.getLogger("mcp_server_motherduck")


def _sql_quote(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that credentials or paths that
    contain a quote cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


class DatabaseClient:
    """Thin wrapper around a DuckDB / MotherDuck / S3-hosted DuckDB connection.

    The client resolves the database path into one of three modes
    (``duckdb``, ``motherduck`` or ``s3``), opens the connection, and exposes
    :meth:`query`, which runs a SQL statement and returns the result rendered
    as a text table, limited to ``max_rows`` rows and ``max_chars`` characters.
    """

    def __init__(
        self,
        db_path: str | None = None,
        motherduck_token: str | None = None,
        home_dir: str | None = None,
        saas_mode: bool = False,
        read_only: bool = False,
        max_rows: int = 1024,
        max_chars: int = 50000,
        query_timeout: int = -1,
    ):
        """Resolve the database path and open the connection.

        Args:
            db_path: Local file path, ``:memory:``, ``md:...`` or ``s3://...``.
            motherduck_token: Token for ``md:`` paths; when omitted the
                ``motherduck_token`` environment variable is used.
            home_dir: If given, exported as ``HOME`` before connecting.
            saas_mode: Append ``saas_mode=true`` to MotherDuck connections.
            read_only: Open the database read-only.
            max_rows: Maximum number of rows returned per query.
            max_chars: Maximum number of characters in the rendered table.
            query_timeout: Timeout in seconds; values <= 0 disable it.

        Raises:
            ValueError: If ``db_path`` is missing, if an ``md:`` path has no
                token, or if read-only mode is requested for an S3 database.
        """
        self._read_only = read_only
        self._max_rows = max_rows
        self._max_chars = max_chars
        self._query_timeout = query_timeout
        self.db_path, self.db_type = self._resolve_db_path_type(
            db_path, motherduck_token, saas_mode
        )
        logger.info(f"Database client initialized in `{self.db_type}` mode")

        # Set the home directory for DuckDB
        if home_dir:
            os.environ["HOME"] = home_dir

        self.conn = self._initialize_connection()

    def _connect(self) -> duckdb.DuckDBPyConnection:
        """Open a connection to ``self.db_path`` tagged with the server user agent."""
        return duckdb.connect(
            self.db_path,
            config={"custom_user_agent": f"mcp-server-motherduck/{SERVER_VERSION}"},
            read_only=self._read_only,
        )

    def _initialize_connection(self) -> Optional[duckdb.DuckDBPyConnection]:
        """Initialize connection to the MotherDuck or DuckDB database.

        Returns:
            The open connection, or ``None`` for a read-only local DuckDB
            database. In that mode the file is only probed here; ``_execute``
            opens and closes a temporary connection for every query.

        Raises:
            ValueError: If read-only mode is combined with an S3 database.
        """
        logger.info(f"🔌 Connecting to {self.db_type} database")

        # S3 databases don't support read-only mode
        if self.db_type == "s3" and self._read_only:
            raise ValueError("Read-only mode is not supported for S3 databases")

        if self.db_type == "duckdb" and self._read_only:
            # Check that we can connect, issue a `select 1`, then close and
            # return None.
            try:
                probe = self._connect()
                try:
                    probe.execute("SELECT 1")
                finally:
                    # Close the probe even when the check query fails.
                    probe.close()
            except Exception as e:
                logger.error(f"❌ Read-only check failed: {e}")
                raise
            return None

        if self.db_type == "s3":
            return self._connect_s3()

        conn = self._connect()
        logger.info(f"✅ Successfully connected to {self.db_type} database")
        return conn

    def _connect_s3(self) -> duckdb.DuckDBPyConnection:
        """Open an in-memory connection and attach the S3-hosted database to it."""
        conn = duckdb.connect(":memory:")
        try:
            self._load_httpfs(conn)
            self._configure_s3_secret(conn)
            self._attach_s3(conn)
        except Exception:
            # Do not leak the in-memory connection when setup fails.
            conn.close()
            raise
        return conn

    @staticmethod
    def _load_httpfs(conn) -> None:
        """Install (best effort) and load the httpfs extension needed for S3."""
        null_file = io.StringIO()
        # Silence anything written to stdout/stderr during install/load.
        with redirect_stdout(null_file), redirect_stderr(null_file):
            try:
                conn.execute("INSTALL httpfs;")
            except Exception as e:
                # Best effort: the extension might already be installed.
                # LOAD below raises if it is genuinely unavailable.
                logger.debug(f"INSTALL httpfs skipped: {e}")
            conn.execute("LOAD httpfs;")

    @staticmethod
    def _configure_s3_secret(conn) -> None:
        """Create the ``s3_secret`` DuckDB secret from AWS environment variables.

        With an access key and secret key (and no session token) an explicit
        secret is created. With a session token the ``credential_chain``
        provider is used instead. With neither, no secret is created.
        """
        aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID")
        aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
        aws_session_token = os.environ.get("AWS_SESSION_TOKEN")
        aws_region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")

        if aws_access_key and aws_secret_key and not aws_session_token:
            # Use CREATE SECRET for better credential management
            conn.execute(f"""
                CREATE SECRET IF NOT EXISTS s3_secret (
                    TYPE S3,
                    KEY_ID {_sql_quote(aws_access_key)},
                    SECRET {_sql_quote(aws_secret_key)},
                    REGION {_sql_quote(aws_region)}
                );
            """)
        elif aws_session_token:
            # Use credential_chain provider to automatically fetch credentials
            # This supports IAM roles, SSO, instance profiles, etc.
            conn.execute(f"""
                CREATE SECRET IF NOT EXISTS s3_secret (
                    TYPE S3,
                    PROVIDER credential_chain,
                    REGION {_sql_quote(aws_region)}
                );
            """)

    def _attach_s3(self, conn) -> None:
        """Attach ``self.db_path`` as ``s3db`` and make it the active database.

        The database is attached READ_ONLY first. If that fails because the
        database does not exist and the client is not read-only, a plain
        ATTACH is attempted to create it.
        """
        path_literal = _sql_quote(self.db_path)
        try:
            # For S3, we always attach as READ_ONLY first, even when the
            # client itself is not in read_only mode.
            conn.execute(f"ATTACH {path_literal} AS s3db (READ_ONLY);")
            conn.execute("USE s3db;")
            logger.info(
                f"✅ Successfully connected to {self.db_type} database (attached as read-only)"
            )
        except Exception as e:
            logger.error(f"Failed to attach S3 database: {e}")
            # Only fall back to creating the database when it is missing and
            # we're not in read-only mode.
            if "database does not exist" not in str(e) or self._read_only:
                raise
            logger.info("S3 database doesn't exist, attempting to create it...")
            try:
                conn.execute(f"ATTACH {path_literal} AS s3db;")
                conn.execute("USE s3db;")
                logger.info(f"✅ Created new S3 database at {self.db_path}")
            except Exception as create_error:
                logger.error(f"Failed to create S3 database: {create_error}")
                raise

    @staticmethod
    def _with_motherduck_params(db_path: str, token: str, saas_mode: bool) -> str:
        """Append the token (and optionally ``saas_mode=true``) to an ``md:`` path."""
        # Use `&` when the path already carries a query string.
        separator = "&" if "?" in db_path else "?"
        resolved = f"{db_path}{separator}motherduck_token={token}"
        if saas_mode:
            resolved += "&saas_mode=true"
        return resolved

    def _resolve_db_path_type(
        self,
        db_path: str | None,
        motherduck_token: str | None = None,
        saas_mode: bool = False,
    ) -> tuple[str, Literal["duckdb", "motherduck", "s3"]]:
        """Resolve and validate the database path.

        Returns:
            A ``(path, type)`` tuple. For ``md:`` paths the returned path has
            the MotherDuck token (and ``saas_mode=true`` when requested)
            appended as query parameters.

        Raises:
            ValueError: If ``db_path`` is ``None``, or if an ``md:`` path is
                given and no token is passed or set in the environment.
        """
        if db_path is None:
            raise ValueError(
                "db_path is required: pass a local path, `:memory:`, `md:` or an `s3://` URL."
            )

        # Handle S3 paths
        if db_path.startswith("s3://"):
            return db_path, "s3"

        # Handle MotherDuck paths
        if db_path.startswith("md:"):
            if motherduck_token:
                logger.info("Using MotherDuck token to connect to database `md:`")
                token = motherduck_token
            else:
                token = os.getenv("motherduck_token")
                if not token:
                    raise ValueError(
                        "Please set the `motherduck_token` as an environment variable or pass it as an argument with `--motherduck-token` when using `md:` as db_path."
                    )
                logger.info(
                    "Using MotherDuck token from env to connect to database `md:`"
                )
            # SaaS mode applies regardless of where the token came from.
            if saas_mode:
                logger.info("Connecting to MotherDuck in SaaS mode")
            return (
                self._with_motherduck_params(db_path, token, saas_mode),
                "motherduck",
            )

        # Everything else (including `:memory:`) is a plain DuckDB database.
        return db_path, "duckdb"

    def _execute(self, query: str) -> str:
        """Run *query* and return the result rendered as a text table.

        When no persistent connection exists (``self.conn is None``) a
        temporary one is opened for this query and always closed afterwards.
        """
        use_temporary = self.conn is None
        conn = self._connect() if use_temporary else self.conn

        try:
            # Execute with or without timeout
            if self._query_timeout > 0:
                rows, has_more_rows, headers = self._execute_with_timeout(conn, query)
            else:
                rows, has_more_rows, headers = self._execute_direct(conn, query)
        finally:
            # Close the temporary connection even when the query raised.
            if use_temporary:
                conn.close()

        returned_rows = len(rows)

        # Format results as table
        out = tabulate(rows, headers=headers, tablefmt="pretty")

        # Apply character limit if output is too long
        char_truncated = len(out) > self._max_chars
        if char_truncated:
            out = out[: self._max_chars]

        # Add informative feedback message
        if has_more_rows:
            out += f"\n\n⚠️ Showing first {returned_rows} rows."
        elif char_truncated:
            out += f"\n\n⚠️ Output truncated at {self._max_chars:,} characters."

        return out

    def _fetch_results(self, conn, query: str) -> tuple:
        """Execute *query* and fetch at most ``self._max_rows`` rows.

        Returns:
            ``(rows, has_more_rows, headers)``, where each header is the
            column name and its type joined by a newline.
        """
        q = conn.execute(query)
        rows = q.fetchmany(self._max_rows)
        # Fetch one extra row to learn whether the result was cut off.
        has_more_rows = q.fetchone() is not None
        headers = [d[0] + "\n" + str(d[1]) for d in q.description]
        return rows, has_more_rows, headers

    def _execute_direct(self, conn, query: str) -> tuple:
        """Execute query without timeout - original code path."""
        return self._fetch_results(conn, query)

    def _execute_with_timeout(self, conn, query: str) -> tuple:
        """Execute query with timeout using threading.Timer and conn.interrupt().

        Raises:
            ValueError: If the query is interrupted by the timeout timer.
        """
        timer = threading.Timer(self._query_timeout, conn.interrupt)
        timer.start()
        try:
            return self._fetch_results(conn, query)
        except duckdb.InterruptException as exc:
            raise ValueError(
                f"Query execution timed out after {self._query_timeout} seconds. "
                f"Increase timeout with --query-timeout argument when starting the mcp server."
            ) from exc
        finally:
            # Stop the timer so it cannot fire after the query has finished.
            timer.cancel()

    def query(self, query: str) -> str:
        """Run *query* and return the formatted result table.

        Raises:
            ValueError: Wrapping any error raised while executing the query;
                the original exception is kept as ``__cause__``.
        """
        try:
            return self._execute(query)
        except Exception as e:
            raise ValueError(f"❌ Error executing query: {e}") from e

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/motherduckdb/mcp-server-motherduck'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.