Skip to main content
Glama
metabase.py10.2 kB
import httpx from typing import Dict, Any, Optional from src.config.settings import Config class MetabaseAPI: """Class for interacting with the Metabase API""" @staticmethod async def make_request(endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any: """Make a request to the Metabase API with proper error handling. Args: endpoint: API endpoint to call (without the base URL) method: HTTP method to use (GET, POST, etc.) data: Optional JSON data to send with the request Returns: JSON response from the API or error dict """ # Get fresh values from config using the getter methods metabase_url = Config.get_metabase_url() api_key = Config.get_metabase_api_key() headers = { "x-api-key": api_key, "Content-Type": "application/json" } url = f"{metabase_url}/api/{endpoint.lstrip('/')}" print(f"Making request to: {url}") # Debugging async with httpx.AsyncClient() as client: try: if method == "GET": response = await client.get(url, headers=headers, timeout=30.0) elif method == "POST": response = await client.post(url, headers=headers, json=data, timeout=30.0) elif method == "PUT": response = await client.put(url, headers=headers, json=data, timeout=30.0) elif method == "DELETE": response = await client.delete(url, headers=headers, timeout=30.0) else: return {"error": f"Unsupported HTTP method: {method}"} response.raise_for_status() # Try to parse as JSON, but handle non-JSON responses try: return response.json() except ValueError: # If response is not JSON, return as error with the text content return {"error": "Non-JSON response", "message": response.text} except httpx.HTTPStatusError as e: # Try to get JSON error response try: error_json = e.response.json() return {"error": f"HTTP error: {e.response.status_code}", "message": str(error_json)} except ValueError: # If error is not JSON, return the text return {"error": f"HTTP error: {e.response.status_code}", "message": e.response.text} except Exception as e: return {"error": "Failed to make request", "message": str(e)} @classmethod async def get_request(cls, endpoint: str) -> Any: """Shorthand for GET requests""" return await cls.make_request(endpoint, method="GET") @classmethod async def post_request(cls, endpoint: str, data: Dict) -> Any: """Shorthand for POST requests""" return await cls.make_request(endpoint, method="POST", data=data) @classmethod async def test_connection(cls) -> tuple: """Test connection to Metabase API""" try: response = await cls.get_request("database") if response is None or "error" in response: return False, f"Connection failed: {response.get('message', 'Unknown error')}" return True, "Connection successful!" except Exception as e: return False, f"Connection failed: {str(e)}" @classmethod async def get_databases(cls): """Get list of all databases""" response = await cls.get_request("database") # Handle case where response might be a string if isinstance(response, str): return {"error": "Unexpected string response", "message": response} # Check if response is a dictionary with a 'data' key (new Metabase API format) if isinstance(response, dict) and 'data' in response: print(f"Found 'data' key in response with {len(response['data'])} databases") return response['data'] # Return just the list of databases return response @classmethod async def get_database_metadata(cls, database_id: int): """Get metadata for a specific database""" return await cls.get_request(f"database/{database_id}/metadata") @classmethod async def get_actions(cls): """Get list of all actions with support for different Metabase versions""" # Try the standard endpoint first response = await cls.get_request("action") # If that fails, try alternative endpoints if response is None or "error" in response: # Try the legacy endpoint (some older Metabase versions) response_legacy = await cls.get_request("api/action") if response_legacy and not "error" in response_legacy: return response_legacy # If all attempts failed, return the original error return response return response @classmethod async def get_action(cls, action_id: int): """Get details for a specific action""" return await cls.get_request(f"action/{action_id}") @classmethod async def execute_action(cls, action_id: int, parameters: Dict = None): """Execute an action with parameters""" if parameters is None: parameters = {} # Validate action_id if not isinstance(action_id, int) or action_id <= 0: return {"error": "Invalid action ID", "message": "Action ID must be a positive integer"} # Sanitize parameters sanitized_params = {} for key, value in parameters.items(): sanitized_params[str(key)] = value return await cls.post_request(f"action/{action_id}/execute", {"parameters": sanitized_params}) @classmethod async def get_table_metadata(cls, table_id: int): """Get detailed metadata for a specific table""" return await cls.get_request(f"table/{table_id}/query_metadata") @classmethod async def get_field_metadata(cls, field_id: int): """Get detailed metadata for a specific field""" return await cls.get_request(f"field/{field_id}") @classmethod async def get_database_schema(cls, database_id: int): """Get the database schema with relationships between tables""" # First get the basic metadata metadata = await cls.get_database_metadata(database_id) if metadata is None or "error" in metadata: return metadata # For each table, get detailed metadata including foreign keys tables = metadata.get('tables', []) enhanced_tables = [] for table in tables: table_id = table.get('id') if table_id: table_details = await cls.get_table_metadata(table_id) if table_details and not "error" in table_details: enhanced_tables.append(table_details) else: enhanced_tables.append(table) else: enhanced_tables.append(table) # Replace tables with enhanced versions metadata['tables'] = enhanced_tables return metadata @classmethod async def run_query(cls, database_id: int, query_string: str, row_limit: int = 5): """Run a native query against a database with a row limit Args: database_id: The ID of the database to query query_string: The SQL query to execute row_limit: Maximum number of rows to return (default: 5) Returns: Query results or error message """ # Remove trailing semicolons that can cause issues with Metabase API query_string = query_string.strip() if query_string.endswith(';'): query_string = query_string[:-1] # Ensure the query has a LIMIT clause for safety query_string = cls._ensure_query_limit(query_string, row_limit) # Prepare the query payload payload = { "database": database_id, "type": "native", "native": { "query": query_string, "template-tags": {} } } # Execute the query response = await cls.post_request("dataset", payload) # Improved error handling for Metabase error responses if response and isinstance(response, dict) and "error" in response: error_data = response.get("error") # Check for common error patterns in Metabase responses if isinstance(error_data, str) and "does not exist" in error_data: # This is likely a SQL syntax error from the database return {"error": "SQL Error", "message": error_data} # If the error message contains additional info message = response.get("message", "Unknown error") if isinstance(message, dict) and "data" in message: if "errors" in message["data"]: return {"error": "SQL Error", "message": message["data"]["errors"]} return response @staticmethod def _ensure_query_limit(query: str, limit: int) -> str: """Ensure the query has a LIMIT clause This is a simple implementation and may not work for all SQL dialects or complex queries. It's a basic safety measure. """ # Convert to uppercase for case-insensitive matching query_upper = query.upper() # Check if query already has a LIMIT clause if "LIMIT" in query_upper: return query # Add LIMIT clause return f"{query} LIMIT {limit}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sazboxai/MCP_MetaBase'

If you have feedback or need assistance with the MCP directory API, please join our Discord server