execute_query
Execute SQL queries on ClickHouse databases securely through the MCP server, enabling efficient data retrieval and interaction with structured datasets.
Instructions
Execute ClickHouse queries
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | | |
Input Schema (JSON Schema)
{
"properties": {
"query": {
"title": "Query",
"type": "string"
}
},
"required": [
"query"
],
"title": "execute_queryArguments",
"type": "object"
}
Implementation Reference
# The primary handler function for the 'execute_query' tool. It retrieves a
# QueryExecutor from the context, executes the query (handling multiple
# statements), formats results as JSON, and handles errors.
def execute_query(query: str, ctx: Context) -> str:
    """Execute ClickHouse queries"""
    # NOTE(review): the docstring above is kept verbatim; it matches the tool's
    # published "Instructions" text, so it is presumably surfaced to clients.
    try:
        outcomes = _get_executor(ctx).execute_multiple_queries(query)
        # A lone statement is returned unwrapped; several statements are
        # returned as a JSON array with one entry per statement.
        payload = outcomes[0] if len(outcomes) == 1 else outcomes
        return json.dumps(payload, indent=2)
    except (ConnectionError, QueryError) as failure:
        # Connection and query failures are reported as plain text to the
        # caller instead of being raised.
        return str(failure)
- src/clickhouse_mcp_server/server.py:193-193 (registration) The @mcp.tool() decorator registers the execute_query function as an MCP tool with the FastMCP server.
  @mcp.tool()
# Helper function that extracts the ClickHouseContext from the MCP Context and
# returns a QueryExecutor instance.
def _get_executor(ctx: Context) -> QueryExecutor:
    """Build a QueryExecutor around the lifespan context carried by ``ctx``."""
    return QueryExecutor(ctx.request_context.lifespan_context)
# Key helper method in QueryExecutor that splits semicolon-separated queries,
# executes each via execute_single_query, and collects results or errors.
def execute_multiple_queries(self, query: str) -> List[Dict[str, Any]]:
    """Execute every statement in ``query`` and return one result per statement.

    Statements are separated by ``;``. Semicolons that appear inside quoted
    text (single quotes, double quotes, backticks) or inside ``--`` / ``/* */``
    comments do not end a statement, so ``SELECT 'a;b'`` stays one statement.
    Blank statements are dropped.

    Args:
        query: One or more SQL statements.

    Returns:
        A list with one entry per executed statement, in order: whatever
        ``self.execute_single_query`` returned, or ``{"error": <message>}``
        when it raised ``QueryError``. Empty when ``query`` has no statements.
    """

    def _split_statements(sql: str) -> List[str]:
        # Scan once, tracking quoted regions and comments so that only
        # top-level semicolons are treated as statement terminators.
        pieces = []
        start = 0
        pos = 0
        length = len(sql)
        while pos < length:
            ch = sql[pos]
            if ch in "'\"`":
                # Skip to the matching closing quote; a backslash escapes the
                # character that follows it. A doubled quote ('') simply reads
                # as two adjacent quoted runs, which splits identically.
                pos += 1
                while pos < length and sql[pos] != ch:
                    if sql[pos] == "\\":
                        pos += 1
                    pos += 1
                pos += 1
            elif sql.startswith("--", pos):
                newline = sql.find("\n", pos)
                pos = length if newline == -1 else newline + 1
            elif sql.startswith("/*", pos):
                closing = sql.find("*/", pos + 2)
                pos = length if closing == -1 else closing + 2
            elif ch == ";":
                pieces.append(sql[start:pos])
                start = pos + 1
                pos += 1
            else:
                pos += 1
        pieces.append(sql[start:])
        return [piece.strip() for piece in pieces if piece.strip()]

    results = []
    for single_query in _split_statements(query):
        try:
            results.append(self.execute_single_query(single_query))
        except QueryError as e:
            # One failing statement must not stop the remaining ones; record
            # the failure in that statement's slot instead.
            results.append({"error": str(e)})
    return results
# Core helper method that executes a single ClickHouse query, handles USE
# statements, processes results into formatted dictionaries, and raises
# QueryError on failure.
def execute_single_query(self, query: str) -> Dict[str, Any]:
    """Execute one statement and return its outcome.

    Args:
        query: A single SQL statement.

    Returns:
        * ``{"message": "Switched to database: <name>"}`` for a ``USE``
          statement (as recognised by ``self._is_use_statement``).
        * ``{"affected_rows": 0}`` when the client returns a falsy result or
          no rows.
        * Otherwise a list with one dict per row (column name -> value), each
          passed through ``self._process_row``.
        NOTE(review): the list case is not covered by the ``Dict[str, Any]``
        annotation; the annotation is left unchanged here.

    Raises:
        QueryError: wraps any exception raised while executing the statement.
            Exceptions from ``self.context.ensure_connected()`` are raised
            before the wrapping ``try`` and therefore propagate unchanged.
    """
    self.context.ensure_connected()
    try:
        # Handle USE statements
        if self._is_use_statement(query):
            # Last whitespace-separated token is the database name; drop any
            # surrounding backticks.
            db_name = query.strip().split()[-1].strip('`').strip()
            self.context.client.execute(f'USE {db_name}')
            # Record the new database only after the server accepted it, so a
            # failed USE does not leave the context pointing at a database
            # that was never selected.
            self.context.database = db_name
            return {"message": f"Switched to database: {db_name}"}

        # Execute query
        result = self.context.client.execute(query, with_column_types=True)
        if not result:
            return {"affected_rows": 0}
        rows, columns = result
        if not rows:
            return {"affected_rows": 0}

        # Convert rows to dictionaries; the first item of each column entry
        # is used as the column name.
        column_names = [col[0] for col in columns]
        return [self._process_row(dict(zip(column_names, row))) for row in rows]
    except Exception as e:
        # Chain the original exception so the root cause stays in tracebacks.
        raise QueryError(f"Error executing query: {str(e)}") from e