mcp-server-starrocks

by hagsmand
Verified
  • src
  • mcp_server_starrocks
import logging from contextlib import closing from typing import Any, List from sqlalchemy import create_engine, text from sqlalchemy.engine.base import Engine import mcp.server.stdio import mcp.types as types from mcp.server import Server from pydantic import AnyUrl from mcp_server_starrocks import Config logger = logging.getLogger("mcp-server-starrocks") logger.info("Starting MCP StarRocks Server") class StarRocksDatabase: def __init__(self, config: Config): if config.password is None: self.engine: Engine = create_engine( f'starrocks://{config.user}:@{config.host}:{config.port}/{config.database}' ) else: self.engine: Engine = create_engine( f'starrocks://{config.user}:{config.password}@{config.host}:{config.port}/{config.database}' ) def execute_query(self, query: str, parameters: list = None) -> List[Any]: with self.engine.connect() as connection: if parameters: result = connection.execute(text(query), parameters) else: result = connection.execute(text(query)) try: return result.fetchall() except: # For non-SELECT queries that don't return results return [] async def main(config: Config): logger.info(f"Starting StarRocks MCP Server with connection to: {config.host}:{config.port}") db = StarRocksDatabase(config) server = Server("mcp-starrocks-server") logger.debug("Registering handlers") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools""" tools = [ types.Tool( name="read-query", description="Execute a SELECT query on the StarRocks database", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "SELECT SQL query to execute", }, }, "required": ["query"], }, ), types.Tool( name="list-tables", description="List all tables in the StarRocks database", inputSchema={ "type": "object", "properties": {}, }, ), types.Tool( name="describe-table", description="Describe the schema of a specific table in the StarRocks database", inputSchema={ "type": "object", "properties": { "table_name": { "type": "string", "description": "Name of the table to describe", }, }, "required": ["table_name"], }, ), ] if not getattr(config, 'readonly', False): tools.extend([ types.Tool( name="write-query", description="Execute an INSERT, UPDATE, or DELETE query on the StarRocks database", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "SQL query to execute", }, }, "required": ["query"], }, ), types.Tool( name="create-table", description="Create a new table in the StarRocks database", inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "CREATE TABLE SQL query to execute", }, }, "required": ["query"], }, ), ]) return tools @server.call_tool() async def handle_call_tool( name: str, arguments: dict[str, Any] | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """Handle tool execution requests""" try: if name == "list-tables": results = db.execute_query("SHOW TABLES") return [types.TextContent(type="text", text=str(results))] if name == "read-query": if not arguments["query"].strip().upper().startswith("SELECT"): raise ValueError("Only SELECT queries are allowed for read-query") results = db.execute_query(arguments["query"]) return [types.TextContent(type="text", text=str(results))] elif name == "describe-table": table_name = arguments["table_name"] results = db.execute_query(f"DESCRIBE {table_name}") return [types.TextContent(type="text", text=str(results))] elif name == "create-table": if getattr(config, 'readonly', False): raise ValueError("Server is running in read-only mode") results = db.execute_query(arguments["query"]) return [types.TextContent(type="text", text="Table created successfully.")] elif name == "write-query": if getattr(config, 'readonly', False): raise ValueError("Server is running in read-only mode") results = db.execute_query(arguments["query"]) return [types.TextContent(type="text", text=str(results))] else: raise ValueError(f"Unknown tool: {name}") except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] # Run the server using stdin/stdout streams options = server.create_initialization_options() async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): logger.info("StarRocks MCP Server running with stdio transport") await server.run( read_stream, write_stream, options, )