Superset MCP Integration

by aptro
Verified
"""
Superset MCP Integration

This module provides a Model Context Protocol (MCP) server for Apache Superset,
enabling AI assistants to interact with and control a Superset instance
programmatically.

It includes tools for:
- Authentication and token management
- Dashboard operations (list, get, create, update, delete)
- Chart management (list, get, create, update, delete)
- Database and dataset operations
- SQL execution and query management
- User information and recent activity tracking
- Advanced data type handling
- Tag management

Each tool follows a consistent naming convention: superset_<category>_<action>
"""

from typing import (
    Any,
    Dict,
    List,
    Optional,
    AsyncIterator,
    Callable,
    TypeVar,
    Awaitable,
    Union,
)
import os
import sys
import json
import inspect
import webbrowser
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
from threading import Thread

import httpx
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from mcp.server.fastmcp import FastMCP, Context
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Constants
SUPERSET_BASE_URL = os.getenv("SUPERSET_BASE_URL", "http://localhost:8088")
SUPERSET_USERNAME = os.getenv("SUPERSET_USERNAME")
SUPERSET_PASSWORD = os.getenv("SUPERSET_PASSWORD")
ACCESS_TOKEN_STORE_PATH = os.path.join(os.path.dirname(__file__), ".superset_token")

# Initialize FastAPI app for handling additional web endpoints if needed
app = FastAPI(title="Superset MCP Server")


def _log(message: str) -> None:
    """Write a diagnostic message to stderr.

    When the server runs over the MCP stdio transport, stdout carries the
    protocol messages, so diagnostics must never be printed there.
    """
    print(message, file=sys.stderr)


def _as_json_string(value: Any) -> Any:
    """Serialize a dict/list to a JSON string; return any other value unchanged.

    Used for payload fields that are sent to Superset as JSON-encoded strings
    (chart ``params`` and dashboard ``json_metadata``) so callers may pass either
    a ready-made string or a plain Python structure.
    """
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return value


@dataclass
class SupersetContext:
    """Typed context for the Superset MCP server"""

    client: httpx.AsyncClient
    base_url: str
    access_token: Optional[str] = None
    csrf_token: Optional[str] = None
    app: Optional[FastAPI] = None
    # Refresh token returned by the login endpoint (when requested); sent to the
    # refresh endpoint instead of the access token. Stays None when the access
    # token was loaded from disk.
    refresh_token: Optional[str] = None


def load_stored_token() -> Optional[str]:
    """Load stored access token if it exists

    Returns:
        The stripped token text, or None when the token file is missing, empty,
        or cannot be read.
    """
    try:
        with open(ACCESS_TOKEN_STORE_PATH, "r", encoding="utf-8") as f:
            token = f.read().strip()
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError) as e:
        # Best effort: an unreadable token file just means we must re-authenticate.
        _log(f"Warning: Could not read stored access token: {e}")
        return None
    # An empty file is equivalent to having no token at all.
    return token or None


def save_access_token(token: str):
    """Save access token to file

    The file is created with owner-only permissions (0o600) because it holds a
    bearer credential. The mode only applies when the file is newly created.
    Failures are reported on stderr and otherwise ignored (best effort).
    """
    try:
        fd = os.open(
            ACCESS_TOKEN_STORE_PATH,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(token)
    except Exception as e:
        _log(f"Warning: Could not save access token: {e}")


@asynccontextmanager
async def superset_lifespan(server: FastMCP) -> AsyncIterator[SupersetContext]:
    """Manage application lifecycle for Superset integration

    Creates the shared HTTP client and context, reuses a stored access token when
    it is still accepted by Superset, and closes the client on shutdown.
    """
    _log("Initializing Superset context...")

    # Create HTTP client
    client = httpx.AsyncClient(base_url=SUPERSET_BASE_URL, timeout=30.0)

    # Create context
    ctx = SupersetContext(client=client, base_url=SUPERSET_BASE_URL, app=app)

    # Try to load existing token
    stored_token = load_stored_token()
    if stored_token:
        ctx.access_token = stored_token
        # Set the token in the client headers
        client.headers.update({"Authorization": f"Bearer {stored_token}"})
        _log("Using stored access token")

        # Verify token validity
        try:
            response = await client.get("/api/v1/me/")
            if response.status_code != 200:
                _log(
                    f"Stored token is invalid (status {response.status_code}). Will need to re-authenticate."
                )
                ctx.access_token = None
                client.headers.pop("Authorization", None)
        except Exception as e:
            _log(f"Error verifying stored token: {e}")
            ctx.access_token = None
            client.headers.pop("Authorization", None)

    try:
        yield ctx
    finally:
        # Cleanup on shutdown
        _log("Shutting down Superset context...")
        await client.aclose()


# Initialize FastMCP server with lifespan and dependencies
mcp = FastMCP(
    "superset",
    lifespan=superset_lifespan,
    dependencies=["fastapi", "uvicorn", "python-dotenv", "httpx"],
)

# Type variables for generic function annotations
T = TypeVar("T")
R = TypeVar("R")

# ===== Helper Functions and Decorators =====


def requires_auth(
    func: Callable[..., Awaitable[Dict[str, Any]]],
) -> Callable[..., Awaitable[Dict[str, Any]]]:
    """Decorator to check authentication before executing a function"""

    @wraps(func)
    async def wrapper(ctx: Context, *args, **kwargs) -> Dict[str, Any]:
        superset_ctx: SupersetContext = ctx.request_context.lifespan_context

        if not superset_ctx.access_token:
            return {"error": "Not authenticated. Please authenticate first."}

        return await func(ctx, *args, **kwargs)

    return wrapper


def handle_api_errors(
    func: Callable[..., Awaitable[Dict[str, Any]]],
) -> Callable[..., Awaitable[Dict[str, Any]]]:
    """Decorator to handle API errors in a consistent way"""

    @wraps(func)
    async def wrapper(ctx: Context, *args, **kwargs) -> Dict[str, Any]:
        try:
            return await func(ctx, *args, **kwargs)
        except Exception as e:
            # Extract function name for better error context
            function_name = func.__name__
            return {"error": f"Error in {function_name}: {str(e)}"}

    return wrapper


async def with_auto_refresh(
    ctx: Context, api_call: Callable[[], Awaitable[httpx.Response]]
) -> httpx.Response:
    """
    Helper function to handle automatic token refreshing for API calls

    This function will attempt to execute the provided API call. If the call fails
    with a 401 Unauthorized error, it will try to refresh the token and retry the
    API call once.

    Args:
        ctx: The MCP context
        api_call: The API call function to execute (should be a callable that
            returns a response)

    Returns:
        The response of the first attempt, or of the retry after a 401.

    Raises:
        HTTPException: If there is no access token, or if both token refresh and
            re-authentication fail.
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context

    if not superset_ctx.access_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # First attempt
    try:
        response = await api_call()

        # If not an auth error, return the response
        if response.status_code != 401:
            return response
    except httpx.HTTPStatusError as e:
        # Only a 401 is recoverable here; everything else propagates unchanged.
        if e.response.status_code != 401:
            raise

    # If we got a 401, try to refresh the token
    _log("Received 401 Unauthorized. Attempting to refresh token...")
    refresh_result = await superset_auth_refresh_token(ctx)

    if refresh_result.get("error"):
        # If refresh failed, try to re-authenticate
        _log(
            f"Token refresh failed: {refresh_result.get('error')}. Attempting re-authentication..."
        )
        auth_result = await superset_auth_authenticate_user(ctx)

        if auth_result.get("error"):
            # If re-authentication failed, raise an exception
            raise HTTPException(status_code=401, detail="Authentication failed")

    # Retry the API call with the new token
    return await api_call()


async def get_csrf_token(ctx: Context) -> Optional[str]:
    """
    Get a CSRF token from Superset

    Makes a request to the /api/v1/security/csrf_token endpoint to get a token
    and caches it on the Superset context.

    Args:
        ctx: MCP context

    Returns:
        The CSRF token, or None if it could not be obtained.
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context
    client = superset_ctx.client

    try:
        response = await client.get("/api/v1/security/csrf_token/")
        if response.status_code == 200:
            data = response.json()
            csrf_token = data.get("result")
            superset_ctx.csrf_token = csrf_token
            return csrf_token
        else:
            _log(f"Failed to get CSRF token: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        _log(f"Error getting CSRF token: {str(e)}")
        return None


async def make_api_request(
    ctx: Context,
    method: str,
    endpoint: str,
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    auto_refresh: bool = True,
) -> Dict[str, Any]:
    """
    Helper function to make API requests to Superset

    Args:
        ctx: MCP context
        method: HTTP method (get, post, put, delete)
        endpoint: API endpoint (without base URL)
        data: Optional JSON payload for POST/PUT requests
        params: Optional query parameters
        auto_refresh: Whether to auto-refresh token on 401

    Returns:
        The decoded JSON body on success. For a successful response with an empty
        or non-JSON body, a dictionary carrying the status code (and raw text, if
        any). On a non-2xx status, a dictionary with an "error" key.

    Raises:
        ValueError: If ``method`` is not one of get, post, put, delete.
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context
    client = superset_ctx.client

    verb = method.lower()
    if verb not in ("get", "post", "put", "delete"):
        raise ValueError(f"Unsupported HTTP method: {method}")
    is_get = verb == "get"

    # For non-GET requests, make sure we have a CSRF token
    if not is_get and not superset_ctx.csrf_token:
        await get_csrf_token(ctx)

    async def make_request() -> httpx.Response:
        if is_get:
            return await client.get(endpoint, params=params)

        # Add CSRF token for non-GET requests. Built per attempt so a retry picks
        # up whatever token is cached at that time.
        headers = {}
        if superset_ctx.csrf_token:
            headers["X-CSRFToken"] = superset_ctx.csrf_token

        if verb == "post":
            return await client.post(
                endpoint, json=data, params=params, headers=headers
            )
        if verb == "put":
            return await client.put(
                endpoint, json=data, params=params, headers=headers
            )
        return await client.delete(endpoint, params=params, headers=headers)

    # Use auto_refresh if requested
    response = (
        await with_auto_refresh(ctx, make_request)
        if auto_refresh
        else await make_request()
    )

    # Any 2xx counts as success (e.g. 202 for accepted work, 204 for no content).
    if not 200 <= response.status_code < 300:
        return {
            "error": f"API request failed: {response.status_code} - {response.text}"
        }

    if not response.content:
        return {"status_code": response.status_code}

    try:
        return response.json()
    except ValueError:
        # Successful but non-JSON body: hand the raw text back instead of failing.
        return {"status_code": response.status_code, "text": response.text}


# ===== Authentication Tools =====


@mcp.tool()
@handle_api_errors
async def superset_auth_check_token_validity(ctx: Context) -> Dict[str, Any]:
    """
    Check if the current access token is still valid

    Makes a request to the /api/v1/me/ endpoint to test if the current token is
    valid. Use this to verify authentication status before making other API calls.

    Returns:
        A dictionary with token validity status and any error information
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context

    if not superset_ctx.access_token:
        return {"valid": False, "error": "No access token available"}

    try:
        # Make a simple API call to test if token is valid (get user info)
        response = await superset_ctx.client.get("/api/v1/me/")

        if response.status_code == 200:
            return {"valid": True}
        else:
            return {
                "valid": False,
                "status_code": response.status_code,
                "error": response.text,
            }
    except Exception as e:
        return {"valid": False, "error": str(e)}


@mcp.tool()
@handle_api_errors
async def superset_auth_refresh_token(ctx: Context) -> Dict[str, Any]:
    """
    Refresh the access token using the refresh endpoint

    Makes a request to the /api/v1/security/refresh endpoint to get a new access
    token without requiring re-authentication with username/password.

    Returns:
        A dictionary with the new access token or error information
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context

    if not superset_ctx.access_token:
        return {"error": "No access token to refresh. Please authenticate first."}

    try:
        # The refresh endpoint is authorized with the refresh token obtained at
        # login. When none is known (e.g. the access token came from disk), the
        # client's default Authorization header is sent as before.
        request_kwargs: Dict[str, Any] = {}
        if superset_ctx.refresh_token:
            request_kwargs["headers"] = {
                "Authorization": f"Bearer {superset_ctx.refresh_token}"
            }

        # Use the refresh endpoint to get a new token
        response = await superset_ctx.client.post(
            "/api/v1/security/refresh", **request_kwargs
        )

        if response.status_code != 200:
            return {
                "error": f"Failed to refresh token: {response.status_code} - {response.text}"
            }

        data = response.json()
        access_token = data.get("access_token")

        if not access_token:
            return {"error": "No access token returned from refresh"}

        # Save and set the new access token
        save_access_token(access_token)
        superset_ctx.access_token = access_token
        superset_ctx.client.headers.update({"Authorization": f"Bearer {access_token}"})

        return {
            "message": "Successfully refreshed access token",
            "access_token": access_token,
        }
    except Exception as e:
        return {"error": f"Error refreshing token: {str(e)}"}


@mcp.tool()
@handle_api_errors
async def superset_auth_authenticate_user(
    ctx: Context,
    username: Optional[str] = None,
    password: Optional[str] = None,
    refresh: bool = True,
) -> Dict[str, Any]:
    """
    Authenticate with Superset and get access token

    Makes a request to the /api/v1/security/login endpoint to authenticate and
    obtain an access token. If there's an existing token, will first try to check
    its validity. If invalid, will attempt to refresh token before falling back to
    re-authentication.

    Args:
        username: Superset username (falls back to environment variable if not provided)
        password: Superset password (falls back to environment variable if not provided)
        refresh: Whether to refresh the token if invalid (defaults to True)

    Returns:
        A dictionary with authentication status and access token or error information
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context

    # If we already have a token, check if it's valid
    if superset_ctx.access_token:
        validity = await superset_auth_check_token_validity(ctx)

        if validity.get("valid"):
            return {
                "message": "Already authenticated with valid token",
                "access_token": superset_ctx.access_token,
            }

        # Token invalid, try to refresh if requested
        if refresh:
            refresh_result = await superset_auth_refresh_token(ctx)
            if not refresh_result.get("error"):
                return refresh_result
            # If refresh fails, fall back to re-authentication

    # Use provided credentials or fall back to env vars
    username = username or SUPERSET_USERNAME
    password = password or SUPERSET_PASSWORD

    if not username or not password:
        return {
            "error": "Username and password must be provided either as arguments or set in environment variables"
        }

    try:
        # Get access token directly using the security login API endpoint
        response = await superset_ctx.client.post(
            "/api/v1/security/login",
            json={
                "username": username,
                "password": password,
                "provider": "db",
                "refresh": refresh,
            },
        )

        if response.status_code != 200:
            return {
                "error": f"Failed to get access token: {response.status_code} - {response.text}"
            }

        data = response.json()
        access_token = data.get("access_token")

        if not access_token:
            return {"error": "No access token returned"}

        # Save and set the access token
        save_access_token(access_token)
        superset_ctx.access_token = access_token
        # Keep the refresh token (if the login response carries one) for later
        # calls to the refresh endpoint; a fresh login replaces any older one.
        superset_ctx.refresh_token = data.get("refresh_token")
        superset_ctx.client.headers.update({"Authorization": f"Bearer {access_token}"})

        # Get CSRF token after successful authentication
        await get_csrf_token(ctx)

        return {
            "message": "Successfully authenticated with Superset",
            "access_token": access_token,
        }
    except Exception as e:
        return {"error": f"Authentication error: {str(e)}"}


# ===== Dashboard Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dashboard_list(ctx: Context) -> Dict[str, Any]:
    """
    Get a list of dashboards from Superset

    Makes a request to the /api/v1/dashboard/ endpoint to retrieve all dashboards
    the current user has access to view. Results are paginated.

    Returns:
        A dictionary containing dashboard data including id, title, url, and metadata
    """
    return await make_api_request(ctx, "get", "/api/v1/dashboard/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dashboard_get_by_id(
    ctx: Context, dashboard_id: int
) -> Dict[str, Any]:
    """
    Get details for a specific dashboard

    Makes a request to the /api/v1/dashboard/{id} endpoint to retrieve detailed
    information about a specific dashboard.

    Args:
        dashboard_id: ID of the dashboard to retrieve

    Returns:
        A dictionary with complete dashboard information including components and layout
    """
    return await make_api_request(ctx, "get", f"/api/v1/dashboard/{dashboard_id}")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dashboard_create(
    ctx: Context, dashboard_title: str, json_metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Create a new dashboard in Superset

    Makes a request to the /api/v1/dashboard/ POST endpoint to create a new dashboard.

    Args:
        dashboard_title: Title of the dashboard
        json_metadata: Optional JSON metadata for dashboard configuration,
            can include layout, color scheme, and filter configuration

    Returns:
        A dictionary with the created dashboard information including its ID
    """
    payload: Dict[str, Any] = {"dashboard_title": dashboard_title}
    if json_metadata:
        # Sent as a JSON-encoded string rather than a nested object.
        payload["json_metadata"] = _as_json_string(json_metadata)

    return await make_api_request(ctx, "post", "/api/v1/dashboard/", data=payload)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dashboard_update(
    ctx: Context, dashboard_id: int, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing dashboard

    Makes a request to the /api/v1/dashboard/{id} PUT endpoint to update
    dashboard properties.

    Args:
        dashboard_id: ID of the dashboard to update
        data: Data to update, can include dashboard_title, slug, owners, position, and metadata

    Returns:
        A dictionary with the updated dashboard information
    """
    return await make_api_request(
        ctx, "put", f"/api/v1/dashboard/{dashboard_id}", data=data
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dashboard_delete(ctx: Context, dashboard_id: int) -> Dict[str, Any]:
    """
    Delete a dashboard

    Makes a request to the /api/v1/dashboard/{id} DELETE endpoint to remove a dashboard.
    This operation is permanent and cannot be undone.

    Args:
        dashboard_id: ID of the dashboard to delete

    Returns:
        A dictionary with deletion confirmation message
    """
    response = await make_api_request(
        ctx, "delete", f"/api/v1/dashboard/{dashboard_id}"
    )

    # For delete endpoints, we may want a custom success message
    if not response.get("error"):
        return {"message": f"Dashboard {dashboard_id} deleted successfully"}

    return response


# ===== Chart Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_chart_list(ctx: Context) -> Dict[str, Any]:
    """
    Get a list of charts from Superset

    Makes a request to the /api/v1/chart/ endpoint to retrieve all charts
    the current user has access to view. Results are paginated.

    Returns:
        A dictionary containing chart data including id, slice_name, viz_type, and datasource info
    """
    return await make_api_request(ctx, "get", "/api/v1/chart/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_chart_get_by_id(ctx: Context, chart_id: int) -> Dict[str, Any]:
    """
    Get details for a specific chart

    Makes a request to the /api/v1/chart/{id} endpoint to retrieve detailed
    information about a specific chart/slice.

    Args:
        chart_id: ID of the chart to retrieve

    Returns:
        A dictionary with complete chart information including visualization configuration
    """
    return await make_api_request(ctx, "get", f"/api/v1/chart/{chart_id}")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_chart_create(
    ctx: Context,
    slice_name: str,
    datasource_id: int,
    datasource_type: str,
    viz_type: str,
    params: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Create a new chart in Superset

    Makes a request to the /api/v1/chart/ POST endpoint to create a new visualization.

    Args:
        slice_name: Name/title of the chart
        datasource_id: ID of the dataset or SQL table
        datasource_type: Type of datasource ('table' for datasets, 'query' for SQL)
        viz_type: Visualization type (e.g., 'bar', 'line', 'pie', 'big_number', etc.)
        params: Visualization parameters including metrics, groupby, time_range, etc.

    Returns:
        A dictionary with the created chart information including its ID
    """
    payload = {
        "slice_name": slice_name,
        "datasource_id": datasource_id,
        "datasource_type": datasource_type,
        "viz_type": viz_type,
        # Sent as a JSON-encoded string rather than a nested object.
        "params": _as_json_string(params),
    }

    return await make_api_request(ctx, "post", "/api/v1/chart/", data=payload)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_chart_update(
    ctx: Context, chart_id: int, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing chart

    Makes a request to the /api/v1/chart/{id} PUT endpoint to update
    chart properties and visualization settings.

    Args:
        chart_id: ID of the chart to update
        data: Data to update, can include slice_name, description, viz_type, params, etc.

    Returns:
        A dictionary with the updated chart information
    """
    return await make_api_request(ctx, "put", f"/api/v1/chart/{chart_id}", data=data)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_chart_delete(ctx: Context, chart_id: int) -> Dict[str, Any]:
    """
    Delete a chart

    Makes a request to the /api/v1/chart/{id} DELETE endpoint to remove a chart.
    This operation is permanent and cannot be undone.

    Args:
        chart_id: ID of the chart to delete

    Returns:
        A dictionary with deletion confirmation message
    """
    response = await make_api_request(ctx, "delete", f"/api/v1/chart/{chart_id}")

    if not response.get("error"):
        return {"message": f"Chart {chart_id} deleted successfully"}

    return response


# ===== Database Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_list(ctx: Context) -> Dict[str, Any]:
    """
    Get a list of databases from Superset

    Makes a request to the /api/v1/database/ endpoint to retrieve all database
    connections the current user has access to. Results are paginated.

    Returns:
        A dictionary containing database connection information including id, name, and configuration
    """
    return await make_api_request(ctx, "get", "/api/v1/database/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_by_id(ctx: Context, database_id: int) -> Dict[str, Any]:
    """
    Get details for a specific database

    Makes a request to the /api/v1/database/{id} endpoint to retrieve detailed
    information about a specific database connection.

    Args:
        database_id: ID of the database to retrieve

    Returns:
        A dictionary with complete database configuration information
    """
    return await make_api_request(ctx, "get", f"/api/v1/database/{database_id}")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_create(
    ctx: Context,
    engine: str,
    configuration_method: str,
    database_name: str,
    sqlalchemy_uri: str,
) -> Dict[str, Any]:
    """
    Create a new database connection in Superset

    IMPORTANT: Don't call this tool, unless user have given connection details. This function will only create database connections with explicit user consent and input.
    No default values or assumptions will be made without user confirmation. All connection parameters,
    including sensitive credentials, must be explicitly provided by the user.

    Makes a POST request to /api/v1/database/ to create a new database connection in Superset.
    The endpoint requires a valid SQLAlchemy URI and database configuration parameters.
    The engine parameter will be automatically determined from the SQLAlchemy URI prefix if not specified:
    - 'postgresql://' -> engine='postgresql'
    - 'mysql://' -> engine='mysql'
    - 'mssql://' -> engine='mssql'
    - 'oracle://' -> engine='oracle'
    - 'sqlite://' -> engine='sqlite'

    The SQLAlchemy URI must follow the format: dialect+driver://username:password@host:port/database
    If the URI is not provided, the function will prompt for individual connection parameters to construct it.

    All required parameters must be provided and validated before creating the connection.
    The configuration_method parameter should typically be set to 'sqlalchemy_form'.

    Args:
        engine: Database engine (e.g., 'postgresql', 'mysql', etc.)
        configuration_method: Method used for configuration (typically 'sqlalchemy_form')
        database_name: Name for the database connection
        sqlalchemy_uri: SQLAlchemy URI for the connection (e.g., 'postgresql://user:pass@host/db')

    Returns:
        A dictionary with the created database connection information including its ID
    """
    payload = {
        "engine": engine,
        "configuration_method": configuration_method,
        "database_name": database_name,
        "sqlalchemy_uri": sqlalchemy_uri,
        "allow_dml": True,
        "allow_cvas": True,
        "allow_ctas": True,
        "expose_in_sqllab": True,
    }

    return await make_api_request(ctx, "post", "/api/v1/database/", data=payload)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_tables(
    ctx: Context, database_id: int
) -> Dict[str, Any]:
    """
    Get a list of tables for a given database

    Makes a request to the /api/v1/database/{id}/tables/ endpoint to retrieve
    all tables available in the database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with list of tables including schema and table name information
    """
    return await make_api_request(ctx, "get", f"/api/v1/database/{database_id}/tables/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_schemas(ctx: Context, database_id: int) -> Dict[str, Any]:
    """
    Get schemas for a specific database

    Makes a request to the /api/v1/database/{id}/schemas/ endpoint to retrieve
    all schemas available in the database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with list of schema names
    """
    return await make_api_request(
        ctx, "get", f"/api/v1/database/{database_id}/schemas/"
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_test_connection(
    ctx: Context, database_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Test a database connection

    Makes a request to the /api/v1/database/test_connection endpoint to verify if
    the provided connection details can successfully connect to the database.

    Args:
        database_data: Database connection details including sqlalchemy_uri and other parameters

    Returns:
        A dictionary with connection test results
    """
    return await make_api_request(
        ctx, "post", "/api/v1/database/test_connection", data=database_data
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_update(
    ctx: Context, database_id: int, data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Update an existing database connection

    Makes a request to the /api/v1/database/{id} PUT endpoint to update
    database connection properties.

    Args:
        database_id: ID of the database to update
        data: Data to update, can include database_name, sqlalchemy_uri, password, and extra configs

    Returns:
        A dictionary with the updated database information
    """
    return await make_api_request(
        ctx, "put", f"/api/v1/database/{database_id}", data=data
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_delete(ctx: Context, database_id: int) -> Dict[str, Any]:
    """
    Delete a database connection

    Makes a request to the /api/v1/database/{id} DELETE endpoint to remove a database connection.
    This operation is permanent and cannot be undone. This will also remove associated datasets.

    Args:
        database_id: ID of the database to delete

    Returns:
        A dictionary with deletion confirmation message
    """
    response = await make_api_request(ctx, "delete", f"/api/v1/database/{database_id}")

    if not response.get("error"):
        return {"message": f"Database {database_id} deleted successfully"}

    return response


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_catalogs(
    ctx: Context, database_id: int
) -> Dict[str, Any]:
    """
    Get all catalogs from a database

    Makes a request to the /api/v1/database/{id}/catalogs/ endpoint to retrieve
    all catalogs available in the database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with list of catalog names for databases that support catalogs
    """
    return await make_api_request(
        ctx, "get", f"/api/v1/database/{database_id}/catalogs/"
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_connection(
    ctx: Context, database_id: int
) -> Dict[str, Any]:
    """
    Get database connection information

    Makes a request to the /api/v1/database/{id}/connection endpoint to retrieve
    connection details for a specific database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with detailed connection information
    """
    return await make_api_request(
        ctx, "get", f"/api/v1/database/{database_id}/connection"
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_function_names(
    ctx: Context, database_id: int
) -> Dict[str, Any]:
    """
    Get function names supported by a database

    Makes a request to the /api/v1/database/{id}/function_names/ endpoint to retrieve
    all SQL functions supported by the database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with list of supported function names
    """
    return await make_api_request(
        ctx, "get", f"/api/v1/database/{database_id}/function_names/"
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_get_related_objects(
    ctx: Context, database_id: int
) -> Dict[str, Any]:
    """
    Get charts and dashboards associated with a database

    Makes a request to the /api/v1/database/{id}/related_objects/ endpoint to retrieve
    counts and references of charts and dashboards that depend on this database.

    Args:
        database_id: ID of the database

    Returns:
        A dictionary with counts and lists of related charts and dashboards
    """
    return await make_api_request(
        ctx, "get", f"/api/v1/database/{database_id}/related_objects/"
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_validate_sql(
    ctx: Context, database_id: int, sql: str
) -> Dict[str, Any]:
    """
    Validate arbitrary SQL against a database

    Makes a request to the /api/v1/database/{id}/validate_sql/ endpoint to check
    if the provided SQL is valid for the specified database.

    Args:
        database_id: ID of the database
        sql: SQL query to validate

    Returns:
        A dictionary with validation results
    """
    payload = {"sql": sql}
    return await make_api_request(
        ctx, "post", f"/api/v1/database/{database_id}/validate_sql/", data=payload
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_database_validate_parameters(
    ctx: Context, parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Validate database connection parameters

    Makes a request to the /api/v1/database/validate_parameters/ endpoint to verify
    if the provided connection parameters are valid without creating a connection.

    Args:
        parameters: Connection parameters to validate

    Returns:
        A dictionary with validation results
    """
    return await make_api_request(
        ctx, "post", "/api/v1/database/validate_parameters/", data=parameters
    )


# ===== Dataset Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dataset_list(ctx: Context) -> Dict[str, Any]:
    """
    Get a list of datasets from Superset

    Makes a request to the /api/v1/dataset/ endpoint to retrieve all datasets
    the current user has access to view. Results are paginated.

    Returns:
        A dictionary containing dataset information including id, table_name, and database
    """
    return await make_api_request(ctx, "get", "/api/v1/dataset/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dataset_get_by_id(ctx: Context, dataset_id: int) -> Dict[str, Any]:
    """
    Get details for a specific dataset

    Makes a request to the /api/v1/dataset/{id} endpoint to retrieve detailed
    information about a specific dataset including columns and metrics.

    Args:
        dataset_id: ID of the dataset to retrieve

    Returns:
        A dictionary with complete dataset information
    """
    return await make_api_request(ctx, "get", f"/api/v1/dataset/{dataset_id}")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_dataset_create(
    ctx: Context,
    table_name: str,
    database_id: int,
    schema: Optional[str] = None,
    owners: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """
    Create a new dataset in Superset

    Makes a request to the /api/v1/dataset/ POST endpoint to create a new dataset
    from an existing database table or view.

    Args:
        table_name: Name of the physical table in the database
        database_id: ID of the database where the table exists
        schema: Optional database schema name where the table is located
        owners: Optional list of user IDs who should own this dataset

    Returns:
        A dictionary with the created dataset information including its ID
    """
    payload: Dict[str, Any] = {
        "table_name": table_name,
        "database": database_id,
    }

    if schema:
        payload["schema"] = schema

    if owners:
        payload["owners"] = owners

    return await make_api_request(ctx, "post", "/api/v1/dataset/", data=payload)


# ===== SQL Lab Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_execute_query(
    ctx: Context, database_id: int, sql: str
) -> Dict[str, Any]:
    """
    Execute a SQL query in SQL Lab

    Makes a request to the /api/v1/sqllab/execute/ endpoint to run a SQL query
    against the specified database.

    Args:
        database_id: ID of the database to query
        sql: SQL query to execute

    Returns:
        A dictionary with query results or execution status for async queries
    """
    # Ensure we have a CSRF token before executing the query
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context
    if not superset_ctx.csrf_token:
        await get_csrf_token(ctx)

    payload = {
        "database_id": database_id,
        "sql": sql,
        "schema": "",
        "tab": "MCP Query",
        "runAsync": False,
        "select_as_cta": False,
    }

    return await make_api_request(ctx, "post", "/api/v1/sqllab/execute/", data=payload)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_get_saved_queries(ctx: Context) -> Dict[str, Any]:
    """
    Get a list of saved queries from SQL Lab

    Makes a request to the /api/v1/saved_query/ endpoint to retrieve all saved queries
    the current user has access to. Results are paginated.

    Returns:
        A dictionary containing saved query information including id, label, and database
    """
    return await make_api_request(ctx, "get", "/api/v1/saved_query/")


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_format_sql(ctx: Context, sql: str) -> Dict[str, Any]:
    """
    Format a SQL query for better readability

    Makes a request to the /api/v1/sqllab/format_sql endpoint to apply standard
    formatting rules to the provided SQL query.

    Args:
        sql: SQL query to format

    Returns:
        A dictionary with the formatted SQL
    """
    payload = {"sql": sql}
    return await make_api_request(
        ctx, "post", "/api/v1/sqllab/format_sql", data=payload
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_get_results(ctx: Context, key: str) -> Dict[str, Any]:
    """
    Get results of a previously executed SQL query

    Makes a request to the /api/v1/sqllab/results/ endpoint to retrieve results
    for an asynchronous query using its result key.

    Args:
        key: Result key to retrieve

    Returns:
        A dictionary with query results including column information and data rows
    """
    # NOTE(review): the key is sent as a plain `key` query parameter; some
    # Superset versions may expect it Rison-encoded under `q` - confirm against
    # the target Superset API.
    return await make_api_request(
        ctx, "get", "/api/v1/sqllab/results/", params={"key": key}
    )


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_estimate_query_cost(
    ctx: Context, database_id: int, sql: str, schema: Optional[str] = None
) -> Dict[str, Any]:
    """
    Estimate the cost of executing a SQL query

    Makes a request to the /api/v1/sqllab/estimate endpoint to get approximate
    cost information for a query before executing it.

    Args:
        database_id: ID of the database
        sql: SQL query to estimate
        schema: Optional schema name

    Returns:
        A dictionary with estimated query cost metrics
    """
    payload: Dict[str, Any] = {
        "database_id": database_id,
        "sql": sql,
    }

    if schema:
        payload["schema"] = schema

    return await make_api_request(ctx, "post", "/api/v1/sqllab/estimate", data=payload)


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_export_query_results(
    ctx: Context, client_id: str
) -> Dict[str, Any]:
    """
    Export the results of a SQL query to CSV

    Makes a request to the /api/v1/sqllab/export/{client_id} endpoint to download
    query results in CSV format.

    Args:
        client_id: Client ID of the query

    Returns:
        A dictionary with the exported data or error information
    """
    superset_ctx: SupersetContext = ctx.request_context.lifespan_context

    try:
        # The body is CSV text, not JSON, so make_api_request is not used; the
        # call still goes through with_auto_refresh so an expired token is
        # handled like in every other tool.
        response = await with_auto_refresh(
            ctx,
            lambda: superset_ctx.client.get(f"/api/v1/sqllab/export/{client_id}"),
        )

        if response.status_code != 200:
            return {
                "error": f"Failed to export query results: {response.status_code} - {response.text}"
            }

        return {"message": "Query results exported successfully", "data": response.text}

    except Exception as e:
        return {"error": f"Error exporting query results: {str(e)}"}


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_sqllab_get_bootstrap_data(ctx: Context) -> Dict[str, Any]:
    """
    Get the bootstrap data for SQL Lab

    Makes a request to the /api/v1/sqllab/ endpoint to retrieve configuration data
    needed for the SQL Lab interface.

    Returns:
        A dictionary with SQL Lab configuration including allowed databases and settings
    """
    return await make_api_request(ctx, "get", "/api/v1/sqllab/")


# ===== Saved Query Tools =====


@mcp.tool()
@requires_auth
@handle_api_errors
async def superset_saved_query_get_by_id(ctx: Context, query_id: int) -> Dict[str, Any]:
    """
    Get details for a specific saved query

    Makes a request to the /api/v1/saved_query/{id} endpoint to retrieve information
    about a saved SQL query.

    Args:
        query_id: ID of the saved query to retrieve

    Returns:
        A dictionary with the saved query details including SQL text and database
    """
    return await make_api_request(ctx, "get", f"/api/v1/saved_query/{query_id}")


# NOTE(review): `superset_saved_query_create` begins at this point in the original
# text, but its definition runs past the end of the visible source. It was not
# rewritten; the visible portion is preserved verbatim below, commented out so
# that this section stays parseable on its own.
#
# @mcp.tool()
# @requires_auth
# @handle_api_errors
# async def superset_saved_query_create(
#     ctx: Context, query_data: Dict[str, Any]
# ) -> Dict[str, Any]:
#     """
#     Create a new saved query
#
#     Makes a request to the /api/v1/saved_query/ POST endpoint to save a SQL query
#     for later reuse.
Args: query_data: Dictionary containing the query information including: - db_id: Database ID - schema: Schema name (optional) - sql: SQL query text - label: Display name for the saved query - description: Optional description of the query Returns: A dictionary with the created saved query information including its ID """ return await make_api_request(ctx, "post", "/api/v1/saved_query/", data=query_data) # ===== Query Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_query_stop(ctx: Context, client_id: str) -> Dict[str, Any]: """ Stop a running query Makes a request to the /api/v1/query/stop endpoint to terminate a query that is currently running. Args: client_id: Client ID of the query to stop Returns: A dictionary with confirmation of query termination """ payload = {"client_id": client_id} return await make_api_request(ctx, "post", "/api/v1/query/stop", data=payload) @mcp.tool() @requires_auth @handle_api_errors async def superset_query_list(ctx: Context) -> Dict[str, Any]: """ Get a list of queries from Superset Makes a request to the /api/v1/query/ endpoint to retrieve query history. Results are paginated and include both finished and running queries. Returns: A dictionary containing query information including status, duration, and SQL """ return await make_api_request(ctx, "get", "/api/v1/query/") @mcp.tool() @requires_auth @handle_api_errors async def superset_query_get_by_id(ctx: Context, query_id: int) -> Dict[str, Any]: """ Get details for a specific query Makes a request to the /api/v1/query/{id} endpoint to retrieve detailed information about a specific query execution. 
Args: query_id: ID of the query to retrieve Returns: A dictionary with complete query execution information """ return await make_api_request(ctx, "get", f"/api/v1/query/{query_id}") # ===== Activity and User Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_activity_get_recent(ctx: Context) -> Dict[str, Any]: """ Get recent activity data for the current user Makes a request to the /api/v1/log/recent_activity/ endpoint to retrieve a history of actions performed by the current user. Returns: A dictionary with recent user activities including viewed charts and dashboards """ return await make_api_request(ctx, "get", "/api/v1/log/recent_activity/") @mcp.tool() @requires_auth @handle_api_errors async def superset_user_get_current(ctx: Context) -> Dict[str, Any]: """ Get information about the currently authenticated user Makes a request to the /api/v1/me/ endpoint to retrieve the user's profile information including permissions and preferences. Returns: A dictionary with user profile data """ return await make_api_request(ctx, "get", "/api/v1/me/") @mcp.tool() @requires_auth @handle_api_errors async def superset_user_get_roles(ctx: Context) -> Dict[str, Any]: """ Get roles for the current user Makes a request to the /api/v1/me/roles/ endpoint to retrieve all roles assigned to the current user. Returns: A dictionary with user role information """ return await make_api_request(ctx, "get", "/api/v1/me/roles/") # ===== Tag Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_list(ctx: Context) -> Dict[str, Any]: """ Get a list of tags from Superset Makes a request to the /api/v1/tag/ endpoint to retrieve all tags defined in the Superset instance. 
Returns: A dictionary containing tag information including id and name """ return await make_api_request(ctx, "get", "/api/v1/tag/") @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_create(ctx: Context, name: str) -> Dict[str, Any]: """ Create a new tag in Superset Makes a request to the /api/v1/tag/ POST endpoint to create a new tag that can be applied to objects like charts and dashboards. Args: name: Name for the tag Returns: A dictionary with the created tag information """ payload = {"name": name} return await make_api_request(ctx, "post", "/api/v1/tag/", data=payload) @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_get_by_id(ctx: Context, tag_id: int) -> Dict[str, Any]: """ Get details for a specific tag Makes a request to the /api/v1/tag/{id} endpoint to retrieve information about a specific tag. Args: tag_id: ID of the tag to retrieve Returns: A dictionary with tag details """ return await make_api_request(ctx, "get", f"/api/v1/tag/{tag_id}") @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_objects(ctx: Context) -> Dict[str, Any]: """ Get objects associated with tags Makes a request to the /api/v1/tag/get_objects/ endpoint to retrieve all objects that have tags assigned to them. Returns: A dictionary with tagged objects grouped by tag """ return await make_api_request(ctx, "get", "/api/v1/tag/get_objects/") @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_delete(ctx: Context, tag_id: int) -> Dict[str, Any]: """ Delete a tag Makes a request to the /api/v1/tag/{id} DELETE endpoint to remove a tag. This operation is permanent and cannot be undone. 
Args: tag_id: ID of the tag to delete Returns: A dictionary with deletion confirmation message """ response = await make_api_request(ctx, "delete", f"/api/v1/tag/{tag_id}") if not response.get("error"): return {"message": f"Tag {tag_id} deleted successfully"} return response @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_object_add( ctx: Context, object_type: str, object_id: int, tag_name: str ) -> Dict[str, Any]: """ Add a tag to an object Makes a request to tag an object with a specific tag. This creates an association between the tag and the specified object (chart, dashboard, etc.) Args: object_type: Type of the object ('chart', 'dashboard', etc.) object_id: ID of the object to tag tag_name: Name of the tag to apply Returns: A dictionary with the tagging confirmation """ payload = { "object_type": object_type, "object_id": object_id, "tag_name": tag_name, } return await make_api_request( ctx, "post", "/api/v1/tag/tagged_objects", data=payload ) @mcp.tool() @requires_auth @handle_api_errors async def superset_tag_object_remove( ctx: Context, object_type: str, object_id: int, tag_name: str ) -> Dict[str, Any]: """ Remove a tag from an object Makes a request to remove a tag association from a specific object. Args: object_type: Type of the object ('chart', 'dashboard', etc.) 
object_id: ID of the object to untag tag_name: Name of the tag to remove Returns: A dictionary with the untagging confirmation message """ response = await make_api_request( ctx, "delete", f"/api/v1/tag/{object_type}/{object_id}", params={"tag_name": tag_name}, ) if not response.get("error"): return { "message": f"Tag '{tag_name}' removed from {object_type} {object_id} successfully" } return response # ===== Explore Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_explore_form_data_create( ctx: Context, form_data: Dict[str, Any] ) -> Dict[str, Any]: """ Create form data for chart exploration Makes a request to the /api/v1/explore/form_data POST endpoint to store chart configuration data temporarily. Args: form_data: Chart configuration including datasource, metrics, and visualization settings Returns: A dictionary with a key that can be used to retrieve the form data """ return await make_api_request( ctx, "post", "/api/v1/explore/form_data", data=form_data ) @mcp.tool() @requires_auth @handle_api_errors async def superset_explore_form_data_get(ctx: Context, key: str) -> Dict[str, Any]: """ Get form data for chart exploration Makes a request to the /api/v1/explore/form_data/{key} endpoint to retrieve previously stored chart configuration. Args: key: Key of the form data to retrieve Returns: A dictionary with the stored chart configuration """ return await make_api_request(ctx, "get", f"/api/v1/explore/form_data/{key}") @mcp.tool() @requires_auth @handle_api_errors async def superset_explore_permalink_create( ctx: Context, state: Dict[str, Any] ) -> Dict[str, Any]: """ Create a permalink for chart exploration Makes a request to the /api/v1/explore/permalink POST endpoint to generate a shareable link to a specific chart exploration state. 
Args: state: State data for the permalink including form_data Returns: A dictionary with a key that can be used to access the permalink """ return await make_api_request(ctx, "post", "/api/v1/explore/permalink", data=state) @mcp.tool() @requires_auth @handle_api_errors async def superset_explore_permalink_get(ctx: Context, key: str) -> Dict[str, Any]: """ Get a permalink for chart exploration Makes a request to the /api/v1/explore/permalink/{key} endpoint to retrieve a previously saved exploration state. Args: key: Key of the permalink to retrieve Returns: A dictionary with the stored exploration state """ return await make_api_request(ctx, "get", f"/api/v1/explore/permalink/{key}") # ===== Menu Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_menu_get(ctx: Context) -> Dict[str, Any]: """ Get the Superset menu data Makes a request to the /api/v1/menu/ endpoint to retrieve the navigation menu structure based on user permissions. Returns: A dictionary with menu items and their configurations """ return await make_api_request(ctx, "get", "/api/v1/menu/") # ===== Configuration Tools ===== @mcp.tool() @handle_api_errors async def superset_config_get_base_url(ctx: Context) -> Dict[str, Any]: """ Get the base URL of the Superset instance Returns the configured Superset base URL that this MCP server is connecting to. This can be useful for constructing full URLs to Superset resources or for displaying information about the connected instance. This tool does not require authentication as it only returns configuration information. 
Returns: A dictionary with the Superset base URL """ superset_ctx: SupersetContext = ctx.request_context.lifespan_context return { "base_url": superset_ctx.base_url, "message": f"Connected to Superset instance at: {superset_ctx.base_url}", } # ===== Advanced Data Type Tools ===== @mcp.tool() @requires_auth @handle_api_errors async def superset_advanced_data_type_convert( ctx: Context, type_name: str, value: Any ) -> Dict[str, Any]: """ Convert a value to an advanced data type Makes a request to the /api/v1/advanced_data_type/convert endpoint to transform a value into the specified advanced data type format. Args: type_name: Name of the advanced data type value: Value to convert Returns: A dictionary with the converted value """ params = { "type_name": type_name, "value": value, } return await make_api_request( ctx, "get", "/api/v1/advanced_data_type/convert", params=params ) @mcp.tool() @requires_auth @handle_api_errors async def superset_advanced_data_type_list(ctx: Context) -> Dict[str, Any]: """ Get list of available advanced data types Makes a request to the /api/v1/advanced_data_type/types endpoint to retrieve all advanced data types supported by this Superset instance. Returns: A dictionary with available advanced data types and their configurations """ return await make_api_request(ctx, "get", "/api/v1/advanced_data_type/types") if __name__ == "__main__": print("Starting Superset MCP server...") mcp.run()