ydb_query
Execute SQL queries on YDB databases using input parameters. Integrate with AI-powered operations and natural language interactions via the YDB MCP server.
Instructions
Run a SQL query against YDB database
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| params | No | ||
| sql | Yes |
Input Schema (JSON Schema)
{
"properties": {
"params": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Params"
},
"sql": {
"title": "Sql",
"type": "string"
}
},
"required": [
"sql"
],
"title": "queryArguments",
"type": "object"
}
Implementation Reference
- ydb_mcp/server.py:431-465 (handler)Core handler function that executes the SQL query against the YDB database, handles optional parameters, processes result sets, and returns formatted JSON output.async def query(self, sql: str, params: Optional[Dict[str, Any]] = None) -> List[TextContent]: """Run a SQL query against YDB. Args: sql: SQL query to execute params: Optional query parameters Returns: List of TextContent objects with JSON-formatted results """ # Check if there's an authentication error if self.auth_error: return [TextContent(type="text", text=json.dumps({"error": self.auth_error}, indent=2))] try: pool = await self.get_pool() ydb_params = None if params: ydb_params = {} for key, value in params.items(): param_key = key if key.startswith("$") else f"${key}" ydb_params[param_key] = value result_sets = await pool.execute_with_retries(sql, ydb_params) all_results = [] for result_set in result_sets: processed = self._process_result_set(result_set) all_results.append(processed) # Convert all dict keys to strings for JSON serialization safe_result = self._stringify_dict_keys({"result_sets": all_results}) return [TextContent(type="text", text=json.dumps(safe_result, indent=2, cls=CustomJSONEncoder))] except Exception as e: error_message = str(e) safe_error = self._stringify_dict_keys({"error": error_message}) return [TextContent(type="text", text=json.dumps(safe_error, indent=2))]
- ydb_mcp/server.py:589-598 (registration)Tool specification in register_tools() method defining the 'ydb_query' tool: name, description, handler reference (self.query), and input schema requiring 'sql' parameter.{ "name": "ydb_query", "description": "Run a SQL query against YDB database", "handler": self.query, # Use real handler "parameters": { "properties": {"sql": {"type": "string", "title": "Sql"}}, "required": ["sql"], "type": "object", }, },
- ydb_mcp/server.py:593-597 (schema)JSON schema definition for 'ydb_query' tool inputs: object with required 'sql' string property."parameters": { "properties": {"sql": {"type": "string", "title": "Sql"}}, "required": ["sql"], "type": "object", },
- ydb_mcp/server.py:1060-1062 (handler)Dispatch logic in overridden call_tool() method that specifically handles 'ydb_query' by calling the query handler with the sql parameter.if tool_name == "ydb_query" and "sql" in params: result = await self.query(sql=params["sql"]) elif tool_name == "ydb_query_with_params" and "sql" in params and "params" in params:
- ydb_mcp/server.py:466-503 (helper)Helper function called by query handler to process YDB result sets into structured dictionary with columns and rows.def _process_result_set(self, result_set): """Process YDB result set into a dictionary format. Args: result_set: YDB result set object Returns: Processed result set as a dictionary """ try: # Extract columns columns = [] try: # Get column names from the columns attribute columns_attr = getattr(result_set, "columns") columns = [col.name for col in columns_attr] except Exception as e: logger.exception(f"Error getting columns: {e}") return {"error": str(e), "columns": [], "rows": []} # Extract rows rows = [] try: rows_attr = getattr(result_set, "rows") for row in rows_attr: row_values = [] for i in range(len(columns)): row_values.append(row[i]) rows.append(row_values) except Exception as e: logger.exception(f"Error getting rows: {e}") return {"error": str(e), "columns": columns, "rows": []} return {"columns": columns, "rows": rows} except Exception as e: logger.exception(f"Error processing result set: {e}") return {"error": str(e), "columns": [], "rows": []}