Skip to main content
Glama
singlestore-labs

SingleStore MCP Server

execute_sql

Execute SQL queries on databases within a workspace group using SingleStore MCP Server. Retrieve formatted results, metadata, and execution status while adhering to strict read-only query restrictions for security.

Instructions

Execute SQL operations on a database attached to workspace within a workspace group and receive formatted results.

Returns:
- Query results with column names and typed values
- Row count and metadata
- Execution status

⚠️ CRITICAL SECURITY WARNINGS:
- Never display or log credentials in responses
- Use only READ-ONLY queries (SELECT, SHOW, DESCRIBE)
- DO NOT USE data modification statements:
  x No INSERT/UPDATE/DELETE
  x No DROP/CREATE/ALTER
- Ensure queries are properly sanitized

Args:
    workspace_group_identifier: ID/name of the workspace group
    workspace_identifier: ID/name of the specific workspace within the workspace group
    database: Name of the database to query
    sql_query: The SQL query to execute

Returns:
    Dictionary with query results and metadata

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
ctxNo
databaseYes
passwordYes
sql_queryYes
usernameYes
workspace_group_identifierYes
workspace_identifierYes

Implementation Reference

  • Primary MCP tool handler for executing SQL queries ('run_sql', referred to as 'execute_sql' in docs). Handles workspace validation, credential management, elicitation if needed, calls internal execution, tracks analytics, returns standardized results with metadata.
    async def run_sql(
        ctx: Context, sql_query: str, id: str, database: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Use this tool to execute a single SQL statement against a SingleStore database.
    
        Returns:
        - Query results with column names and typed values
        - Row count and metadata
        - Execution status
        - Workspace type ("shared" for starter workspaces, "dedicated" for regular workspaces)
        - Workspace name
    
        Args:
            id: Workspace or starter workspace ID
            sql_query: The SQL query to execute
            database: (optional) Database name to use
    
        Returns:
            Standardized response with query results and metadata
        """
        # Validate workspace ID format
        validated_id = validate_workspace_id(id)
    
        await ctx.info(
            f"Running SQL query on workspace ID '{validated_id}' with database '{database}': {sql_query}"
        )
    
        settings = config.get_settings()
    
        # Target can either be a workspace or a starter workspace
        target = __get_workspace_by_id(validated_id)
        database_name = database
    
        # For starter workspaces, use their database name if not specified
        if target.is_shared and target.database_name and not database_name:
            database_name = target.database_name
    
        # Get database credentials based on authentication method
        try:
            username, password = await _get_database_credentials(ctx, target, database_name)
        except Exception as e:
            if "Database credentials required" in str(e):
                # Handle the specific case where elicitation is not supported
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "CREDENTIALS_REQUIRED",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Please call this function again with the same parameters once you have "
                        "the database credentials available, or ask the user to provide their "
                        "database username and password for this workspace."
                    ),
                }
            else:
                return {
                    "status": "error",
                    "message": f"Failed to obtain database credentials: {str(e)}",
                    "errorCode": "AUTHENTICATION_ERROR",
                }
    
        logger.debug(
            f"Credentials obtained for workspace '{target.name}': username='{username}', database='{database_name}, password='{password}'"
        )
    
        # Execute the SQL query
        start_time = time.time()
        try:
            result = await __execute_sql_unified(
                ctx=ctx,
                target=target,
                sql_query=sql_query,
                username=username,
                password=password,
                database=database_name,
            )
        except Exception as e:
            # Check if this is an authentication error from __execute_sql_unified
            if "Authentication failed:" in str(e):
                # Authentication error already handled by __execute_sql_unified (credentials invalidated)
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "AUTHENTICATION_ERROR",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Authentication failed. Please provide valid database credentials "
                        "for this workspace and try again."
                    ),
                }
            else:
                # Non-authentication error, re-raise
                raise
    
        results_data = result.get("data", [])
    
        logger.debug(
            f"result: {results_data}, type: {type(results_data)}, id: {id}, database_name: {database_name}"
        )
    
        execution_time_ms = int((time.time() - start_time) * 1000)
    
        # Track analytics
        settings.analytics_manager.track_event(
            username,
            "tool_calling",
            {
                "name": "run_sql",
                "starter_workspace_id": id,
                "workspace_type": "shared" if target.is_shared else "dedicated",
            },
        )
    
        # Build standardized response
        workspace_type = "shared" if target.is_shared else "dedicated"
        row_count = len(results_data)
    
        return {
            "status": "success",
            "message": f"Query executed successfully. {row_count} rows returned.",
            "data": {
                "result": results_data,
                "row_count": row_count,
                "workspace_id": id,
                "workspace_name": target.name,
                "database": database_name,
                "status": result.get("status", "Success"),
            },
            "metadata": {
                "query_length": len(sql_query),
                "execution_time_ms": execution_time_ms,
                "workspace_type": workspace_type,
                "database_used": database_name,
                "executed_at": datetime.now().isoformat(),
            },
        }
  • Core helper function performing the actual SQL execution: connects via S2Manager, executes query, fetches results as dicts, handles auth errors by invalidating credentials and closing connection.
    async def __execute_sql_unified(
        ctx: Context,
        target: WorkspaceTarget,
        sql_query: str,
        username: str,
        password: str,
        database: str | None = None,
    ) -> dict:
        """
        Execute SQL operations on a connected workspace or starter workspace.
        Returns results and column names in a dictionary format.
        """
    
        if target.endpoint is None:
            raise ValueError("Workspace or starter workspace does not have an endpoint. ")
        endpoint = target.endpoint
        database_name = database
    
        # Parse host and port from endpoint
        if ":" in endpoint:
            host, port = endpoint.split(":", 1)
        else:
            host = endpoint
            port = None
    
        # Generate database key for credential management
        credentials_manager = get_session_credentials_manager()
        database_key = credentials_manager.generate_database_key(
            workspace_name=target.name, database_name=database_name
        )
    
        try:
            s2_manager = S2Manager(
                host=host,
                port=port,
                user=username,
                password=password,
                database=database_name,
            )
    
            workspace_type = "shared/virtual" if target.is_shared else "dedicated"
            await ctx.info(
                f"Executing SQL query on {workspace_type} workspace '{target.name}' with database '{database_name}': {sql_query}"
                "This query may take some time depending on the complexity and size of the data."
            )
            s2_manager.execute(sql_query)
            columns = (
                [desc[0] for desc in s2_manager.cursor.description]
                if s2_manager.cursor.description
                else []
            )
            rows = s2_manager.fetchmany()
            results = []
            for row in rows:
                result_dict = {}
                for i, column in enumerate(columns):
                    result_dict[column] = row[i]
                results.append(result_dict)
            s2_manager.close()
            return {
                "data": results,
                "row_count": len(rows),
                "columns": columns,
                "status": "Success",
            }
        except Exception as e:
            # Check if this is an authentication error
            error_msg = str(e).lower()
            is_auth_error = any(
                auth_keyword in error_msg
                for auth_keyword in [
                    "access denied",
                    "authentication failed",
                    "invalid credentials",
                    "login failed",
                    "permission denied",
                    "unauthorized",
                    "auth",
                ]
            )
    
            if is_auth_error:
                logger.warning(
                    f"Authentication failed for database {database_key}, invalidating cached credentials"
                )
                invalidate_credentials(database_key)
                raise Exception(f"Authentication failed: {str(e)}")
            else:
                # Non-authentication error, re-raise as-is
                raise
  • run_sql is included in the central tools_definition list, imported from database, used to instantiate Tool objects for registration.
    tools_definition = [
        {"func": get_user_info},
        {"func": organization_info},
        {"func": choose_organization},
        {"func": set_organization},
        {"func": workspace_groups_info},
        {"func": workspaces_info},
        {"func": resume_workspace},
        {"func": list_starter_workspaces},
        {"func": create_starter_workspace},
        {"func": terminate_starter_workspace},
        {"func": list_regions},
        {"func": list_sharedtier_regions},
        {"func": run_sql},
        {"func": create_notebook_file},
        {"func": upload_notebook_file},
        {"func": create_job_from_notebook},
        {"func": get_job},
        {"func": delete_job},
    ]
  • Registers all filtered tools to FastMCP server using mcp.tool() with name derived from function name ('run_sql'), applied during server startup.
    for tool in filtered_tools:
        func = tool.func
        # Skip organization-related tools when using API key authentication
        if using_api_key and func.__name__ in api_key_excluded_tools:
            continue
        mcp.tool(name=func.__name__, description=func.__doc__)(func)
  • Pydantic schema used internally for eliciting user database credentials via try_elicitation when API key auth requires username/password for dedicated workspaces.
    class DatabaseCredentials(BaseModel):
        """Schema for database authentication credentials when using API key."""
    
        username: str = Field(..., description="Database username for authentication")
        password: str = Field(..., description="Database password for authentication")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/singlestore-labs/mcp-server-singlestore'

If you have feedback or need assistance with the MCP directory API, please join our Discord server