execute_sql_query
Run SQL queries on Metabase databases to retrieve and analyze data directly through the MCP server interface.
Instructions
Execute a native SQL query through Metabase.
Args: database_id (int): ID of the database to execute the query on. native_query (str): The SQL query to execute. parameters (list, optional): Query parameters.
Returns: Dict[str, Any]: Query execution result.
Notes: - For PostgreSQL databases, column names are be case-sensitive - Use double quotes around column names with mixed case (e.g., "columnName") - Example with quoted column names: SELECT "userId", "orderDate", COUNT(*) FROM "Orders" GROUP BY "userId", "orderDate"
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database_id | Yes | ||
| query | Yes | ||
| parameters | No |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- src/metabase_mcp_server.py:1310-1341 (handler)The main handler function for the execute_sql_query tool. It accepts database_id, query string, and optional parameters, constructs a native query payload, and sends it to Metabase's /api/dataset endpoint via the make_metabase_request helper.
@mcp.tool() async def execute_sql_query( database_id: int, query: str, parameters: Optional[List] = None ) -> Dict[str, Any]: """ Execute a native SQL query through Metabase. Args: database_id (int): ID of the database to execute the query on. native_query (str): The SQL query to execute. parameters (list, optional): Query parameters. Returns: Dict[str, Any]: Query execution result. Notes: - For PostgreSQL databases, column names are be case-sensitive - Use double quotes around column names with mixed case (e.g., "columnName") - Example with quoted column names: SELECT "userId", "orderDate", COUNT(*) FROM "Orders" GROUP BY "userId", "orderDate" """ query_payload = { "database": database_id, "type": "native", "native": {"query": query}, "parameters": parameters or [] } logger.info(f"Executing SQL query on database {database_id}") logger.debug(f"Query: {query[:100]}...") return await make_metabase_request(RequestMethod.POST, "/api/dataset", json=query_payload) - src/metabase_mcp_server.py:159-160 (registration)Initializes the FastMCP server instance. The @mcp.tool() decorator on execute_sql_query registers it as an MCP tool with this server instance.
# Initialize FastMCP agent mcp = FastMCP("metabase", lifespan=app_lifespan) - src/metabase_mcp_server.py:162-247 (helper)Helper utility function that handles HTTP requests to the Metabase API. Used by execute_sql_query to post the SQL query to the /api/dataset endpoint. Includes error handling, logging, and response validation.
async def make_metabase_request( method: RequestMethod, endpoint: str, data: Optional[Dict[str, Any] | bytes] = None, params: Optional[Dict[str, Any]] = None, json: Any = None, headers: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: """ Make a request to the Metabase API. Args: method: HTTP method to use (GET, POST, PUT, DELETE) endpoint: API endpoint path data: Request data (for form data) params: URL parameters json: JSON request body headers: Additional headers Returns: Dict[str, Any]: Response data Raises: MetabaseConnectionError: When the Metabase server is unreachable MetabaseResponseError: When Metabase returns a non-2xx status code RuntimeError: For other errors """ if not METABASE_URL or not METABASE_API_KEY: raise RuntimeError("METABASE_URL or METABASE_API_KEY environment variable is not set. Metabase API requests will fail.") if session is None: raise RuntimeError("HTTP session is not initialized. Ensure app_lifespan was called.") try: request_headers = headers or {} logger.debug(f"Making {method.name} request to {METABASE_URL}{endpoint}") # Log request payload for debugging (omit sensitive info) if json and logger.level <= logging.DEBUG: sanitized_json = {**json} if 'password' in sanitized_json: sanitized_json['password'] = '********' logger.debug(f"Request payload: {sanitized_json}") response = await session.request( method=method.name, url=endpoint, timeout=aiohttp.ClientTimeout(total=30), headers=request_headers, data=data, params=params, json=json, ) try: # Handle 500 errors with more detailed info if response.status >= 500: error_text = await response.text() logger.error(f"Server error {response.status}: {error_text[:200]}") raise MetabaseResponseError(response.status, f"Server Error: {error_text[:200]}", endpoint) response.raise_for_status() response_data = await response.json() # Ensure the response is a dictionary for FastMCP compatibility return ensure_dict_response(response_data) except aiohttp.ContentTypeError: # Handle empty responses or non-JSON responses content = await response.text() if not content: return {"data": {}} logger.warning(f"Received non-JSON response: {content}") return {"data": content} except aiohttp.ClientConnectionError as e: logger.error(f"Connection error: {str(e)}") raise MetabaseConnectionError("Metabase is unreachable. Is the Metabase server running?") from e except aiohttp.ClientResponseError as e: logger.error(f"Response error: {e.status}, {e.message}, {e.request_info.url}") raise MetabaseResponseError(e.status, e.message, str(e.request_info.url)) from e except Exception as e: logger.error(f"Request error: {str(e)}") raise RuntimeError(f"Request error: {str(e)}") from e