execute_query
Execute SQL queries on ClickHouse databases. Run any valid ClickHouse query and retrieve results.
Instructions
Execute ClickHouse queries
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | ClickHouse SQL to execute; multiple statements may be separated by semicolons. | |
Implementation Reference
- The 'execute_query' tool handler function, registered via @mcp.tool() decorator. It calls executor.execute_multiple_queries() and returns results as JSON.
@mcp.tool()
def execute_query(query: str, ctx: Context) -> str:
    """Execute ClickHouse queries"""
    # NOTE(review): the docstring above is left verbatim on purpose; it matches
    # the tool's published instructions and is presumably what the MCP
    # framework exposes as the tool description -- confirm before rewording.
    #
    # `query` may hold several statements; the executor returns one result per
    # statement. A lone result is serialized on its own, several results are
    # serialized as a JSON array. Connection/query failures are returned as
    # plain text instead of being raised.
    try:
        outcomes = _get_executor(ctx).execute_multiple_queries(query)
        payload = outcomes[0] if len(outcomes) == 1 else outcomes
        return json.dumps(payload, indent=2)
    except (ConnectionError, QueryError) as exc:
        return str(exc)

# - src/clickhouse_mcp_server/server.py:193-194 (registration) The @mcp.tool() decorator
#   registers 'execute_query' as an MCP tool on the FastMCP server instance.
@mcp.tool() def execute_query(query: str, ctx: Context) -> str: - QueryExecutor.execute_multiple_queries() - splits input by semicolons and executes each query via execute_single_query().
def execute_multiple_queries(self, query: str) -> List[Dict[str, Any]]:
    """Execute every statement in *query* and return one result per statement.

    Statements are separated by semicolons. Semicolons that appear inside a
    single-quoted string, a double-quoted identifier or a backtick-quoted
    identifier do not split the statement. Inside quotes a backslash escapes
    the next character, and a doubled quote character is treated as part of
    the literal. SQL comments are not parsed, so a semicolon inside a comment
    still acts as a separator.

    Args:
        query: One or more SQL statements.

    Returns:
        A list with one entry per non-empty statement, in input order. Each
        entry is whatever ``execute_single_query`` returned, or
        ``{"error": "<message>"}`` when that statement raised ``QueryError``.
        Later statements still run after an earlier one fails.
    """
    statements = []
    current = []
    open_quote = None  # the quote character we are inside of, if any
    escaped = False    # True when the previous quoted char was a backslash

    for ch in query:
        if open_quote is not None:
            current.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == open_quote:
                # A doubled quote ('' or ``) simply reopens on the next char.
                open_quote = None
        elif ch in "'\"`":
            open_quote = ch
            current.append(ch)
        elif ch == ";":
            statements.append("".join(current))
            current = []
        else:
            current.append(ch)
    # Whatever follows the last separator (or an unterminated quote) is a
    # statement of its own; the server reports any syntax problem.
    statements.append("".join(current))

    results = []
    for statement in statements:
        statement = statement.strip()
        if not statement:
            continue
        try:
            results.append(self.execute_single_query(statement))
        except QueryError as e:
            results.append({"error": str(e)})
    return results

# - QueryExecutor.execute_single_query() - executes a single ClickHouse query and
#   processes results into dictionaries.
def execute_single_query(self, query: str) -> Dict[str, Any]:
    """Execute a single query and return results.

    Args:
        query: One SQL statement.

    Returns:
        * ``{"message": "Switched to database: <name>"}`` for a statement
          that ``self._is_use_statement`` recognizes.
        * ``{"affected_rows": 0}`` when the client returns nothing or no rows.
        * Otherwise a list with one dict per row (column name -> value), each
          passed through ``self._process_row``. Note that this case returns a
          list even though the annotation says ``Dict``.

    Raises:
        QueryError: wraps any exception raised while executing the statement;
            the original exception is attached as ``__cause__``. Exceptions
            from ``self.context.ensure_connected()`` propagate unchanged.
    """
    self.context.ensure_connected()
    try:
        # Handle USE statements
        if self._is_use_statement(query):
            db_name = query.strip().split()[-1].strip('`').strip()
            # Re-quote the name: the backticks were stripped above, and a name
            # such as my-db is not a valid bare identifier.
            quoted = db_name.replace('\\', '\\\\').replace('`', '\\`')
            self.context.client.execute(f'USE `{quoted}`')
            # Record the new database only after the server accepted it, so a
            # failed USE does not leave the context pointing at a bad name.
            self.context.database = db_name
            return {"message": f"Switched to database: {db_name}"}

        # Execute query
        result = self.context.client.execute(query, with_column_types=True)
        if not result:
            return {"affected_rows": 0}

        rows, columns = result
        if not rows:
            return {"affected_rows": 0}

        # Convert rows to dictionaries; each column entry starts with its name.
        column_names = [col[0] for col in columns]
        return [
            self._process_row(dict(zip(column_names, row)))
            for row in rows
        ]
    except Exception as e:
        raise QueryError(f"Error executing query: {str(e)}") from e

# - _get_executor(ctx) helper that retrieves the ClickHouseContext from the request
#   context and creates a QueryExecutor.
def _get_executor(ctx: Context) -> QueryExecutor:
    """Build a QueryExecutor from the lifespan context attached to this request."""
    # The lifespan context is read from ctx.request_context on every call and
    # wrapped in a fresh executor.
    return QueryExecutor(ctx.request_context.lifespan_context)