import logging
import json
from typing import Optional, List, Any
import concurrent.futures
import atexit
import clickhouse_connect
from clickhouse_connect.driver.binding import format_query_value
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from dataclasses import dataclass, field, asdict, is_dataclass
from mcp_clickhouse.mcp_env import get_config
@dataclass
class Column:
    """Description of one column of a ClickHouse table.

    The field names match the column aliases selected from ``system.columns``
    in ``list_tables`` so that a result row can be expanded straight into the
    constructor by keyword.
    """

    # Where the column lives.
    database: str
    table: str

    # The column itself; ``column_type`` is the ``type`` column aliased in the query.
    name: str
    column_type: str

    # Default-value metadata and free-form comment, annotated as optional.
    default_kind: Optional[str]
    default_expression: Optional[str]
    comment: Optional[str]
@dataclass
class Table:
    """Description of one ClickHouse table together with its columns.

    The required fields carry the same names as the columns selected from
    ``system.tables`` in ``list_tables``, which lets a result row be expanded
    into the constructor by keyword. ``columns`` is not part of that query;
    ``list_tables`` fills it in afterwards.
    """

    # Identity and definition.
    database: str
    name: str
    engine: str
    create_table_query: str
    dependencies_database: str
    dependencies_table: str
    engine_full: str
    sorting_key: str
    primary_key: str

    # Size and part statistics.
    total_rows: int
    total_bytes: int
    total_bytes_uncompressed: int
    parts: int
    active_parts: int
    total_marks: int

    # Optional extras; each instance gets its own fresh ``columns`` list.
    comment: Optional[str] = None
    columns: List[Column] = field(default_factory=list)
# Name shared by the FastMCP server instance and this module's logger.
MCP_SERVER_NAME = "mcp-clickhouse"
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# Shared worker pool: run_select_query submits execute_query to it so the
# caller can wait on the result with a timeout.
QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# Shut the pool down at interpreter exit, waiting for submitted work (wait=True).
atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True))
# How long run_select_query waits for a result: 30 minutes, expressed in seconds.
SELECT_QUERY_TIMEOUT_SECS = 30 * 60
# NOTE(review): presumably loads variables from a local .env file before
# get_config() is consulted -- confirm against python-dotenv.
load_dotenv()
# Package names handed to FastMCP through its `dependencies` argument.
deps = [
    "clickhouse-connect",
    "python-dotenv",
    "uvicorn",
    "pip-system-certs",
]
# Server instance whose `tool()` decorator registers the functions below.
mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)
def result_to_table(query_columns, result) -> List[Table]:
    """Turn raw query rows into ``Table`` instances.

    Args:
        query_columns: Column names, in the same order as the values in each row.
        result: Iterable of rows; each row is a sequence of values.

    Returns:
        One ``Table`` per row, built by pairing each column name with the
        value at the same position and passing the pairs as keyword arguments.
    """
    tables = []
    for row in result:
        named_values = dict(zip(query_columns, row))
        tables.append(Table(**named_values))
    return tables
def result_to_column(query_columns, result) -> List[Column]:
    """Turn raw query rows into ``Column`` instances.

    Args:
        query_columns: Column names, in the same order as the values in each row.
        result: Iterable of rows; each row is a sequence of values.

    Returns:
        One ``Column`` per row, built by pairing each column name with the
        value at the same position and passing the pairs as keyword arguments.
    """
    columns = []
    for row in result:
        named_values = dict(zip(query_columns, row))
        columns.append(Column(**named_values))
    return columns
def to_json(obj: Any) -> Any:
    """Convert dataclass instances inside ``obj`` to JSON strings.

    The return type depends on the input, which is why the annotation is
    ``Any`` rather than ``str``:

    - a dataclass instance becomes a JSON string of its ``asdict()`` form
      (``json.dumps`` falls back to this function for values it cannot
      encode on its own);
    - a list becomes a new list with every item converted;
    - a dict becomes a new dict with every value converted (keys untouched);
    - anything else is returned unchanged.

    Args:
        obj: Value to convert.

    Returns:
        A JSON string, a list, a dict, or ``obj`` itself, as described above.
    """
    # is_dataclass() is also true for dataclass *types*, but asdict() only
    # accepts instances and raises TypeError otherwise; pass types through.
    if is_dataclass(obj) and not isinstance(obj, type):
        return json.dumps(asdict(obj), default=to_json)
    if isinstance(obj, list):
        return [to_json(item) for item in obj]
    if isinstance(obj, dict):
        return {key: to_json(value) for key, value in obj.items()}
    return obj
@mcp.tool()
def list_databases():
    """List available ClickHouse databases"""
    # The docstring above is kept verbatim: the tool decorator may publish it
    # to clients (confirm against the FastMCP documentation).
    logger.info("Listing all databases")
    client = create_clickhouse_client()
    result = client.command("SHOW DATABASES")

    # command() can hand back a single string holding one database name per
    # line rather than a list, so count lines in that case instead of
    # reporting "1" regardless of how many databases exist.
    if isinstance(result, list):
        database_count = len(result)
    elif isinstance(result, str):
        database_count = len(result.splitlines())
    else:
        database_count = 1
    logger.info(f"Found {database_count} databases")

    # The raw command result is returned unchanged for existing callers.
    return result
@mcp.tool()
def list_tables(
    database: str, like: Optional[str] = None, not_like: Optional[str] = None
):
    """List available ClickHouse tables in a database, including schema, comment,
    row count, and column count."""
    logger.info(f"Listing tables in database '{database}'")
    client = create_clickhouse_client()

    # Table-level metadata. Every caller-supplied value is passed through
    # format_query_value before being placed in the SQL text.
    tables_sql = (
        "SELECT database, name, engine, create_table_query, dependencies_database, "
        "dependencies_table, engine_full, sorting_key, primary_key, total_rows, "
        "total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment "
        f"FROM system.tables WHERE database = {format_query_value(database)}"
    )

    # Optional name filters, appended in the order LIKE then NOT LIKE.
    name_filters = []
    if like:
        name_filters.append(f" AND name LIKE {format_query_value(like)}")
    if not_like:
        name_filters.append(f" AND name NOT LIKE {format_query_value(not_like)}")
    tables_sql += "".join(name_filters)

    tables_result = client.query(tables_sql)
    tables = result_to_table(tables_result.column_names, tables_result.result_rows)

    # One follow-up query per table attaches its column descriptions.
    for table in tables:
        columns_sql = (
            "SELECT database, table, name, type AS column_type, default_kind, "
            "default_expression, comment FROM system.columns "
            f"WHERE database = {format_query_value(database)} "
            f"AND table = {format_query_value(table.name)}"
        )
        columns_result = client.query(columns_sql)
        table.columns = result_to_column(
            columns_result.column_names,
            columns_result.result_rows,
        )

    logger.info(f"Found {len(tables)} tables")
    # Plain dicts (nested dataclasses included) are returned instead of dataclass objects.
    return list(map(asdict, tables))
def execute_query(query: str):
    """Run ``query`` and return its rows as dictionaries.

    A new client is created for the call. The query is sent with the
    ``readonly`` setting chosen by ``get_readonly_setting``.

    Args:
        query: SQL text to execute.

    Returns:
        On success, a list with one dict per row mapping column name to
        value. If running the query raises, a dict of the form
        ``{"error": "<message>"}`` is returned instead of raising.

    Raises:
        Exception: Whatever ``create_clickhouse_client`` raises; client
            creation happens outside the guarded section.
    """
    client = create_clickhouse_client()
    try:
        readonly_mode = get_readonly_setting(client)
        response = client.query(query, settings={"readonly": readonly_mode})
        names = response.column_names
        records = [
            {name: row[position] for position, name in enumerate(names)}
            for row in response.result_rows
        ]
        logger.info(f"Query returned {len(records)} rows")
        return records
    except Exception as err:
        logger.error(f"Error executing query: {err}")
        # A dict, not a bare string, is returned on failure: string error
        # responses can cause BrokenResourceError when serialized by the
        # MCP protocol.
        return {"error": str(err)}
@mcp.tool()
def run_select_query(query: str):
    """Run a SELECT query in a ClickHouse database"""
    # Every failure path below returns a {"status": "error", "message": ...}
    # dict: string error responses can cause serialization issues in MCP
    # that lead to BrokenResourceError.
    logger.info(f"Executing SELECT query: {query}")
    try:
        pending = QUERY_EXECUTOR.submit(execute_query, query)
        try:
            outcome = pending.result(timeout=SELECT_QUERY_TIMEOUT_SECS)

            # execute_query signals failure with a dict carrying an "error" key.
            query_failed = isinstance(outcome, dict) and "error" in outcome
            if not query_failed:
                return outcome

            logger.warning(f"Query failed: {outcome['error']}")
            return {
                "status": "error",
                "message": f"Query failed: {outcome['error']}",
            }
        except concurrent.futures.TimeoutError:
            logger.warning(
                f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}"
            )
            pending.cancel()
            return {
                "status": "error",
                "message": f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds",
            }
    except Exception as e:
        # Anything else (including errors re-raised from the worker) is
        # reported in the same structured shape.
        logger.error(f"Unexpected error in run_select_query: {str(e)}")
        return {"status": "error", "message": f"Unexpected error: {str(e)}"}
def create_clickhouse_client():
    """Create a ClickHouse client from the configured connection settings.

    The connection parameters come from ``get_config().get_client_config()``
    and are logged (host, port, username, TLS flags and timeouts) before the
    client is created.

    Returns:
        The client returned by ``clickhouse_connect.get_client``.

    Raises:
        Exception: Any error raised while creating the client or reading its
            server version is logged and re-raised unchanged.
    """
    cfg = get_config().get_client_config()
    connection_summary = (
        f"Creating ClickHouse client connection to {cfg['host']}:{cfg['port']} "
        f"as {cfg['username']} "
        f"(secure={cfg['secure']}, verify={cfg['verify']}, "
        f"connect_timeout={cfg['connect_timeout']}s, "
        f"send_receive_timeout={cfg['send_receive_timeout']}s)"
    )
    logger.info(connection_summary)

    try:
        client = clickhouse_connect.get_client(**cfg)
        # Reading the server version serves as the connection test.
        server_version = client.server_version
        logger.info(
            f"Successfully connected to ClickHouse server version {server_version}"
        )
        return client
    except Exception as e:
        logger.error(f"Failed to connect to ClickHouse: {str(e)}")
        raise
def get_readonly_setting(client) -> str:
    """Get the appropriate readonly setting value to use for queries.

    This function handles potential conflicts between server and client
    readonly settings:

    - readonly=0: No read-only restrictions
    - readonly=1: Only read queries allowed, settings cannot be changed
    - readonly=2: Only read queries allowed, settings can be changed
      (except readonly itself)

    If server has readonly=2 and client tries to set readonly=1, it would
    cause: "Setting readonly is unknown or readonly" error.

    The server's readonly setting is preserved unless it is 0 (or absent),
    in which case readonly=1 is enforced so that queries stay read-only.

    Args:
        client: ClickHouse client connection; its ``server_settings`` mapping
            is consulted for the ``"readonly"`` entry.

    Returns:
        String value of the readonly setting to use.
    """
    setting = client.server_settings.get("readonly")
    if not setting:
        return "1"  # Default to basic read-only mode if setting isn't present

    # The entry is a settings object exposing the actual value as ``.value``;
    # comparing the object itself with "0" never matches, which would let a
    # server-side readonly=0 through. A plain value is accepted as well.
    value = getattr(setting, "value", setting)
    if value is None or str(value) == "0":
        return "1"  # Force read-only mode if server has it disabled

    return str(value)  # Respect server's readonly setting (likely 2)