We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/cgrdavies/mcp-clickhouse'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import logging
import json
from typing import Optional, List, Any
import concurrent.futures
import atexit
import clickhouse_connect
from clickhouse_connect.driver.binding import format_query_value
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from dataclasses import dataclass, field, asdict, is_dataclass
from mcp_clickhouse.mcp_env import get_config
@dataclass
class Column:
database: str
table: str
name: str
column_type: str
default_kind: Optional[str]
default_expression: Optional[str]
comment: Optional[str]
@dataclass
class Table:
database: str
name: str
engine: str
create_table_query: str
dependencies_database: str
dependencies_table: str
engine_full: str
sorting_key: str
primary_key: str
total_rows: int
total_bytes: int
total_bytes_uncompressed: int
parts: int
active_parts: int
total_marks: int
comment: Optional[str] = None
columns: List[Column] = field(default_factory=list)
MCP_SERVER_NAME = "mcp-clickhouse"
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10)
atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True))
SELECT_QUERY_TIMEOUT_SECS = 30 * 60
load_dotenv()
deps = [
"clickhouse-connect",
"python-dotenv",
"uvicorn",
"pip-system-certs",
]
mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)
def result_to_table(query_columns, result) -> List[Table]:
return [Table(**dict(zip(query_columns, row))) for row in result]
def result_to_column(query_columns, result) -> List[Column]:
return [Column(**dict(zip(query_columns, row))) for row in result]
def to_json(obj: Any) -> str:
if is_dataclass(obj):
return json.dumps(asdict(obj), default=to_json)
elif isinstance(obj, list):
return [to_json(item) for item in obj]
elif isinstance(obj, dict):
return {key: to_json(value) for key, value in obj.items()}
return obj
@mcp.tool()
def list_databases():
"""List available ClickHouse databases"""
logger.info("Listing all databases")
client = create_clickhouse_client()
result = client.command("SHOW DATABASES")
logger.info(f"Found {len(result) if isinstance(result, list) else 1} databases")
return result
@mcp.tool()
def list_tables(
database: str, like: Optional[str] = None, not_like: Optional[str] = None
):
"""List available ClickHouse tables in a database, including schema, comment,
row count, and column count."""
logger.info(f"Listing tables in database '{database}'")
client = create_clickhouse_client()
query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}"
if like:
query += f" AND name LIKE {format_query_value(like)}"
if not_like:
query += f" AND name NOT LIKE {format_query_value(not_like)}"
result = client.query(query)
# Deserialize result as Table dataclass instances
tables = result_to_table(result.column_names, result.result_rows)
for table in tables:
column_data_query = f"SELECT database, table, name, type AS column_type, default_kind, default_expression, comment FROM system.columns WHERE database = {format_query_value(database)} AND table = {format_query_value(table.name)}"
column_data_query_result = client.query(column_data_query)
table.columns = [
c
for c in result_to_column(
column_data_query_result.column_names,
column_data_query_result.result_rows,
)
]
logger.info(f"Found {len(tables)} tables")
return [asdict(table) for table in tables]
def execute_query(query: str):
client = create_clickhouse_client()
try:
read_only = get_readonly_setting(client)
res = client.query(query, settings={"readonly": read_only})
column_names = res.column_names
rows = []
for row in res.result_rows:
row_dict = {}
for i, col_name in enumerate(column_names):
row_dict[col_name] = row[i]
rows.append(row_dict)
logger.info(f"Query returned {len(rows)} rows")
return rows
except Exception as err:
logger.error(f"Error executing query: {err}")
# Return a structured dictionary rather than a string to ensure proper serialization
# by the MCP protocol. String responses for errors can cause BrokenResourceError.
return {"error": str(err)}
@mcp.tool()
def run_select_query(query: str):
"""Run a SELECT query in a ClickHouse database"""
logger.info(f"Executing SELECT query: {query}")
try:
future = QUERY_EXECUTOR.submit(execute_query, query)
try:
result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
# Check if we received an error structure from execute_query
if isinstance(result, dict) and "error" in result:
logger.warning(f"Query failed: {result['error']}")
# MCP requires structured responses; string error messages can cause
# serialization issues leading to BrokenResourceError
return {
"status": "error",
"message": f"Query failed: {result['error']}",
}
return result
except concurrent.futures.TimeoutError:
logger.warning(
f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}"
)
future.cancel()
# Return a properly structured response for timeout errors
return {
"status": "error",
"message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds",
}
except Exception as e:
logger.error(f"Unexpected error in run_select_query: {str(e)}")
# Catch all other exceptions and return them in a structured format
# to prevent MCP serialization failures
return {"status": "error", "message": f"Unexpected error: {str(e)}"}
def create_clickhouse_client():
client_config = get_config().get_client_config()
logger.info(
f"Creating ClickHouse client connection to {client_config['host']}:{client_config['port']} "
f"as {client_config['username']} "
f"(secure={client_config['secure']}, verify={client_config['verify']}, "
f"connect_timeout={client_config['connect_timeout']}s, "
f"send_receive_timeout={client_config['send_receive_timeout']}s)"
)
try:
client = clickhouse_connect.get_client(**client_config)
# Test the connection
version = client.server_version
logger.info(f"Successfully connected to ClickHouse server version {version}")
return client
except Exception as e:
logger.error(f"Failed to connect to ClickHouse: {str(e)}")
raise
def get_readonly_setting(client) -> str:
"""Get the appropriate readonly setting value to use for queries.
This function handles potential conflicts between server and client readonly settings:
- readonly=0: No read-only restrictions
- readonly=1: Only read queries allowed, settings cannot be changed
- readonly=2: Only read queries allowed, settings can be changed (except readonly itself)
If server has readonly=2 and client tries to set readonly=1, it would cause:
"Setting readonly is unknown or readonly" error
This function preserves the server's readonly setting unless it's 0, in which case
we enforce readonly=1 to ensure queries are read-only.
Args:
client: ClickHouse client connection
Returns:
String value of readonly setting to use
"""
read_only = client.server_settings.get("readonly")
if read_only:
if read_only == "0":
return "1" # Force read-only mode if server has it disabled
else:
return read_only.value # Respect server's readonly setting (likely 2)
else:
return "1" # Default to basic read-only mode if setting isn't present