mcp-clickhouse-long-running

by cgrdavies

run_select_query

Execute SELECT queries against a ClickHouse database through the mcp-clickhouse-long-running server, which runs each query under a timeout and reports failures as structured error responses.

Instructions

Run a SELECT query in a ClickHouse database

Input Schema

Name     Required    Description    Default
query    Yes
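To make the schema concrete, here is a minimal sketch of an arguments payload for this tool. The only documented field is the required `query`; the handler's signature types it as a string. The `validate_arguments` helper and the sample SQL are hypothetical and are not part of the server.

```python
import json

# Hypothetical example payload: the input schema lists a single required field.
arguments = {"query": "SELECT name FROM system.tables LIMIT 5"}


def validate_arguments(args: dict) -> str:
    """Return the query string, or raise ValueError if it is missing or empty."""
    query = args.get("query")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("'query' is required and must be a non-empty string")
    return query


# Serialize the payload the way a JSON-based client would send it.
payload = json.dumps(arguments)
```

A client would pass this payload as the tool's arguments; how the call is transported depends on the MCP client in use.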

Implementation Reference

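  • The primary handler function for the 'run_select_query' MCP tool. It submits the query to a thread pool executor and waits up to SELECT_QUERY_TIMEOUT_SECS for the result. Query failures, timeouts, and unexpected exceptions are all returned as structured error dictionaries rather than raised.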
    @mcp.tool()
    def run_select_query(query: str):
        """Run a SELECT query in a ClickHouse database"""
        logger.info(f"Executing SELECT query: {query}")
        try:
            future = QUERY_EXECUTOR.submit(execute_query, query)
            try:
                result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
                # Check if we received an error structure from execute_query
                if isinstance(result, dict) and "error" in result:
                    logger.warning(f"Query failed: {result['error']}")
                    # MCP requires structured responses; string error messages can cause
                    # serialization issues leading to BrokenResourceError
                    return {
                        "status": "error",
                        "message": f"Query failed: {result['error']}",
                    }
                return result
            except concurrent.futures.TimeoutError:
                logger.warning(
                    f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}"
                )
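                # cancel() only prevents a task that has not started yet; a query
                # already running in the worker thread is not interrupted
                future.cancel()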
                # Return a properly structured response for timeout errors
                return {
                    "status": "error",
                    "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds",
                }
        except Exception as e:
            logger.error(f"Unexpected error in run_select_query: {str(e)}")
            # Catch all other exceptions and return them in a structured format
            # to prevent MCP serialization failures
            return {"status": "error", "message": f"Unexpected error: {str(e)}"}
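The timeout handling in the handler can be tried in isolation. The sketch below is a minimal stand-in, not the server's code: `EXECUTOR`, `TIMEOUT_SECS`, `slow_task`, and `run_with_timeout` are hypothetical names, and `time.sleep` substitutes for a ClickHouse query. It also shows that `Future.cancel()` returns `False` once a task is running, so a timed-out query may keep executing in its worker thread.

```python
import concurrent.futures
import time

# Illustrative stand-ins; the real executor and timeout live in the server module.
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=2)
TIMEOUT_SECS = 0.1


def slow_task(duration: float) -> list:
    """Stand-in for execute_query: sleeps, then returns one row."""
    time.sleep(duration)
    return [{"value": 1}]


def run_with_timeout(duration: float):
    """Submit the task and wait at most TIMEOUT_SECS for its result."""
    future = EXECUTOR.submit(slow_task, duration)
    try:
        return future.result(timeout=TIMEOUT_SECS)
    except concurrent.futures.TimeoutError:
        # cancel() returns False for a task that is already running,
        # so the worker thread continues until the task finishes.
        cancelled = future.cancel()
        return {
            "status": "error",
            "message": f"Timed out after {TIMEOUT_SECS} seconds",
            "cancelled": cancelled,
        }


fast = run_with_timeout(0.0)   # finishes within the timeout
slow = run_with_timeout(0.5)   # exceeds the timeout
```

Because the worker is not interrupted, a pool with few workers can fill up with abandoned long-running tasks; stopping the query on the server side would need a separate mechanism that this sketch does not cover.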
  • Supporting helper function that performs the actual ClickHouse query execution, processes results into a list of dictionaries, and returns structured errors if any occur.
    def execute_query(query: str):
        client = create_clickhouse_client()
        try:
            read_only = get_readonly_setting(client)
            res = client.query(query, settings={"readonly": read_only})
            column_names = res.column_names
            # Pair each row's values with the column names to build one dict per row
            rows = [dict(zip(column_names, row)) for row in res.result_rows]
            logger.info(f"Query returned {len(rows)} rows")
            return rows
        except Exception as err:
            logger.error(f"Error executing query: {err}")
            # Return a structured dictionary rather than a string to ensure proper serialization
            # by the MCP protocol. String responses for errors can cause BrokenResourceError.
            return {"error": str(err)}
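The helper's return contract is worth spelling out: a list of row dictionaries on success, or a single `{"error": ...}` dictionary on failure. The sketch below illustrates that contract with in-memory data. `rows_to_dicts` and `is_error` are hypothetical helper names, and the column names and rows are made up for the example.

```python
from typing import Any, Sequence


def rows_to_dicts(column_names: Sequence[str],
                  result_rows: Sequence[Sequence[Any]]) -> list:
    """Pair each row's values with the column names, one dict per row."""
    return [dict(zip(column_names, row)) for row in result_rows]


def is_error(result: Any) -> bool:
    """Mirror the handler's check: an error is a dict carrying an 'error' key."""
    return isinstance(result, dict) and "error" in result


# Made-up data standing in for a query result.
rows = rows_to_dicts(["id", "name"], [(1, "a"), (2, "b")])
failure = {"error": "Table default.missing does not exist"}
```

Since a successful result is always a list, the `isinstance(result, dict)` check cannot mistake a returned row for an error, even if a table has a column named `error`.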
  • Helper function to create and test a ClickHouse client connection using configuration from mcp_env.py, used by query execution.
    def create_clickhouse_client():
        client_config = get_config().get_client_config()
        logger.info(
            f"Creating ClickHouse client connection to {client_config['host']}:{client_config['port']} "
            f"as {client_config['username']} "
            f"(secure={client_config['secure']}, verify={client_config['verify']}, "
            f"connect_timeout={client_config['connect_timeout']}s, "
            f"send_receive_timeout={client_config['send_receive_timeout']}s)"
        )
    
        try:
            client = clickhouse_connect.get_client(**client_config)
            # Test the connection
            version = client.server_version
            logger.info(f"Successfully connected to ClickHouse server version {version}")
            return client
        except Exception as e:
            logger.error(f"Failed to connect to ClickHouse: {str(e)}")
            raise
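The log line above shows which keys the client configuration carries: `host`, `port`, `username`, `secure`, `verify`, `connect_timeout`, and `send_receive_timeout`. How mcp_env.py builds that dictionary is not shown here, so the following is only a sketch of one way it could be assembled from environment variables. The variable names, the defaults, and the `build_client_config` function are assumptions made for illustration.

```python
import os
from typing import Mapping


def _as_bool(value: str) -> bool:
    """Interpret common truthy strings; anything else is False."""
    return value.strip().lower() in {"1", "true", "yes"}


def build_client_config(env: Mapping[str, str] = os.environ) -> dict:
    """Assemble keyword arguments for a ClickHouse client (hypothetical env names)."""
    secure = _as_bool(env.get("CLICKHOUSE_SECURE", "true"))
    # 8443 and 8123 are ClickHouse's default HTTPS and HTTP interface ports.
    default_port = "8443" if secure else "8123"
    return {
        "host": env["CLICKHOUSE_HOST"],          # required in this sketch
        "port": int(env.get("CLICKHOUSE_PORT", default_port)),
        "username": env["CLICKHOUSE_USER"],      # required in this sketch
        "password": env.get("CLICKHOUSE_PASSWORD", ""),
        "secure": secure,
        "verify": _as_bool(env.get("CLICKHOUSE_VERIFY", "true")),
        # Timeout defaults below are illustrative, not the server's values.
        "connect_timeout": int(env.get("CLICKHOUSE_CONNECT_TIMEOUT", "30")),
        "send_receive_timeout": int(env.get("CLICKHOUSE_SEND_RECEIVE_TIMEOUT", "300")),
    }


config = build_client_config({"CLICKHOUSE_HOST": "localhost",
                              "CLICKHOUSE_USER": "default",
                              "CLICKHOUSE_SECURE": "false"})
```

A dictionary of this shape can be unpacked into `clickhouse_connect.get_client(**config)`, as the helper above does with its own configuration.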
  • Package __init__ exports the run_select_query tool function for import in tests and other modules.
    from .mcp_server import (
        create_clickhouse_client,
        list_databases,
        list_tables,
        run_select_query,
    )
    
    __all__ = [
        "list_databases",
        "list_tables",
        "run_select_query",
        "create_clickhouse_client",
    ]
MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cgrdavies/mcp-clickhouse'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.