# client.py (18.7 kB)
"""
EdgeLake HTTP Client
Multi-threaded client for communicating with EdgeLake nodes via REST API.
Handles database discovery, table schema retrieval, and SQL query execution.
License: Mozilla Public License 2.0
"""
import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Any, Optional
import requests
from requests.exceptions import HTTPError, RequestException
# Module-level logger used by every method in this file.
logger = logging.getLogger('edgelake-mcp-server.client')
class EdgeLakeClient:
    """
    Async client for the EdgeLake REST API.

    Every command is sent as an HTTP request to the node's base URL with the
    EdgeLake command carried in the ``command`` header. The blocking
    ``requests`` calls run on this client's own thread pool, so the public
    coroutine methods do not block the event loop.
    """

    def __init__(self, host: str, port: int, timeout: int = 20, max_workers: int = 10):
        """
        Initialize EdgeLake client.

        Args:
            host: EdgeLake node IP/hostname
            port: EdgeLake REST port (typically 32049)
            timeout: Request timeout in seconds
            max_workers: Maximum concurrent worker threads
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.base_url = f"http://{host}:{port}"
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        # Disable SSL warnings for self-signed certificates.
        # NOTE: this is a process-wide switch, not scoped to this client.
        requests.packages.urllib3.disable_warnings()

        logger.info("EdgeLake client initialized: %s", self.base_url)

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------

    def _execute_request(self, method: str, command: str, headers: Optional[Dict[str, str]] = None) -> Any:
        """
        Execute HTTP request to EdgeLake (synchronous, runs in thread pool).

        Args:
            method: HTTP method ("GET" or "POST", case-insensitive)
            command: EdgeLake command to execute (sent in the ``command`` header)
            headers: Optional additional headers; they override the defaults

        Returns:
            Parsed JSON when the body is valid JSON, otherwise the body text

        Raises:
            ValueError: If ``method`` is neither GET nor POST
            Exception: On a non-200 status or any transport failure; transport
                errors are chained to the underlying ``requests`` exception
        """
        # Build headers for EdgeLake REST API
        request_headers = {
            "User-Agent": "anylog",
            "command": command,
        }
        if headers:
            request_headers.update(headers)

        # Log detailed request information
        logger.info("=" * 80)
        logger.info("HTTP REQUEST DETAILS:")
        logger.info(" Method: %s", method)
        logger.info(" URL: %s", self.base_url)
        logger.info(" Headers:")
        for key, value in request_headers.items():
            logger.info(" %s: %s", key, value)
        logger.info("=" * 80)

        try:
            # One dispatch table instead of two copy-pasted call sites.
            senders = {"GET": requests.get, "POST": requests.post}
            send = senders.get(method.upper())
            if send is None:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response = send(
                url=self.base_url,
                headers=request_headers,
                timeout=self.timeout,
                verify=False,
            )

            # Check response status
            if response.status_code != 200:
                raise Exception(
                    f"EdgeLake returned status {response.status_code}: {response.text}"
                )

            # Parse response: JSON when possible, raw text otherwise
            try:
                return response.json()
            except ValueError:
                return response.text

        except HTTPError as e:
            logger.error("HTTP error executing command '%s': %s", command, e)
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"HTTP error: {str(e)}") from e
        except RequestException as e:
            logger.error("Request error executing command '%s': %s", command, e)
            raise Exception(f"Request error: {str(e)}") from e
        except Exception as e:
            logger.error("Unexpected error executing command '%s': %s", command, e)
            raise

    async def _async_request(self, method: str, command: str, headers: Optional[Dict[str, str]] = None) -> Any:
        """
        Execute HTTP request asynchronously using thread pool.

        Args:
            method: HTTP method
            command: EdgeLake command
            headers: Optional headers

        Returns:
            Response data, as returned by ``_execute_request``
        """
        # get_running_loop() is the correct call from inside a coroutine;
        # get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self._execute_request,
            method,
            command,
            headers,
        )

    # ------------------------------------------------------------------
    # Shared parsing / formatting helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _split_columns(line: str) -> List[str]:
        """Split a '|'-delimited row into stripped, non-empty cell values."""
        return [cell.strip() for cell in line.split('|') if cell.strip()]

    @staticmethod
    def _iter_blockchain_rows(text: str):
        """Yield the data rows of a 'blockchain get table' text response.

        Blank lines, lines starting with '-' or '|', and the header line
        (one mentioning both 'database' and 'table') are skipped.
        """
        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(('-', '|')):
                continue
            lowered = line.lower()
            if 'database' in lowered and 'table' in lowered:
                continue
            yield line

    @staticmethod
    def _extract_table_entry(item: dict) -> tuple:
        """Return ``(database, table)`` from one 'blockchain get table' record.

        Handles the nested ``{"table": {"dbms": ..., "name": ...}}`` shape and
        falls back to top-level keys when "table" is absent or not a dict.
        Either element may be None/empty when the record lacks it.
        """
        nested = item.get("table")
        if isinstance(nested, dict):
            return nested.get("dbms"), nested.get("name")
        db_name = item.get("dbms") or item.get("database") or item.get("db")
        table_name = item.get("table") or item.get("table_name") or item.get("name")
        return db_name, table_name

    @staticmethod
    def _format_result(result: Any, wrapper_key: str) -> str:
        """Render a response as a string.

        Dicts are pretty-printed as JSON, strings are returned unchanged, and
        anything else is wrapped as ``{wrapper_key: result}`` before dumping.
        """
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        if isinstance(result, str):
            return result
        return json.dumps({wrapper_key: result}, indent=2)

    # ------------------------------------------------------------------
    # Databases
    # ------------------------------------------------------------------

    async def get_databases(self) -> List[str]:
        """
        Get list of all user-accessible databases from blockchain.

        Returns:
            List of database names

        Example EdgeLake command:
            blockchain get table
        """
        logger.info("Fetching databases from blockchain")
        try:
            result = await self._async_request("GET", "blockchain get table")

            # Log the raw response shape for debugging
            logger.info("blockchain get table response type: %s", type(result))
            if isinstance(result, list) and result:
                logger.info("First item sample: %s", result[0])
            elif isinstance(result, dict):
                logger.info("Dict keys: %s", result.keys())

            # Parse result to extract unique database names
            if isinstance(result, str):
                databases = self._parse_databases_from_blockchain_table(result)
            elif isinstance(result, dict):
                databases = self._parse_databases_from_json(result)
            elif isinstance(result, list):
                databases = self._parse_databases_from_list(result)
            else:
                logger.warning("Unexpected response type: %s", type(result))
                databases = []

            logger.info("Found %s databases", len(databases))
            return databases
        except Exception as e:
            logger.error("Failed to get databases: %s", e)
            raise

    def _parse_databases_from_blockchain_table(self, text: str) -> List[str]:
        """
        Parse unique database names from 'blockchain get table' response.

        This command returns a table with columns including 'Database' and
        'Table name'. The first column of each data row is taken as the
        database name; the result is de-duplicated and sorted.
        """
        # Substrings that mark a first cell as header text rather than data.
        header_words = ['database', 'table', 'local', 'dbms']
        databases = set()
        for line in self._iter_blockchain_rows(text):
            if '|' in line:
                # Table format with | : first column is the database name
                cells = self._split_columns(line)
                if cells and not any(word in cells[0].lower() for word in header_words):
                    databases.add(cells[0])
            else:
                # Simple format - first word is database
                words = line.split()
                if words:
                    databases.add(words[0])
        return sorted(databases)

    def _parse_databases_from_text(self, text: str) -> List[str]:
        """Parse database names from text table response (order preserved)."""
        # Common header keywords to skip
        skip_keywords = ['active', 'dbms', 'connections', 'logical', 'database', 'type', 'owner',
                         'configuration', 'storage', 'ip:port', 'name']
        databases = []
        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(('-', '|')):
                continue

            if '|' in line:
                # Table format - extract first column
                cells = self._split_columns(line)
                if not cells:
                    continue
                db_name = cells[0]
            else:
                # Simple list format
                db_name = line

            # Skip if it looks like a header keyword
            db_lower = db_name.lower()
            if any(keyword in db_lower for keyword in skip_keywords):
                continue
            # Skip if it's obviously a multi-word header line
            if ' ' in db_name and len(db_name.split()) > 2:
                continue
            databases.append(db_name)
        return databases

    def _parse_databases_from_json(self, data: dict) -> List[str]:
        """Parse database names from JSON response ("databases" or "Database" key)."""
        if "databases" in data:
            return data["databases"]
        if "Database" in data:
            return data["Database"]
        logger.warning("Unexpected JSON structure: %s", data)
        return []

    def _parse_databases_from_list(self, data: list) -> List[str]:
        """
        Parse database names from list of JSON objects.

        Expected format from blockchain get table:
            [
                {"table": {"name": "rand_data", "dbms": "new_company", ...}},
                {"table": {"name": "ping_sensor", "dbms": "new_company", ...}},
                ...
            ]

        Non-dict items are ignored. Returns the unique names, sorted.
        """
        databases = set()
        for item in data:
            if not isinstance(item, dict):
                continue
            db_name, _ = self._extract_table_entry(item)
            if db_name:
                databases.add(db_name)
        return sorted(databases)

    # ------------------------------------------------------------------
    # Tables
    # ------------------------------------------------------------------

    async def get_tables(self, database: str) -> List[str]:
        """
        Get list of tables in a database using blockchain get table.

        Args:
            database: Database name

        Returns:
            List of table names

        Example EdgeLake command:
            blockchain get table
        """
        logger.info("Fetching tables for database '%s'", database)
        try:
            # Use blockchain get table to get all tables, then filter for this database
            result = await self._async_request("GET", "blockchain get table")

            if isinstance(result, str):
                tables = self._parse_tables_from_blockchain(result, database)
            elif isinstance(result, dict):
                # NOTE(review): this branch does not filter by `database`.
                tables = self._parse_tables_from_json(result)
            elif isinstance(result, list):
                tables = self._parse_tables_from_list(result, database)
            else:
                tables = []

            logger.info("Found %s tables in '%s'", len(tables), database)
            return tables
        except Exception as e:
            logger.error("Failed to get tables for '%s': %s", database, e)
            raise

    def _parse_tables_from_blockchain(self, text: str, database: str) -> List[str]:
        """
        Parse table names for a specific database from 'blockchain get table' response.

        Args:
            text: Response text from blockchain get table
            database: Database name to filter for

        Returns:
            List of table names for the specified database, in response order
        """
        tables = []
        for line in self._iter_blockchain_rows(text):
            if '|' in line:
                # Table format with | (Database | Table name | ...)
                cells = self._split_columns(line)
                if len(cells) >= 2 and cells[0] == database:
                    tables.append(cells[1])
            else:
                # Simple format - first word is database, second is table
                words = line.split()
                if len(words) >= 2 and words[0] == database:
                    tables.append(words[1])
        return tables

    def _parse_tables_from_text(self, text: str) -> List[str]:
        """Parse table names from text response (first column, headers skipped)."""
        header_names = ['table name', 'table_name']
        tables = []
        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(('-', '|')) or line.lower() in header_names:
                continue
            if '|' in line:
                cells = self._split_columns(line)
                if cells and cells[0].lower() not in header_names:
                    tables.append(cells[0])
            else:
                tables.append(line)
        return tables

    def _parse_tables_from_json(self, data: dict) -> List[str]:
        """Parse table names from JSON response ("tables" or "Table" key)."""
        if "tables" in data:
            return data["tables"]
        if "Table" in data:
            return data["Table"]
        return []

    def _parse_tables_from_list(self, data: list, database: str) -> List[str]:
        """
        Parse table names for a specific database from list of JSON objects.

        Args:
            data: List of JSON objects from blockchain get table
            database: Database name to filter for

        Returns:
            List of table names for the specified database, in response order
        """
        tables = []
        for item in data:
            if not isinstance(item, dict):
                continue
            db_name, table_name = self._extract_table_entry(item)
            # Only include tables for the requested database
            if db_name == database and table_name:
                tables.append(table_name)
        return tables

    # ------------------------------------------------------------------
    # Schema / query / status
    # ------------------------------------------------------------------

    async def get_table_schema(self, database: str, table: str) -> str:
        """
        Get schema for a specific table.

        Args:
            database: Database name
            table: Table name

        Returns:
            JSON string containing table schema

        Raises:
            ValueError: If ``database`` or ``table`` contains a double quote
            Exception: If the request fails

        Example EdgeLake command:
            get columns where dbms="new_company" and table="rand_data" and format=json
        """
        # Both names are placed inside double quotes in the command, so an
        # embedded quote would let the caller alter the command itself.
        for label, value in (("database", database), ("table", table)):
            if '"' in str(value):
                raise ValueError(
                    f"Invalid {label} name (contains a double quote): {value!r}"
                )

        logger.info("Fetching schema for '%s.%s'", database, table)
        try:
            command = f'get columns where dbms="{database}" and table="{table}" and format=json'
            result = await self._async_request("GET", command)
            return self._format_result(result, "schema")
        except Exception as e:
            logger.error("Failed to get schema for '%s.%s': %s", database, table, e)
            raise

    async def execute_query(self, database: str, query: str, output_format: str = "json") -> str:
        """
        Execute SQL query against EdgeLake.

        Args:
            database: Database name
            query: SQL query to execute
            output_format: Output format ('json' or 'table')

        Returns:
            Query results as formatted string

        Example EdgeLake command:
            sql {database} format = {format} "{query}"

        Note:
            Uses 'destination: network' header to route query to Operator nodes
            instead of executing locally on Query node.
        """
        logger.info("Executing query on '%s': %s", database, query)
        try:
            # Build EdgeLake SQL command.
            # NOTE(review): database, output_format and query are interpolated
            # unescaped; a double quote inside `query` ends the quoted SQL
            # early. Confirm EdgeLake's escaping rules before passing
            # untrusted input through here.
            command = f"sql {database} format = {output_format} \"{query}\""

            # Add destination: network header to route to Operator nodes
            headers = {"destination": "network"}
            result = await self._async_request("GET", command, headers)
            return self._format_result(result, "result")
        except Exception as e:
            logger.error("Query execution failed: %s", e)
            raise

    async def get_node_status(self) -> str:
        """
        Get EdgeLake node status.

        Returns:
            Node status information as JSON string

        Example EdgeLake command:
            get status
        """
        logger.info("Fetching node status")
        try:
            result = await self._async_request("GET", "get status")
            return self._format_result(result, "status")
        except Exception as e:
            logger.error("Failed to get node status: %s", e)
            raise

    def close(self):
        """Shutdown the thread pool executor, waiting for in-flight requests."""
        logger.info("Shutting down EdgeLake client")
        self.executor.shutdown(wait=True)