execute_sql
Execute SQL queries on OceanBase databases via the MCP Server, enabling AI assistants to securely interact with and manage data through a controlled interface.
Instructions
Execute an SQL query on the OceanBase server
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | The SQL query to execute |
Implementation Reference
- The @app.call_tool() decorator defines the handler for the 'execute_sql' tool. It validates the tool name, extracts the SQL query from arguments, connects to the OceanBase database using mysql.connector, executes the query, handles special cases like SHOW TABLES, SHOW COLUMNS, DESCRIBE, SELECT queries by formatting results as CSV-like text, commits non-SELECT queries, and returns TextContent or error messages.@app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Execute SQL commands.""" config = get_db_config() logger.info(f"Calling tool: {name} with arguments: {arguments}") if name != "execute_sql": raise ValueError(f"Unknown tool: {name}") query = arguments.get("query") if not query: raise ValueError("Query is required") try: with connect(**config) as conn: with conn.cursor() as cursor: cursor.execute(query) # Special handling for SHOW TABLES if query.strip().upper().startswith("SHOW TABLES"): tables = cursor.fetchall() result = ["Tables_in_" + config["database"]] # Header result.extend([table[0] for table in tables]) return [TextContent(type="text", text="\n".join(result))] elif query.strip().upper().startswith("SHOW COLUMNS"): resp_header = "Columns info of this table: \n" columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() result = [",".join(map(str, row)) for row in rows] return [TextContent(type="text", text=resp_header + ("\n".join([",".join(columns)] + result)))] elif query.strip().upper().startswith("DESCRIBE"): resp_header = "Description of this table: \n" columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() result = [",".join(map(str, row)) for row in rows] return [TextContent(type="text", text=resp_header + ("\n".join([",".join(columns)] + result)))] # Regular SELECT queries elif query.strip().upper().startswith("SELECT"): columns = [desc[0] for desc in cursor.description] rows = cursor.fetchall() result = [",".join(map(str, row)) for row in rows] return [TextContent(type="text", text="\n".join([",".join(columns)] + result))] # Non-SELECT queries else: conn.commit() return [ TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")] except Error as e: logger.error(f"Error executing SQL '{query}': {e}") return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
- src/oceanbase_mcp_server/server.py:93-113 (registration)The @app.list_tools() function registers the 'execute_sql' tool, providing its name, description, and input schema requiring a 'query' string.@app.list_tools() async def list_tools() -> list[Tool]: """List available OceanBase tools.""" logger.info("Listing tools...") return [ Tool( name="execute_sql", description="Execute an SQL query on the OceanBase server", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The SQL query to execute" } }, "required": ["query"] } ) ]
- The inputSchema defines the expected input for the 'execute_sql' tool: an object with a required 'query' string property.inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The SQL query to execute" } }, "required": ["query"] }
- Helper function get_db_config() retrieves OceanBase connection details from environment variables (OB_HOST, OB_PORT, OB_USER, OB_PASSWORD, OB_DATABASE) and validates required fields, used by the handler.def get_db_config(): """Get database configuration from environment variables.""" config = { "host": os.getenv("OB_HOST", "localhost"), "port": int(os.getenv("OB_PORT", "2881")), "user": os.getenv("OB_USER"), "password": os.getenv("OB_PASSWORD"), "database": os.getenv("OB_DATABASE") } if not all([config["user"], config["password"], config["database"]]): logger.error("Missing required database configuration. Please check environment variables:") logger.error("OB_USER, OB_PASSWORD, and OB_DATABASE are required") raise ValueError("Missing required database configuration") return config