"""KDB+ connection handler for managing database connections."""
import logging
import socket
import struct
import threading
from datetime import datetime, date, time
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
class KDBConnection:
    """Handles connections and queries to KDB+ databases.

    The client speaks a simplified framing: every message is an 8-byte
    little-endian header followed by the payload, and responses are returned
    as text when they decode as UTF-8, otherwise as raw bytes.

    NOTE(review): the header layout and the handshake below are reproduced
    unchanged from the original implementation; confirm them against the
    kdb+ IPC reference before relying on this client with a real server.
    """

    # Header layout: [msgtype(1)][compressed(1)][unused(2)][length(4)].
    _HEADER_FORMAT = '<bbhi'
    _HEADER_SIZE = 8
    # Upper bound on the number of bytes requested from the socket per recv().
    _RECV_CHUNK = 4096

    def __init__(self, host: str, port: int, username: str = "", password: str = ""):
        """
        Initialize KDB connection parameters.

        No network activity happens here; call connect() to open the socket.

        Args:
            host: KDB server hostname
            port: KDB server port
            username: Username for authentication (optional)
            password: Password for authentication (optional)
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.socket = None
        self.connected = False

    def _close_socket(self) -> None:
        """Close the current socket, if any, and reset the connection state."""
        sock, self.socket = self.socket, None
        self.connected = False
        if sock is None:
            return
        try:
            sock.close()
        except OSError as e:
            # Closing is best-effort: the descriptor is gone either way.
            logger.debug("Ignoring error while closing socket: %s", e)

    def connect(self) -> bool:
        """
        Establish connection to KDB+ server.

        Any socket left over from a previous connect() is closed first, and a
        socket opened by a failed attempt is closed before returning, so
        repeated calls do not leak descriptors.

        Returns:
            True if connection successful, False otherwise
        """
        self._close_socket()
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))
            # Send authentication; sendall() retries until every byte is written.
            auth_string = f"{self.username}:{self.password}\x00"
            self.socket.sendall(auth_string.encode('ascii'))
            # Receive capability byte; an empty read means the peer closed.
            capability = self.socket.recv(1)
            if not capability:
                raise ConnectionError("Failed to receive capability byte")
        except Exception as e:
            logger.error("Connection failed: %s", e)
            self._close_socket()
            return False
        self.connected = True
        logger.info("Connected to KDB+ at %s:%s", self.host, self.port)
        return True

    def disconnect(self):
        """Close the KDB+ connection."""
        self._close_socket()
        logger.info("Disconnected from KDB+")

    def execute(self, query: str) -> Any:
        """
        Execute a Q query on the KDB+ server.

        Args:
            query: Q language query string

        Returns:
            Query result: a str when the response payload is valid UTF-8,
            otherwise the raw payload bytes

        Raises:
            ConnectionError: If not connected, or if the connection closes
                before a complete response has been read.
        """
        if not self.connected:
            raise ConnectionError("Not connected to KDB+ server")
        try:
            self.socket.sendall(self._encode_query(query))
            return self._decode_response(self._receive_response())
        except Exception as e:
            logger.error("Query execution failed: %s", e)
            raise

    def _encode_query(self, query: str) -> bytes:
        """
        Encode a query for sending to KDB+.

        Args:
            query: Q language query string

        Returns:
            Encoded query bytes (8-byte header followed by the UTF-8 query)
        """
        query_bytes = query.encode('utf-8')
        # The length field counts the header itself as well as the payload.
        header = struct.pack(
            self._HEADER_FORMAT, 1, 0, 0, len(query_bytes) + self._HEADER_SIZE
        )
        return header + query_bytes

    def _recv_exact(self, size: int, error_message: str) -> bytes:
        """
        Read exactly ``size`` bytes from the socket.

        A stream socket may deliver fewer bytes than requested, so recv() is
        repeated until the requested amount has arrived.

        Args:
            size: Number of bytes to read
            error_message: Message for the ConnectionError raised on early close

        Returns:
            The bytes read

        Raises:
            ConnectionError: If the peer closes before ``size`` bytes arrive.
        """
        buffer = bytearray()
        while len(buffer) < size:
            chunk = self.socket.recv(min(self._RECV_CHUNK, size - len(buffer)))
            if not chunk:
                raise ConnectionError(error_message)
            buffer += chunk
        return bytes(buffer)

    def _receive_response(self) -> bytes:
        """
        Receive response from KDB+ server.

        Returns:
            Raw response bytes (payload only, header stripped)

        Raises:
            ConnectionError: If the header or payload is cut short, or the
                header announces a length smaller than the header itself.
        """
        header = self._recv_exact(self._HEADER_SIZE, "Incomplete header received")
        # Only the length is used; the compressed flag is not acted upon.
        _msgtype, _compressed, _unused, length = struct.unpack(
            self._HEADER_FORMAT, header
        )
        data_length = length - self._HEADER_SIZE
        if data_length < 0:
            raise ConnectionError(f"Invalid message length in header: {length}")
        return self._recv_exact(data_length, "Connection closed while receiving data")

    def _decode_response(self, data: bytes) -> Any:
        """
        Decode KDB+ response data.

        This is a simplified decoder - a full IPC protocol implementation
        would be more complex.

        Args:
            data: Raw response bytes

        Returns:
            The payload as str if it is valid UTF-8, otherwise the raw bytes
        """
        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            return data

    @staticmethod
    def _format_value(value: Any) -> str:
        """
        Render a Python value as a Q literal for use inside a query string.

        Strings are double-quoted with backslashes and double quotes escaped
        so the value cannot terminate the literal early; booleans become
        ``1b``/``0b``; everything else uses ``str()``.

        Args:
            value: Value to render

        Returns:
            Text to splice into the query
        """
        if isinstance(value, str):
            # Escape the backslash first so the quote escapes are not doubled.
            escaped = value.replace('\\', '\\\\').replace('"', '\\"')
            return f'"{escaped}"'
        if isinstance(value, bool):
            return "1b" if value else "0b"
        return str(value)

    def get_tables(self) -> List[str]:
        """
        Get list of tables in the KDB+ database.

        Returns:
            List of table names, obtained by whitespace-splitting the text
            response; an empty list if the response was not text
        """
        result = self.execute("tables[]")
        if isinstance(result, str):
            return result.split()
        return []

    def get_table_schema(self, table_name: str) -> Dict[str, str]:
        """
        Get schema information for a table.

        Placeholder: the ``meta`` query is sent, but its response is not
        parsed yet, so the returned dictionary is always empty.

        Args:
            table_name: Name of the table

        Returns:
            Dictionary mapping column names to types (currently always empty)
        """
        self.execute(f"meta {table_name}")
        # Parsing would need to follow the KDB+ response format.
        return {}

    def select(self, table: str, columns: Optional[List[str]] = None,
               where: Optional[str] = None, limit: Optional[int] = None) -> Any:
        """
        Execute a SELECT query on a table.

        ``table``, ``columns`` and ``where`` are spliced into the query text
        verbatim, so they must come from trusted code.

        Args:
            table: Table name
            columns: List of column names (None for all)
            where: WHERE clause condition
            limit: Number of rows to return (0 is honoured, None means no limit)

        Returns:
            Query result
        """
        col_str = ",".join(columns) if columns else ""
        query = f"select {col_str} from {table}"
        if where:
            query += f" where {where}"
        # Compare against None so an explicit limit of 0 is not dropped.
        if limit is not None:
            query = f"{limit}#{query}"
        return self.execute(query)

    def insert(self, table: str, data: Dict[str, Any]) -> bool:
        """
        Insert data into a table.

        Only the values are sent, in the dictionary's iteration order; the
        keys are not part of the generated statement.

        Args:
            table: Table name
            data: Dictionary of column names to values

        Returns:
            True if successful, False if the query raised
        """
        formatted_values = [self._format_value(v) for v in data.values()]
        query = f"`{table} insert ({';'.join(formatted_values)})"
        try:
            self.execute(query)
        except Exception:
            # Best-effort API: failures are reported through the return value.
            return False
        return True

    def update(self, table: str, updates: Dict[str, Any], where: Optional[str] = None) -> bool:
        """
        Update data in a table.

        Args:
            table: Table name
            updates: Dictionary of column names to new values
            where: WHERE clause condition (spliced in verbatim)

        Returns:
            True if successful, False if the query raised
        """
        set_clauses = [
            f"{col}:{self._format_value(val)}" for col, val in updates.items()
        ]
        query = f"update {', '.join(set_clauses)} from {table}"
        if where:
            query += f" where {where}"
        try:
            self.execute(query)
        except Exception:
            # Best-effort API: failures are reported through the return value.
            return False
        return True

    def delete(self, table: str, where: str) -> bool:
        """
        Delete rows from a table.

        Args:
            table: Table name
            where: WHERE clause condition (spliced in verbatim)

        Returns:
            True if successful, False if the query raised
        """
        query = f"delete from {table} where {where}"
        try:
            self.execute(query)
        except Exception:
            # Best-effort API: failures are reported through the return value.
            return False
        return True
class KDBConnectionPool:
    """Manages a pool of KDB+ connections for concurrent access.

    Connections are created eagerly but opened lazily, on first checkout.
    Slot bookkeeping is guarded by a lock so that two threads cannot be
    handed the same connection.
    """

    def __init__(self, config: Dict[str, Any], pool_size: int = 5):
        """
        Initialize connection pool.

        Args:
            config: Database configuration; the keys read are 'host'
                (default 'localhost'), 'port' (default 5000), 'username'
                and 'password' (both default to '')
            pool_size: Maximum number of connections
        """
        self.config = config
        self.pool_size = pool_size
        self.connections: List["KDBConnection"] = []
        # available[i] is True while connections[i] is not checked out.
        self.available: List[bool] = []
        # Guards reads and writes of `available`.
        self._lock = threading.Lock()
        for _ in range(pool_size):
            conn = KDBConnection(
                host=config.get('host', 'localhost'),
                port=config.get('port', 5000),
                username=config.get('username', ''),
                password=config.get('password', ''),
            )
            self.connections.append(conn)
            self.available.append(True)

    def get_connection(self) -> Optional["KDBConnection"]:
        """
        Get an available connection from the pool.

        The first free slot is reserved under the lock; the connection is
        then opened outside the lock if it is not already connected. If
        opening fails the slot is handed back and None is returned, so a
        dead connection is never given to the caller.

        Returns:
            KDBConnection instance, or None if none is available or the
            chosen connection could not be opened
        """
        with self._lock:
            for index, free in enumerate(self.available):
                if free:
                    self.available[index] = False
                    conn = self.connections[index]
                    break
            else:
                return None
        if not conn.connected and not conn.connect():
            # Every slot shares one config, so trying the others is pointless.
            with self._lock:
                self.available[index] = True
            return None
        return conn

    def release_connection(self, conn: "KDBConnection"):
        """
        Return a connection to the pool.

        A connection that does not belong to this pool is ignored.

        Args:
            conn: Connection to release
        """
        with self._lock:
            for index, candidate in enumerate(self.connections):
                # Identity, not equality: the exact pooled object is wanted.
                if candidate is conn:
                    self.available[index] = True
                    break

    def close_all(self):
        """Close all connections in the pool that are currently connected."""
        for conn in self.connections:
            if conn.connected:
                conn.disconnect()