
execute_sql

Execute SQL queries on databases within a workspace group using SingleStore MCP Server. Retrieve formatted results, metadata, and execution status while adhering to strict read-only query restrictions for security.

Instructions

Execute SQL operations on a database attached to a workspace within a workspace group and receive formatted results.

Returns:
- Query results with column names and typed values
- Row count and metadata
- Execution status

⚠️ CRITICAL SECURITY WARNINGS:
- Never display or log credentials in responses
- Use only READ-ONLY queries (SELECT, SHOW, DESCRIBE)
- Do not use data modification statements:
  - No INSERT/UPDATE/DELETE
  - No DROP/CREATE/ALTER
- Ensure queries are properly sanitized

Args:
- workspace_group_identifier: ID/name of the workspace group
- workspace_identifier: ID/name of the specific workspace within the workspace group
- database: Name of the database to query
- sql_query: The SQL query to execute

Returns: Dictionary with query results and metadata
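As a sketch, the read-only restriction above can be checked client-side before invoking the tool. `is_read_only` is an illustrative helper, not part of the SingleStore MCP Server; it only inspects the leading keyword, so it is a convenience guard, not a substitute for server-side enforcement.

```python
# Hypothetical client-side guard for the read-only restriction described
# above. DESC is included as the common shorthand for DESCRIBE.
READ_ONLY_KEYWORDS = {"SELECT", "SHOW", "DESCRIBE", "DESC"}

def is_read_only(sql_query: str) -> bool:
    """Return True when the statement begins with an allowed read-only keyword."""
    words = sql_query.strip().split(None, 1)
    return bool(words) and words[0].upper() in READ_ONLY_KEYWORDS
```

A caller would reject the query up front rather than let the server return an error for a mutating statement.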

Input Schema

Name                        Required  Description  Default
ctx                         No
database                    Yes
password                    Yes
sql_query                   Yes
username                    Yes
workspace_group_identifier  Yes
workspace_identifier        Yes
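A request payload matching the schema above might look like the following; all identifier and credential values are placeholders, and the `REQUIRED` set simply restates the required fields from the table.

```python
# Hypothetical execute_sql payload; every value here is a placeholder.
payload = {
    "workspace_group_identifier": "my-workspace-group",
    "workspace_identifier": "my-workspace",
    "database": "analytics",
    "username": "db_user",
    "password": "********",  # never log or echo real credentials
    "sql_query": "SELECT COUNT(*) AS n FROM events",
}

# Required fields per the input schema (ctx is optional).
REQUIRED = {
    "database", "password", "sql_query", "username",
    "workspace_group_identifier", "workspace_identifier",
}
missing = REQUIRED - payload.keys()
```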

Implementation Reference

  • Primary MCP tool handler for executing SQL queries (implemented as 'run_sql', documented as 'execute_sql'). It validates the workspace, manages credentials (eliciting them from the user when needed), delegates to the internal execution helper, tracks analytics, and returns standardized results with metadata.
    async def run_sql(
        ctx: Context,
        sql_query: str,
        id: str,
        database: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Use this tool to execute a single SQL statement against a SingleStore database.

        Returns:
        - Query results with column names and typed values
        - Row count and metadata
        - Execution status
        - Workspace type ("shared" for starter workspaces, "dedicated" for regular workspaces)
        - Workspace name

        Args:
            id: Workspace or starter workspace ID
            sql_query: The SQL query to execute
            database: (optional) Database name to use

        Returns:
            Standardized response with query results and metadata
        """
        # Validate workspace ID format
        validated_id = validate_workspace_id(id)

        await ctx.info(
            f"Running SQL query on workspace ID '{validated_id}' "
            f"with database '{database}': {sql_query}"
        )

        settings = config.get_settings()

        # Target can either be a workspace or a starter workspace
        target = __get_workspace_by_id(validated_id)

        database_name = database
        # For starter workspaces, use their database name if not specified
        if target.is_shared and target.database_name and not database_name:
            database_name = target.database_name

        # Get database credentials based on authentication method
        try:
            username, password = await _get_database_credentials(ctx, target, database_name)
        except Exception as e:
            if "Database credentials required" in str(e):
                # Handle the specific case where elicitation is not supported
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "CREDENTIALS_REQUIRED",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Please call this function again with the same parameters once you have "
                        "the database credentials available, or ask the user to provide their "
                        "database username and password for this workspace."
                    ),
                }
            else:
                return {
                    "status": "error",
                    "message": f"Failed to obtain database credentials: {str(e)}",
                    "errorCode": "AUTHENTICATION_ERROR",
                }

        # Do not log the password: the original logged it verbatim, which
        # violates the credential-handling warning above.
        logger.debug(
            f"Credentials obtained for workspace '{target.name}': "
            f"username='{username}', database='{database_name}'"
        )

        # Execute the SQL query
        start_time = time.time()
        try:
            result = await __execute_sql_unified(
                ctx=ctx,
                target=target,
                sql_query=sql_query,
                username=username,
                password=password,
                database=database_name,
            )
        except Exception as e:
            # Check if this is an authentication error from __execute_sql_unified
            if "Authentication failed:" in str(e):
                # Authentication error already handled by __execute_sql_unified
                # (credentials invalidated)
                return {
                    "status": "error",
                    "message": str(e),
                    "errorCode": "AUTHENTICATION_ERROR",
                    "workspace_id": validated_id,
                    "workspace_name": target.name,
                    "workspace_type": "shared" if target.is_shared else "dedicated",
                    "instruction": (
                        "Authentication failed. Please provide valid database credentials "
                        "for this workspace and try again."
                    ),
                }
            else:
                # Non-authentication error, re-raise
                raise

        results_data = result.get("data", [])
        logger.debug(
            f"result: {results_data}, type: {type(results_data)}, "
            f"id: {id}, database_name: {database_name}"
        )

        execution_time_ms = int((time.time() - start_time) * 1000)

        # Track analytics
        settings.analytics_manager.track_event(
            username,
            "tool_calling",
            {
                "name": "run_sql",
                "starter_workspace_id": id,
                "workspace_type": "shared" if target.is_shared else "dedicated",
            },
        )

        # Build standardized response
        workspace_type = "shared" if target.is_shared else "dedicated"
        row_count = len(results_data)

        return {
            "status": "success",
            "message": f"Query executed successfully. {row_count} rows returned.",
            "data": {
                "result": results_data,
                "row_count": row_count,
                "workspace_id": id,
                "workspace_name": target.name,
                "database": database_name,
                "status": result.get("status", "Success"),
            },
            "metadata": {
                "query_length": len(sql_query),
                "execution_time_ms": execution_time_ms,
                "workspace_type": workspace_type,
                "database_used": database_name,
                "executed_at": datetime.now().isoformat(),
            },
        }
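A caller consuming the standardized response above might branch on `status` and `errorCode`. The dispatcher below is a hypothetical client-side sketch (`handle_run_sql_response` is not part of the server); it shows the retry-relevant distinction between the two error codes.

```python
def handle_run_sql_response(response: dict) -> str:
    """Illustrative dispatch on the standardized run_sql response shape."""
    if response.get("status") == "success":
        rows = response["data"]["row_count"]
        return f"ok: {rows} rows"
    code = response.get("errorCode")
    if code == "CREDENTIALS_REQUIRED":
        # Per the 'instruction' field: re-invoke the tool with the same
        # parameters once database credentials are available.
        return "need-credentials"
    if code == "AUTHENTICATION_ERROR":
        # Cached credentials were invalidated; ask the user for new ones.
        return "bad-credentials"
    return "error"
```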
  • Core helper function performing the actual SQL execution: it connects via S2Manager, executes the query, fetches results as dicts, and handles authentication errors by invalidating the cached credentials before re-raising.
    async def __execute_sql_unified(
        ctx: Context,
        target: WorkspaceTarget,
        sql_query: str,
        username: str,
        password: str,
        database: str | None = None,
    ) -> dict:
        """
        Execute SQL operations on a connected workspace or starter workspace.
        Returns results and column names in a dictionary format.
        """
        if target.endpoint is None:
            raise ValueError("Workspace or starter workspace does not have an endpoint.")

        endpoint = target.endpoint
        database_name = database

        # Parse host and port from endpoint
        if ":" in endpoint:
            host, port = endpoint.split(":", 1)
        else:
            host = endpoint
            port = None

        # Generate database key for credential management
        credentials_manager = get_session_credentials_manager()
        database_key = credentials_manager.generate_database_key(
            workspace_name=target.name, database_name=database_name
        )

        try:
            s2_manager = S2Manager(
                host=host,
                port=port,
                user=username,
                password=password,
                database=database_name,
            )
            workspace_type = "shared/virtual" if target.is_shared else "dedicated"
            await ctx.info(
                f"Executing SQL query on {workspace_type} workspace '{target.name}' "
                f"with database '{database_name}': {sql_query} "
                "This query may take some time depending on the complexity and size of the data."
            )
            s2_manager.execute(sql_query)
            columns = (
                [desc[0] for desc in s2_manager.cursor.description]
                if s2_manager.cursor.description
                else []
            )
            rows = s2_manager.fetchmany()
            results = []
            for row in rows:
                result_dict = {}
                for i, column in enumerate(columns):
                    result_dict[column] = row[i]
                results.append(result_dict)
            s2_manager.close()
            return {
                "data": results,
                "row_count": len(rows),
                "columns": columns,
                "status": "Success",
            }
        except Exception as e:
            # Check if this is an authentication error
            error_msg = str(e).lower()
            is_auth_error = any(
                auth_keyword in error_msg
                for auth_keyword in [
                    "access denied",
                    "authentication failed",
                    "invalid credentials",
                    "login failed",
                    "permission denied",
                    "unauthorized",
                    "auth",
                ]
            )
            if is_auth_error:
                logger.warning(
                    f"Authentication failed for database {database_key}, "
                    "invalidating cached credentials"
                )
                invalidate_credentials(database_key)
                raise Exception(f"Authentication failed: {str(e)}")
            else:
                # Non-authentication error, re-raise as-is
                raise
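The endpoint parsing and the authentication-error classification used by the helper can be isolated as small pure functions. The sketch below mirrors the logic above; the function names are illustrative, not part of the server's API.

```python
# Keyword list mirroring the helper's auth-error classification above.
AUTH_KEYWORDS = (
    "access denied", "authentication failed", "invalid credentials",
    "login failed", "permission denied", "unauthorized", "auth",
)

def split_endpoint(endpoint):
    """Split 'host:port' into (host, port); port is None when absent."""
    if ":" in endpoint:
        host, port = endpoint.split(":", 1)
        return host, port
    return endpoint, None

def is_auth_error(exc):
    """Substring match on the lowercased error message, as the helper does."""
    msg = str(exc).lower()
    return any(keyword in msg for keyword in AUTH_KEYWORDS)
```

Note that the broad `"auth"` keyword will also match messages that merely mention authentication, so this classification errs on the side of invalidating credentials.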
  • run_sql is included in the central tools_definition list (imported from the database module) and is used to instantiate Tool objects for registration.
    tools_definition = [
        {"func": get_user_info},
        {"func": organization_info},
        {"func": choose_organization},
        {"func": set_organization},
        {"func": workspace_groups_info},
        {"func": workspaces_info},
        {"func": resume_workspace},
        {"func": list_starter_workspaces},
        {"func": create_starter_workspace},
        {"func": terminate_starter_workspace},
        {"func": list_regions},
        {"func": list_sharedtier_regions},
        {"func": run_sql},
        {"func": create_notebook_file},
        {"func": upload_notebook_file},
        {"func": create_job_from_notebook},
        {"func": get_job},
        {"func": delete_job},
    ]
  • Registers all filtered tools on the FastMCP server via mcp.tool(), with the tool name derived from the function name ('run_sql'); this runs during server startup.
    for tool in filtered_tools:
        func = tool.func
        # Skip organization-related tools when using API key authentication
        if using_api_key and func.__name__ in api_key_excluded_tools:
            continue
        mcp.tool(name=func.__name__, description=func.__doc__)(func)
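To see how the name and description fall out of `func.__name__` and `func.__doc__`, the loop can be exercised against a minimal stand-in registry. `FakeMCP` below is only an illustration of the registration pattern, not FastMCP itself.

```python
class FakeMCP:
    """Minimal stand-in for FastMCP's mcp.tool() registration (illustrative)."""

    def __init__(self):
        self.tools = {}

    def tool(self, name, description):
        def decorator(func):
            # Record the tool under its function-derived name, as above.
            self.tools[name] = {"func": func, "description": description}
            return func
        return decorator

def run_sql():
    """Execute a single SQL statement against a SingleStore database."""

mcp = FakeMCP()
mcp.tool(name=run_sql.__name__, description=run_sql.__doc__)(run_sql)
```

This is why the docstring on the handler doubles as the user-facing tool description.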
  • Pydantic schema used internally for eliciting user database credentials via try_elicitation when API key auth requires username/password for dedicated workspaces.
    class DatabaseCredentials(BaseModel):
        """Schema for database authentication credentials when using API key."""

        username: str = Field(..., description="Database username for authentication")
        password: str = Field(..., description="Database password for authentication")
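Without pydantic installed, the same required-fields shape can be sketched with a standard-library dataclass; the validation below is illustrative and only approximates what `Field(...)` enforces.

```python
from dataclasses import dataclass

@dataclass
class DatabaseCredentialsSketch:
    """Dataclass stand-in for the pydantic DatabaseCredentials schema above."""
    username: str
    password: str

    def __post_init__(self):
        # Both fields are required and must be non-empty.
        if not self.username or not self.password:
            raise ValueError("username and password are required")
```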

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/singlestore-labs/mcp-server-singlestore'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.