# server.py (5.84 kB)
import asyncio
import logging
import os
from typing import Annotated
from clickhouse_connect import get_client
from clickhouse_connect.driver.client import Client
from mcp.server import Server
from mcp.types import Resource, TextContent, Tool
from pydantic import AnyUrl, UrlConstraints
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-wide logger shared by every handler below.
logger = logging.getLogger("clickhouse_mcp_server")

# Pydantic URL type that accepts only ``clickhouse://`` URLs carrying a host.
# NOTE(review): not referenced anywhere in the visible code; confirm it is
# still needed.
ClickhouseDsn = Annotated[
    AnyUrl,
    UrlConstraints(host_required=True, allowed_schemes=["clickhouse"]),
]
def get_clickhouse_client(
    host: str | None = None,
    port: int | None = None,
    username: str | None = None,
    password: str | None = None,
    database: str | None = None,
) -> Client:
    """Build a ClickHouse client from explicit arguments or the environment.

    Each argument that is falsy (``None``, ``""``, ``0``) is replaced by the
    matching environment variable, or by a built-in default when that
    variable is unset:

    * host     -> ``CLICKHOUSE_HOST``     (default ``"localhost"``)
    * port     -> ``CLICKHOUSE_PORT``     (default ``8123``)
    * username -> ``CLICKHOUSE_USER``     (default ``"default"``)
    * password -> ``CLICKHOUSE_PASSWORD`` (default ``""``)
    * database -> ``CLICKHOUSE_DATABASE`` (default ``"default"``)

    Raises:
        ValueError: If ``port`` is falsy and ``CLICKHOUSE_PORT`` is not an
            integer string.
    """
    env = os.environ
    # Resolved in the same order as the keyword arguments of get_client.
    settings = {
        "host": host or env.get("CLICKHOUSE_HOST", "localhost"),
        "port": port or int(env.get("CLICKHOUSE_PORT", "8123")),
        "username": username or env.get("CLICKHOUSE_USER", "default"),
        "password": password or env.get("CLICKHOUSE_PASSWORD", ""),
        "database": database or env.get("CLICKHOUSE_DATABASE", "default"),
    }
    return get_client(**settings)
# MCP server instance. The handlers below attach to it through the
# ``@app.list_resources()``, ``@app.read_resource()``, ``@app.list_tools()``
# and ``@app.call_tool()`` decorators; ``main`` runs it over stdio.
app = Server("clickhouse_mcp_server")
@app.list_resources()
async def list_resources() -> list[Resource]:
    """List ClickHouse databases and their tables as MCP resources.

    The ``system``, ``information_schema`` and ``INFORMATION_SCHEMA``
    databases are skipped. Each remaining database yields one
    ``clickhouse://<db>/tables`` resource. That resource is followed by one
    ``clickhouse://<db>/<table>/schema`` resource per table in the database.

    Returns:
        The resources, grouped database by database.
    """
    client = get_clickhouse_client()
    try:
        resources: list[Resource] = []
        db_query = """
        SELECT
            name,
            engine
        FROM system.databases
        WHERE name NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA')
        """
        db_result = client.query(db_query)
        for db in db_result.result_rows:
            database = db[0]
            resources.append(
                Resource(
                    uri=AnyUrl(f"clickhouse://{database}/tables"),
                    name=f"Database: {database}",
                    mimeType="text/plain",
                    description=f"Tables in database: {database}",
                )
            )
            # Interpolate the name as a backtick-quoted identifier, with
            # backslashes and backticks escaped. This stops names containing
            # spaces, dashes or quote characters from breaking or altering
            # the statement.
            escaped = database.replace("\\", "\\\\").replace("`", "\\`")
            table_result = client.query(f"SHOW TABLES FROM `{escaped}`")
            resources.extend(
                Resource(
                    uri=AnyUrl(f"clickhouse://{database}/{table[0]}/schema"),
                    name=f"Table: {database}.{table[0]}",
                    mimeType="text/plain",
                    description=f"Schema of table: {database}.{table[0]}",
                )
                for table in table_result.result_rows
            )
        return resources
    finally:
        # A fresh client is created for every call, so release it here.
        client.close()
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    """Return the text contents of a ``clickhouse://`` resource.

    Supported URI shapes:

    * ``clickhouse://<db>/tables``: newline-separated table names in ``<db>``.
    * ``clickhouse://<db>/<table>/schema``: one ``"<column> - <type>"`` line
      per row returned by ``DESCRIBE TABLE``.

    Path segments are percent-decoded before use, so a name that the URL
    encoder escaped (e.g. a space as ``%20``) resolves to the real name.

    Raises:
        ValueError: If the scheme is not ``clickhouse://``, or the path matches
            neither shape or has an empty name segment.
    """
    from urllib.parse import unquote

    def quote_identifier(identifier: str) -> str:
        """Render *identifier* as a backtick-quoted ClickHouse identifier."""
        escaped = identifier.replace("\\", "\\\\").replace("`", "\\`")
        return f"`{escaped}`"

    uri_str = str(uri)
    logger.info(f"Reading resource: {uri_str}")
    if not uri_str.startswith("clickhouse://"):
        raise ValueError(f"Invalid URI scheme: {uri_str}")
    # Split first, then decode each segment, so an encoded "/" (%2F) stays
    # inside its own segment.
    parts = [unquote(part) for part in uri_str[len("clickhouse://") :].split("/")]

    wants_tables = len(parts) == 2 and parts[1] == "tables"
    wants_schema = len(parts) == 3 and parts[2] == "schema"
    # Validate before creating a client: get_clickhouse_client talks to the
    # server, and a malformed URI should fail fast with ValueError.
    if not (wants_tables or wants_schema) or not all(parts[:-1]):
        raise ValueError(f"Invalid resource URI: {uri_str}")

    client = get_clickhouse_client()
    try:
        if wants_tables:
            result = client.query(f"SHOW TABLES FROM {quote_identifier(parts[0])}")
            return "\n".join(row[0] for row in result.result_rows)
        database, table = parts[0], parts[1]
        result = client.query(
            f"DESCRIBE TABLE {quote_identifier(database)}.{quote_identifier(table)}"
        )
        return "\n".join(f"{row[0]} - {row[1]}" for row in result.result_rows)
    finally:
        client.close()
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools this server exposes.

    Exactly one tool is offered: ``execute_select_query``, which takes a
    single required string argument named ``query``.
    """
    logger.info("Listing tools...")
    query_param = {
        "type": "string",
        "description": "The SELECT query to execute",
    }
    input_schema = {
        "type": "object",
        "properties": {"query": query_param},
        "required": ["query"],
    }
    select_tool = Tool(
        name="execute_select_query",
        description="Execute a SELECT query on the ClickHouse server",
        inputSchema=input_schema,
    )
    return [select_tool]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Run the ``execute_select_query`` tool.

    Args:
        name: Tool requested by the client. Only ``"execute_select_query"``
            is supported.
        arguments: Tool arguments. They must contain a non-empty string under
            ``"query"``. ``None`` is treated like an empty mapping.

    Returns:
        A single text item. On success it holds a tab-separated header of
        column names followed by one tab-separated line per row. Otherwise it
        holds an ``"Error: ..."`` / ``"Error executing query: ..."`` message.

    Raises:
        ValueError: For an unknown tool name, or a missing, empty or
            non-string query.
    """
    logger.info(f"Calling tool: {name} with arguments: {arguments}")
    if name != "execute_select_query":
        raise ValueError(f"Unknown tool: {name}")
    # Tolerate a missing arguments object, so callers get "Query is required"
    # instead of an AttributeError on None.
    query = (arguments or {}).get("query")
    if not query:
        raise ValueError("Query is required")
    if not isinstance(query, str):
        raise ValueError("Query must be a string")
    # NOTE(review): this prefix test is only a coarse guard. It rejects
    # e.g. "WITH ... SELECT" and does not make the session read-only.
    # Consider enforcing read-only access on the ClickHouse user as well.
    if not query.strip().upper().startswith("SELECT"):
        return [TextContent(type="text", text="Error: Only SELECT queries are allowed.")]
    client = None
    try:
        # Client creation stays inside the try so connection failures are
        # reported to the caller as text, like query failures.
        client = get_clickhouse_client()
        result = client.query(query)
        lines = ["\t".join(result.column_names)]
        lines.extend("\t".join(str(value) for value in row) for row in result.result_rows)
        return [TextContent(type="text", text="\n".join(lines))]
    except Exception as e:
        logger.error(f"Error executing SQL '{query}': {e}")
        return [TextContent(type="text", text=f"Error executing query: {e}")]
    finally:
        if client is not None:
            client.close()
async def main():
    """Serve the MCP app over stdin/stdout.

    Any exception that escapes ``app.run`` is logged with its traceback and
    then re-raised.
    """
    from mcp.server.stdio import stdio_server

    logger.info("Starting ClickHouse MCP server...")
    async with stdio_server() as streams:
        reader, writer = streams
        try:
            await app.run(reader, writer, app.create_initialization_options())
        except Exception as exc:
            # logger.exception == logger.error(..., exc_info=True)
            logger.exception(f"Server error: {exc}")
            raise


if __name__ == "__main__":
    asyncio.run(main())