Skip to main content
Glama
kdb_connection.py10 kB
"""KDB+ connection handler for managing database connections."""

import logging
import socket
import struct
import threading
from datetime import datetime, date, time
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

# Size in bytes of the fixed IPC message header.
_HEADER_SIZE = 8
# Largest slice requested from the socket in a single recv() call.
_RECV_CHUNK = 4096

# Type bytes of the serialized q objects the simplified decoder understands.
_Q_ERROR = 0x80          # -128: error, followed by a null-terminated message
_Q_SYMBOL_ATOM = 0xF5    # -11: symbol atom, null-terminated
_Q_CHAR_ATOM = 0xF6      # -10: char atom, a single byte
_Q_CHAR_VECTOR = 10      # string: attribute byte, int32 length, bytes
_Q_SYMBOL_VECTOR = 11    # attribute byte, int32 count, null-terminated items

# Characters that must be backslash-escaped inside a q string literal.
_Q_STRING_ESCAPES = str.maketrans({
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
})


def _format_q_value(value: Any) -> str:
    """
    Render a Python value as a q literal for use in a generated query.

    Strings are double-quoted with embedded quotes and backslashes escaped,
    booleans become ``1b``/``0b``, and every other value is rendered with
    ``str()``.

    Args:
        value: Python value to render

    Returns:
        Text to splice into a q expression
    """
    if isinstance(value, str):
        return f'"{value.translate(_Q_STRING_ESCAPES)}"'
    # bool must be tested before falling through: str(True) is not valid q.
    if isinstance(value, (bool, np.bool_)):
        return "1b" if value else "0b"
    return str(value)


class KDBConnection:
    """
    Handles connections and queries to KDB+ databases.

    Only a small part of the IPC protocol is implemented: queries are sent
    as synchronous char-vector messages, and responses are decoded only when
    they are errors or text-like objects (see ``_decode_response``).
    Little-endian byte order is assumed throughout, and compressed responses
    are not decompressed.
    """

    def __init__(self, host: str, port: int, username: str = "", password: str = ""):
        """
        Initialize KDB connection parameters.

        No network activity happens here; call ``connect`` to open the socket.

        Args:
            host: KDB server hostname
            port: KDB server port
            username: Username for authentication (optional)
            password: Password for authentication (optional)
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.socket = None
        self.connected = False

    def connect(self) -> bool:
        """
        Establish connection to KDB+ server.

        Any socket left over from an earlier connection is closed first, and
        a socket that fails the handshake is closed rather than leaked.

        Returns:
            True if connection successful, False otherwise
        """
        if self.socket is not None:
            # Reconnecting: do not leak the previous descriptor.
            try:
                self.socket.close()
            except OSError:
                pass
            self.socket = None
            self.connected = False

        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))

            # Send authentication; sendall() retries until every byte is written.
            auth_string = f"{self.username}:{self.password}\x00"
            sock.sendall(auth_string.encode('ascii'))

            # Receive capability byte; an empty read means the peer closed
            # the connection instead of accepting the credentials.
            capability = sock.recv(1)
            if not capability:
                raise ConnectionError("Failed to receive capability byte")
        except Exception as e:
            logger.error("Connection failed: %s", e)
            if sock is not None:
                sock.close()
            self.connected = False
            return False

        self.socket = sock
        self.connected = True
        logger.info("Connected to KDB+ at %s:%s", self.host, self.port)
        return True

    def disconnect(self):
        """Close the KDB+ connection."""
        if self.socket:
            self.socket.close()
            self.socket = None
        self.connected = False
        logger.info("Disconnected from KDB+")

    def execute(self, query: str) -> Any:
        """
        Execute a Q query on the KDB+ server.

        Args:
            query: Q language query string

        Returns:
            Query result (could be various types depending on query)

        Raises:
            ConnectionError: If not connected or the connection drops
            RuntimeError: If the server answers with an error
        """
        if not self.connected or self.socket is None:
            raise ConnectionError("Not connected to KDB+ server")

        try:
            # Encode and send query
            self.socket.sendall(self._encode_query(query))

            # Receive and decode response
            response = self._receive_response()
            return self._decode_response(response)
        except OSError as e:
            # After a transport failure the byte stream can no longer be
            # trusted, so drop the socket and let the caller reconnect.
            logger.error("Query execution failed: %s", e)
            self.disconnect()
            raise
        except Exception as e:
            logger.error("Query execution failed: %s", e)
            raise

    def _encode_query(self, query: str) -> bytes:
        """
        Encode a query for sending to KDB+.

        The query is serialized as a char vector and wrapped in a
        synchronous message so that the server sends a response back.

        Args:
            query: Q language query string

        Returns:
            Encoded query bytes
        """
        query_bytes = query.encode('utf-8')
        # Payload: [type(1)=10 char vector][attributes(1)][length(4)][chars]
        payload = struct.pack('<bBI', _Q_CHAR_VECTOR, 0, len(query_bytes)) + query_bytes
        # Header: [endian(1)=little][msgtype(1)=sync][compressed(1)][reserved(1)][total length(4)]
        header = struct.pack('<BBBBI', 1, 1, 0, 0, _HEADER_SIZE + len(payload))
        return header + payload

    def _recv_exact(self, size: int, error_message: str) -> bytes:
        """
        Read exactly ``size`` bytes from the socket.

        Args:
            size: Number of bytes to read
            error_message: Message for the error raised on early close

        Returns:
            The bytes read

        Raises:
            ConnectionError: If the peer closes before ``size`` bytes arrive
        """
        chunks = []
        remaining = size
        while remaining > 0:
            # recv() may return fewer bytes than requested, so keep asking.
            chunk = self.socket.recv(min(_RECV_CHUNK, remaining))
            if not chunk:
                raise ConnectionError(error_message)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _receive_response(self) -> bytes:
        """
        Receive response from KDB+ server.

        Returns:
            Raw response bytes (the message payload without its header)

        Raises:
            ConnectionError: If the header or payload is cut short, or the
                header announces a length smaller than the header itself
        """
        # Read header (8 bytes)
        header = self._recv_exact(_HEADER_SIZE, "Incomplete header received")

        # Parse header; only the total length is needed to frame the message.
        _endian, _msgtype, _compressed, _reserved, length = struct.unpack('<BBBBI', header)
        if length < _HEADER_SIZE:
            raise ConnectionError(f"Invalid message length in header: {length}")

        # Read data
        return self._recv_exact(
            length - _HEADER_SIZE, "Connection closed while receiving data"
        )

    def _decode_response(self, data: bytes) -> Any:
        """
        Decode KDB+ response data.

        This is a simplified decoder. It recognises serialized errors, char
        atoms and vectors, and symbol atoms and vectors; any other payload is
        returned as UTF-8 text when it decodes, or as the raw bytes.

        Args:
            data: Raw response bytes

        Returns:
            Decoded Python object: ``str`` for text, ``list`` of ``str`` for
            a symbol vector, otherwise ``str`` or ``bytes`` as described above

        Raises:
            RuntimeError: If the payload is a serialized error
        """
        kind = data[0] if data else None
        try:
            if kind == _Q_ERROR and data.endswith(b'\x00'):
                message = data[1:-1].decode('utf-8', errors='replace')
                raise RuntimeError(f"KDB+ error: {message}")
            if (kind == _Q_SYMBOL_ATOM and data.endswith(b'\x00')
                    and data.count(b'\x00') == 1):
                return data[1:-1].decode('utf-8')
            if kind == _Q_CHAR_ATOM and len(data) == 2:
                return data[1:].decode('utf-8')
            if kind in (_Q_CHAR_VECTOR, _Q_SYMBOL_VECTOR) and len(data) >= 6:
                (count,) = struct.unpack_from('<I', data, 2)
                body = data[6:]
                if kind == _Q_CHAR_VECTOR:
                    # The declared length must match, otherwise this is not
                    # a char vector and the fallback below applies.
                    if len(body) == count:
                        return body.decode('utf-8')
                elif body.count(b'\x00') == count and (
                        not body or body.endswith(b'\x00')):
                    # Each symbol is terminated by a null byte; the final
                    # split element is the empty tail after the last one.
                    return [item.decode('utf-8') for item in body.split(b'\x00')[:-1]]
        except UnicodeDecodeError:
            # Not valid text after all; use the generic handling below.
            pass

        try:
            # Try to decode as string first
            return data.decode('utf-8')
        except UnicodeDecodeError:
            # Return raw bytes if not decodable
            return data

    def get_tables(self) -> List[str]:
        """
        Get list of tables in the KDB+ database.

        Returns:
            List of table names (empty if the result is not text-like)
        """
        result = self.execute("tables[]")
        if isinstance(result, list):
            return [str(name) for name in result]
        if isinstance(result, str):
            return result.split()
        return []

    def get_table_schema(self, table_name: str) -> Dict[str, str]:
        """
        Get schema information for a table.

        The ``meta`` query is executed, but its result is not parsed yet, so
        an empty dictionary is always returned.

        Args:
            table_name: Name of the table

        Returns:
            Dictionary mapping column names to types (currently always empty)
        """
        query = f"meta {table_name}"
        self.execute(query)

        # Parse result into schema dict
        # This would need proper parsing based on KDB+ response format
        return {}

    def select(self, table: str, columns: Optional[List[str]] = None,
               where: Optional[str] = None, limit: Optional[int] = None) -> Any:
        """
        Execute a SELECT query on a table.

        ``table``, ``columns`` and ``where`` are spliced into the query text
        unchanged, so they must come from trusted input.

        Args:
            table: Table name
            columns: List of column names (None for all)
            where: WHERE clause condition
            limit: Number of rows to return

        Returns:
            Query result
        """
        # Build query
        col_str = ",".join(columns) if columns else ""
        query = f"select {col_str} from {table}"

        if where:
            query += f" where {where}"

        if limit:
            # n#expr takes n rows from the result of the select.
            query = f"{limit}#{query}"

        return self.execute(query)

    def insert(self, table: str, data: Dict[str, Any]) -> bool:
        """
        Insert data into a table.

        Values are sent in the dictionary's iteration order; the column
        names themselves are not part of the generated statement.

        Args:
            table: Table name
            data: Dictionary of column names to values

        Returns:
            True if successful, False if the query failed
        """
        # Format values for Q
        formatted_values = [_format_q_value(v) for v in data.values()]
        query = f"`{table} insert ({';'.join(formatted_values)})"

        try:
            self.execute(query)
            return True
        except Exception:
            # execute() has already logged the failure.
            return False

    def update(self, table: str, updates: Dict[str, Any],
               where: Optional[str] = None) -> bool:
        """
        Update data in a table.

        Args:
            table: Table name
            updates: Dictionary of column names to new values
            where: WHERE clause condition

        Returns:
            True if successful, False if the query failed
        """
        # Build update statement
        set_clauses = [f'{col}:{_format_q_value(val)}' for col, val in updates.items()]
        query = f"update {', '.join(set_clauses)} from {table}"

        if where:
            query += f" where {where}"

        try:
            self.execute(query)
            return True
        except Exception:
            # execute() has already logged the failure.
            return False

    def delete(self, table: str, where: str) -> bool:
        """
        Delete rows from a table.

        Args:
            table: Table name
            where: WHERE clause condition

        Returns:
            True if successful, False if the query failed
        """
        query = f"delete from {table} where {where}"

        try:
            self.execute(query)
            return True
        except Exception:
            # execute() has already logged the failure.
            return False


class KDBConnectionPool:
    """Manages a pool of KDB+ connections for concurrent access."""

    def __init__(self, config: Dict[str, Any], pool_size: int = 5):
        """
        Initialize connection pool.

        Connections are created immediately but opened lazily, the first
        time each one is handed out.

        Args:
            config: Database configuration (keys: host, port, username,
                password; each is optional)
            pool_size: Maximum number of connections
        """
        self.config = config
        self.pool_size = pool_size
        self.connections: List[KDBConnection] = []
        self.available: List[bool] = []
        # Guards the availability flags against concurrent callers.
        self._lock = threading.Lock()

        # Initialize connections
        for _ in range(pool_size):
            conn = KDBConnection(
                host=config.get('host', 'localhost'),
                port=config.get('port', 5000),
                username=config.get('username', ''),
                password=config.get('password', '')
            )
            self.connections.append(conn)
            self.available.append(True)

    def get_connection(self) -> Optional[KDBConnection]:
        """
        Get an available connection from the pool.

        A free connection that cannot be opened is put back and the next
        one is tried, so a returned connection is always connected.

        Returns:
            KDBConnection instance or None if none available
        """
        for index, conn in enumerate(self.connections):
            with self._lock:
                if not self.available[index]:
                    continue
                # Reserve the slot before connecting so the (slow) connect
                # happens outside the lock.
                self.available[index] = False

            if conn.connected or conn.connect():
                return conn

            with self._lock:
                self.available[index] = True
        return None

    def release_connection(self, conn: KDBConnection):
        """
        Return a connection to the pool.

        Connections that do not belong to this pool are ignored.

        Args:
            conn: Connection to release
        """
        with self._lock:
            for index, candidate in enumerate(self.connections):
                if candidate is conn:
                    self.available[index] = True
                    break

    def close_all(self):
        """Close all connections in the pool."""
        for conn in self.connections:
            if conn.connected:
                conn.disconnect()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/riteshsonawala/kdb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.