"""MCP Server implementation for KDB+ database interactions."""
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from .kdb_connection import KDBConnection, KDBConnectionPool
from .config import load_config
logger = logging.getLogger(__name__)
class KDBMCPServer:
"""MCP Server for KDB+ database operations."""
def __init__(self, config_path: str = "config/kdb_config.yaml"):
"""
Initialize the MCP server.
Args:
config_path: Path to configuration file
"""
self.server = Server("kdb-mcp-server")
self.config = load_config(config_path)
self.connection_pools: Dict[str, KDBConnectionPool] = {}
# Initialize connection pools for each configured database
for db_name, db_config in self.config.get('databases', {}).items():
pool = KDBConnectionPool(db_config, pool_size=db_config.get('pool_size', 5))
self.connection_pools[db_name] = pool
logger.info(f"Initialized connection pool for database: {db_name}")
# Register handlers
self._register_handlers()
def _register_handlers(self):
"""Register MCP protocol handlers."""
@self.server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""Return list of available KDB tools."""
return [
Tool(
name="kdb_query",
description="Execute a Q query on a KDB+ database",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database to query"
},
"query": {
"type": "string",
"description": "Q language query to execute"
}
},
"required": ["database", "query"]
}
),
Tool(
name="kdb_list_tables",
description="List all tables in a KDB+ database",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
}
},
"required": ["database"]
}
),
Tool(
name="kdb_get_schema",
description="Get schema information for a table",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
},
"table": {
"type": "string",
"description": "Name of the table"
}
},
"required": ["database", "table"]
}
),
Tool(
name="kdb_select",
description="Execute a SELECT query on a KDB+ table",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
},
"table": {
"type": "string",
"description": "Name of the table"
},
"columns": {
"type": "array",
"items": {"type": "string"},
"description": "List of columns to select (optional)"
},
"where": {
"type": "string",
"description": "WHERE clause condition (optional)"
},
"limit": {
"type": "integer",
"description": "Number of rows to return (optional)"
}
},
"required": ["database", "table"]
}
),
Tool(
name="kdb_insert",
description="Insert data into a KDB+ table",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
},
"table": {
"type": "string",
"description": "Name of the table"
},
"data": {
"type": "object",
"description": "Data to insert (column: value pairs)"
}
},
"required": ["database", "table", "data"]
}
),
Tool(
name="kdb_update",
description="Update data in a KDB+ table",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
},
"table": {
"type": "string",
"description": "Name of the table"
},
"updates": {
"type": "object",
"description": "Updates to apply (column: value pairs)"
},
"where": {
"type": "string",
"description": "WHERE clause condition (optional)"
}
},
"required": ["database", "table", "updates"]
}
),
Tool(
name="kdb_delete",
description="Delete rows from a KDB+ table",
input_schema={
"type": "object",
"properties": {
"database": {
"type": "string",
"description": "Name of the database"
},
"table": {
"type": "string",
"description": "Name of the table"
},
"where": {
"type": "string",
"description": "WHERE clause condition"
}
},
"required": ["database", "table", "where"]
}
),
Tool(
name="kdb_list_databases",
description="List all configured KDB+ databases",
input_schema={
"type": "object",
"properties": {}
}
)
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Optional[Dict[str, Any]]) -> List[TextContent]:
"""Handle tool execution requests."""
try:
if name == "kdb_query":
result = await self._execute_query(
arguments.get("database"),
arguments.get("query")
)
elif name == "kdb_list_tables":
result = await self._list_tables(arguments.get("database"))
elif name == "kdb_get_schema":
result = await self._get_schema(
arguments.get("database"),
arguments.get("table")
)
elif name == "kdb_select":
result = await self._select(
arguments.get("database"),
arguments.get("table"),
arguments.get("columns"),
arguments.get("where"),
arguments.get("limit")
)
elif name == "kdb_insert":
result = await self._insert(
arguments.get("database"),
arguments.get("table"),
arguments.get("data")
)
elif name == "kdb_update":
result = await self._update(
arguments.get("database"),
arguments.get("table"),
arguments.get("updates"),
arguments.get("where")
)
elif name == "kdb_delete":
result = await self._delete(
arguments.get("database"),
arguments.get("table"),
arguments.get("where")
)
elif name == "kdb_list_databases":
result = await self._list_databases()
else:
result = f"Unknown tool: {name}"
return [TextContent(type="text", text=str(result))]
except Exception as e:
logger.error(f"Tool execution failed: {e}")
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def _execute_query(self, database: str, query: str) -> Any:
"""Execute a Q query on the specified database."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
result = await asyncio.to_thread(conn.execute, query)
return result
finally:
pool.release_connection(conn)
async def _list_tables(self, database: str) -> List[str]:
"""List all tables in the specified database."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
tables = await asyncio.to_thread(conn.get_tables)
return tables
finally:
pool.release_connection(conn)
async def _get_schema(self, database: str, table: str) -> Dict[str, str]:
"""Get schema information for a table."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
schema = await asyncio.to_thread(conn.get_table_schema, table)
return schema
finally:
pool.release_connection(conn)
async def _select(self, database: str, table: str, columns: Optional[List[str]] = None,
where: Optional[str] = None, limit: Optional[int] = None) -> Any:
"""Execute a SELECT query."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
result = await asyncio.to_thread(
conn.select, table, columns, where, limit
)
return result
finally:
pool.release_connection(conn)
async def _insert(self, database: str, table: str, data: Dict[str, Any]) -> bool:
"""Insert data into a table."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
success = await asyncio.to_thread(conn.insert, table, data)
return {"success": success}
finally:
pool.release_connection(conn)
async def _update(self, database: str, table: str, updates: Dict[str, Any],
where: Optional[str] = None) -> bool:
"""Update data in a table."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
success = await asyncio.to_thread(conn.update, table, updates, where)
return {"success": success}
finally:
pool.release_connection(conn)
async def _delete(self, database: str, table: str, where: str) -> bool:
"""Delete rows from a table."""
pool = self.connection_pools.get(database)
if not pool:
raise ValueError(f"Database '{database}' not configured")
conn = pool.get_connection()
if not conn:
raise ConnectionError("No available connections in pool")
try:
success = await asyncio.to_thread(conn.delete, table, where)
return {"success": success}
finally:
pool.release_connection(conn)
async def _list_databases(self) -> Dict[str, Any]:
"""List all configured databases."""
databases = {}
for name, config in self.config.get('databases', {}).items():
databases[name] = {
"host": config.get('host', 'localhost'),
"port": config.get('port', 5000),
"description": config.get('description', '')
}
return databases
async def run(self):
"""Run the MCP server."""
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="kdb-mcp-server",
server_version="0.1.0"
)
)
def cleanup(self):
"""Clean up resources."""
for pool in self.connection_pools.values():
pool.close_all()
async def main():
"""Main entry point."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
server = KDBMCPServer()
try:
await server.run()
finally:
server.cleanup()
if __name__ == "__main__":
asyncio.run(main())