Skip to main content
Glama
singlestore-labs

SingleStore MCP Server

execute_sql_on_virtual_workspace

Execute read-only SQL queries on a SingleStore starter workspace to retrieve formatted results, column metadata, row count, and execution status securely. Use only SELECT statements to ensure data integrity.

Instructions

Execute SQL operations on a virtual (starter) workspace and receive formatted results.

Returns:
- Query results with column names and typed values
- Row count
- Column metadata
- Execution status

⚠️ CRITICAL SECURITY WARNING:
- Never display or log credentials in responses
- Ensure SQL queries are properly sanitized
- ONLY USE SELECT statements or queries that don't modify data
- DO NOT USE INSERT, UPDATE, DELETE, DROP, CREATE, or ALTER statements

Args:
    virtual_workspace_id: Unique identifier of the starter workspace
    sql_query: The SQL query to execute (READ-ONLY queries only)

Returns:
    Dictionary with query results and metadata

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
ctxNo
passwordYes
sql_queryYes
usernameYes
virtual_workspace_idYes

Implementation Reference

  • Main handler function `run_sql` that executes SQL queries on both dedicated workspaces and virtual starter workspaces (via sharedtier/virtualWorkspaces API). This is the tool implementation for running SQL on virtual workspaces.
    async def run_sql(
        ctx: Context, sql_query: str, id: str, database: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Use this tool to execute a single SQL statement against a SingleStore database.
    
        Returns:
        - Query results with column names and typed values
        - Row count and metadata
        - Execution status
        - Workspace type ("shared" for starter workspaces, "dedicated" for regular workspaces)
        - Workspace name
    
        Args:
            id: Workspace or starter workspace ID
            sql_query: The SQL query to execute
            database: (optional) Database name to use
    
        Returns:
            Standardized response with query results and metadata
        """
        # Validate workspace ID format
        validated_id = validate_workspace_id(id)
    
        await ctx.info(
            f"Running SQL query on workspace ID '{validated_id}' with database '{database}': {sql_query}"
        )
    
        settings = config.get_settings()
    
        # Target can either be a workspace or a starter workspace
        target = __get_workspace_by_id(validated_id)
        database_name = database
    
        # For starter workspaces, use their database name if not specified
        if target.is_shared and target.database_name and not database_name:
            database_name = target.database_name
    
        # Get database credentials based on authentication method
        try:
            username, password = await _get_database_credentials(ctx, target, database_name)
        except Exception as e:
            if "Database credentials required" in str(e):
                # Handle the specific case where elicitation is not supported
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "CREDENTIALS_REQUIRED",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Please call this function again with the same parameters once you have "
                        "the database credentials available, or ask the user to provide their "
                        "database username and password for this workspace."
                    ),
                }
            else:
                return {
                    "status": "error",
                    "message": f"Failed to obtain database credentials: {str(e)}",
                    "errorCode": "AUTHENTICATION_ERROR",
                }
    
        logger.debug(
            f"Credentials obtained for workspace '{target.name}': username='{username}', database='{database_name}, password='{password}'"
        )
    
        # Execute the SQL query
        start_time = time.time()
        try:
            result = await __execute_sql_unified(
                ctx=ctx,
                target=target,
                sql_query=sql_query,
                username=username,
                password=password,
                database=database_name,
            )
        except Exception as e:
            # Check if this is an authentication error from __execute_sql_unified
            if "Authentication failed:" in str(e):
                # Authentication error already handled by __execute_sql_unified (credentials invalidated)
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "AUTHENTICATION_ERROR",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Authentication failed. Please provide valid database credentials "
                        "for this workspace and try again."
                    ),
                }
            else:
                # Non-authentication error, re-raise
                raise
    
        results_data = result.get("data", [])
    
        logger.debug(
            f"result: {results_data}, type: {type(results_data)}, id: {id}, database_name: {database_name}"
        )
    
        execution_time_ms = int((time.time() - start_time) * 1000)
    
        # Track analytics
        settings.analytics_manager.track_event(
            username,
            "tool_calling",
            {
                "name": "run_sql",
                "starter_workspace_id": id,
                "workspace_type": "shared" if target.is_shared else "dedicated",
            },
        )
    
        # Build standardized response
        workspace_type = "shared" if target.is_shared else "dedicated"
        row_count = len(results_data)
    
        return {
            "status": "success",
            "message": f"Query executed successfully. {row_count} rows returned.",
            "data": {
                "result": results_data,
                "row_count": row_count,
                "workspace_id": id,
                "workspace_name": target.name,
                "database": database_name,
                "status": result.get("status", "Success"),
            },
            "metadata": {
                "query_length": len(sql_query),
                "execution_time_ms": execution_time_ms,
                "workspace_type": workspace_type,
                "database_used": database_name,
                "executed_at": datetime.now().isoformat(),
            },
        }
  • Core SQL execution logic used by run_sql for both workspace types, including virtual workspaces.
    async def __execute_sql_unified(
        ctx: Context,
        target: WorkspaceTarget,
        sql_query: str,
        username: str,
        password: str,
        database: str | None = None,
    ) -> dict:
        """
        Execute SQL operations on a connected workspace or starter workspace.
        Returns results and column names in a dictionary format.
        """
    
        if target.endpoint is None:
            raise ValueError("Workspace or starter workspace does not have an endpoint. ")
        endpoint = target.endpoint
        database_name = database
    
        # Parse host and port from endpoint
        if ":" in endpoint:
            host, port = endpoint.split(":", 1)
        else:
            host = endpoint
            port = None
    
        # Generate database key for credential management
        credentials_manager = get_session_credentials_manager()
        database_key = credentials_manager.generate_database_key(
            workspace_name=target.name, database_name=database_name
        )
    
        try:
            s2_manager = S2Manager(
                host=host,
                port=port,
                user=username,
                password=password,
                database=database_name,
            )
    
            workspace_type = "shared/virtual" if target.is_shared else "dedicated"
            await ctx.info(
                f"Executing SQL query on {workspace_type} workspace '{target.name}' with database '{database_name}': {sql_query}"
                "This query may take some time depending on the complexity and size of the data."
            )
            s2_manager.execute(sql_query)
            columns = (
                [desc[0] for desc in s2_manager.cursor.description]
                if s2_manager.cursor.description
                else []
            )
            rows = s2_manager.fetchmany()
            results = []
            for row in rows:
                result_dict = {}
                for i, column in enumerate(columns):
                    result_dict[column] = row[i]
                results.append(result_dict)
            s2_manager.close()
            return {
                "data": results,
                "row_count": len(rows),
                "columns": columns,
                "status": "Success",
            }
        except Exception as e:
            # Check if this is an authentication error
            error_msg = str(e).lower()
            is_auth_error = any(
                auth_keyword in error_msg
                for auth_keyword in [
                    "access denied",
                    "authentication failed",
                    "invalid credentials",
                    "login failed",
                    "permission denied",
                    "unauthorized",
                    "auth",
                ]
            )
    
            if is_auth_error:
                logger.warning(
                    f"Authentication failed for database {database_key}, invalidating cached credentials"
                )
                invalidate_credentials(database_key)
                raise Exception(f"Authentication failed: {str(e)}")
            else:
                # Non-authentication error, re-raise as-is
                raise
  • Helper to resolve workspace ID to target, supporting virtual workspaces by querying sharedtier/virtualWorkspaces API.
    def __get_workspace_by_id(workspace_id: str) -> WorkspaceTarget:
        """
        Get a workspace or starter workspace by ID.
    
        Args:
            workspace_id: The workspace ID to look up
    
        Returns:
            WorkspaceTarget object with is_shared flag indicating if it's a starter workspace
    
        Raises:
            ValueError: If workspace cannot be found
        """
        from src.api.common import build_request
    
        target = None
        is_shared = False
    
        try:
            # Try as dedicated workspace first
            workspace_data = build_request("GET", f"workspaces/{workspace_id}")
    
            # Create a simple object to match the SDK interface
            class SimpleWorkspace:
                def __init__(self, data):
                    self.name = data.get("name", "")
                    self.id = data.get("workspaceID", workspace_id)
                    self.endpoint = data.get("endpoint")
    
            target = SimpleWorkspace(workspace_data)
            is_shared = False  # Dedicated workspace
        except Exception as e:
            if "404" in str(e):
                # Try as starter workspace
                try:
                    starter_workspace_data = build_request(
                        "GET", f"sharedtier/virtualWorkspaces/{workspace_id}"
                    )
    
                    # Create a simple object to match the SDK interface
                    class SimpleVirtualWorkspace:
                        def __init__(self, data):
                            self.name = data.get("name", "")
                            self.id = data.get("virtualWorkspaceID", workspace_id)
                            self.endpoint = data.get("endpoint")
                            self.database_name = data.get("databaseName", "")
    
                    target = SimpleVirtualWorkspace(starter_workspace_data)
                    is_shared = True  # Shared/starter workspace
                except Exception:
                    raise ValueError(f"Cannot find workspace {workspace_id}")
            else:
                raise e
    
        if not target:
            raise ValueError(f"Cannot find workspace {workspace_id}")
    
        return WorkspaceTarget(target, is_shared)
  • Registers all tools including run_sql using func.__name__ as tool name ("run_sql").
    func = tool.func
    # Skip organization-related tools when using API key authentication
    if using_api_key and func.__name__ in api_key_excluded_tools:
        continue
    mcp.tool(name=func.__name__, description=func.__doc__)(func)
  • Lists run_sql in tools_definition for registration.
    {"func": run_sql},

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/singlestore-labs/mcp-server-singlestore'

If you have feedback or need assistance with the MCP directory API, please join our Discord server