Skip to main content
Glama
mcp_server.py (16.4 kB)
"""MCP Server implementation for KDB+ database interactions.""" import asyncio import json import logging from typing import Any, Dict, List, Optional from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.types import ( Tool, TextContent, ImageContent, EmbeddedResource, LoggingLevel ) from .kdb_connection import KDBConnection, KDBConnectionPool from .config import load_config logger = logging.getLogger(__name__) class KDBMCPServer: """MCP Server for KDB+ database operations.""" def __init__(self, config_path: str = "config/kdb_config.yaml"): """ Initialize the MCP server. Args: config_path: Path to configuration file """ self.server = Server("kdb-mcp-server") self.config = load_config(config_path) self.connection_pools: Dict[str, KDBConnectionPool] = {} # Initialize connection pools for each configured database for db_name, db_config in self.config.get('databases', {}).items(): pool = KDBConnectionPool(db_config, pool_size=db_config.get('pool_size', 5)) self.connection_pools[db_name] = pool logger.info(f"Initialized connection pool for database: {db_name}") # Register handlers self._register_handlers() def _register_handlers(self): """Register MCP protocol handlers.""" @self.server.list_tools() async def handle_list_tools() -> List[Tool]: """Return list of available KDB tools.""" return [ Tool( name="kdb_query", description="Execute a Q query on a KDB+ database", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database to query" }, "query": { "type": "string", "description": "Q language query to execute" } }, "required": ["database", "query"] } ), Tool( name="kdb_list_tables", description="List all tables in a KDB+ database", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" } }, "required": ["database"] } ), Tool( name="kdb_get_schema", description="Get schema information for a table", input_schema={ 
"type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" }, "table": { "type": "string", "description": "Name of the table" } }, "required": ["database", "table"] } ), Tool( name="kdb_select", description="Execute a SELECT query on a KDB+ table", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" }, "table": { "type": "string", "description": "Name of the table" }, "columns": { "type": "array", "items": {"type": "string"}, "description": "List of columns to select (optional)" }, "where": { "type": "string", "description": "WHERE clause condition (optional)" }, "limit": { "type": "integer", "description": "Number of rows to return (optional)" } }, "required": ["database", "table"] } ), Tool( name="kdb_insert", description="Insert data into a KDB+ table", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" }, "table": { "type": "string", "description": "Name of the table" }, "data": { "type": "object", "description": "Data to insert (column: value pairs)" } }, "required": ["database", "table", "data"] } ), Tool( name="kdb_update", description="Update data in a KDB+ table", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" }, "table": { "type": "string", "description": "Name of the table" }, "updates": { "type": "object", "description": "Updates to apply (column: value pairs)" }, "where": { "type": "string", "description": "WHERE clause condition (optional)" } }, "required": ["database", "table", "updates"] } ), Tool( name="kdb_delete", description="Delete rows from a KDB+ table", input_schema={ "type": "object", "properties": { "database": { "type": "string", "description": "Name of the database" }, "table": { "type": "string", "description": "Name of the table" }, "where": { "type": "string", "description": "WHERE 
clause condition" } }, "required": ["database", "table", "where"] } ), Tool( name="kdb_list_databases", description="List all configured KDB+ databases", input_schema={ "type": "object", "properties": {} } ) ] @self.server.call_tool() async def handle_call_tool(name: str, arguments: Optional[Dict[str, Any]]) -> List[TextContent]: """Handle tool execution requests.""" try: if name == "kdb_query": result = await self._execute_query( arguments.get("database"), arguments.get("query") ) elif name == "kdb_list_tables": result = await self._list_tables(arguments.get("database")) elif name == "kdb_get_schema": result = await self._get_schema( arguments.get("database"), arguments.get("table") ) elif name == "kdb_select": result = await self._select( arguments.get("database"), arguments.get("table"), arguments.get("columns"), arguments.get("where"), arguments.get("limit") ) elif name == "kdb_insert": result = await self._insert( arguments.get("database"), arguments.get("table"), arguments.get("data") ) elif name == "kdb_update": result = await self._update( arguments.get("database"), arguments.get("table"), arguments.get("updates"), arguments.get("where") ) elif name == "kdb_delete": result = await self._delete( arguments.get("database"), arguments.get("table"), arguments.get("where") ) elif name == "kdb_list_databases": result = await self._list_databases() else: result = f"Unknown tool: {name}" return [TextContent(type="text", text=str(result))] except Exception as e: logger.error(f"Tool execution failed: {e}") return [TextContent(type="text", text=f"Error: {str(e)}")] async def _execute_query(self, database: str, query: str) -> Any: """Execute a Q query on the specified database.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: result = await asyncio.to_thread(conn.execute, query) return result 
finally: pool.release_connection(conn) async def _list_tables(self, database: str) -> List[str]: """List all tables in the specified database.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: tables = await asyncio.to_thread(conn.get_tables) return tables finally: pool.release_connection(conn) async def _get_schema(self, database: str, table: str) -> Dict[str, str]: """Get schema information for a table.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: schema = await asyncio.to_thread(conn.get_table_schema, table) return schema finally: pool.release_connection(conn) async def _select(self, database: str, table: str, columns: Optional[List[str]] = None, where: Optional[str] = None, limit: Optional[int] = None) -> Any: """Execute a SELECT query.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: result = await asyncio.to_thread( conn.select, table, columns, where, limit ) return result finally: pool.release_connection(conn) async def _insert(self, database: str, table: str, data: Dict[str, Any]) -> bool: """Insert data into a table.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: success = await asyncio.to_thread(conn.insert, table, data) return {"success": success} finally: pool.release_connection(conn) async def _update(self, database: str, table: str, updates: Dict[str, Any], where: 
Optional[str] = None) -> bool: """Update data in a table.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: success = await asyncio.to_thread(conn.update, table, updates, where) return {"success": success} finally: pool.release_connection(conn) async def _delete(self, database: str, table: str, where: str) -> bool: """Delete rows from a table.""" pool = self.connection_pools.get(database) if not pool: raise ValueError(f"Database '{database}' not configured") conn = pool.get_connection() if not conn: raise ConnectionError("No available connections in pool") try: success = await asyncio.to_thread(conn.delete, table, where) return {"success": success} finally: pool.release_connection(conn) async def _list_databases(self) -> Dict[str, Any]: """List all configured databases.""" databases = {} for name, config in self.config.get('databases', {}).items(): databases[name] = { "host": config.get('host', 'localhost'), "port": config.get('port', 5000), "description": config.get('description', '') } return databases async def run(self): """Run the MCP server.""" from mcp.server.stdio import stdio_server async with stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="kdb-mcp-server", server_version="0.1.0" ) ) def cleanup(self): """Clean up resources.""" for pool in self.connection_pools.values(): pool.close_all() async def main(): """Main entry point.""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) server = KDBMCPServer() try: await server.run() finally: server.cleanup() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/riteshsonawala/kdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.