Skip to main content
Glama
by tom342178
client.py18.7 kB
""" EdgeLake HTTP Client Multi-threaded client for communicating with EdgeLake nodes via REST API. Handles database discovery, table schema retrieval, and SQL query execution. License: Mozilla Public License 2.0 """ import asyncio import json import logging from concurrent.futures import ThreadPoolExecutor from typing import List, Dict, Any, Optional import requests from requests.exceptions import HTTPError, RequestException logger = logging.getLogger('edgelake-mcp-server.client') class EdgeLakeClient: """ Async client for EdgeLake REST API with multi-threaded request execution. """ def __init__(self, host: str, port: int, timeout: int = 20, max_workers: int = 10): """ Initialize EdgeLake client. Args: host: EdgeLake node IP/hostname port: EdgeLake REST port (typically 32049) timeout: Request timeout in seconds max_workers: Maximum concurrent worker threads """ self.host = host self.port = port self.timeout = timeout self.base_url = f"http://{host}:{port}" self.executor = ThreadPoolExecutor(max_workers=max_workers) # Disable SSL warnings for self-signed certificates requests.packages.urllib3.disable_warnings() logger.info(f"EdgeLake client initialized: {self.base_url}") def _execute_request(self, method: str, command: str, headers: Optional[Dict[str, str]] = None) -> Any: """ Execute HTTP request to EdgeLake (synchronous, runs in thread pool). Args: method: HTTP method (GET, POST, etc.) command: EdgeLake command to execute headers: Optional additional headers Returns: Response data (parsed JSON or text) Raises: Exception: On request failure """ # Build headers for EdgeLake REST API request_headers = { "User-Agent": "anylog", "command": command, } if headers: request_headers.update(headers) # Log detailed request information logger.info("=" * 80) logger.info(f"HTTP REQUEST DETAILS:") logger.info(f" Method: {method}") logger.info(f" URL: {self.base_url}") logger.info(f" Headers:") for key, value in request_headers.items(): logger.info(f" {key}: {value}") logger.info("=" * 80) try: if method.upper() == "GET": response = requests.get( url=self.base_url, headers=request_headers, timeout=self.timeout, verify=False ) elif method.upper() == "POST": response = requests.post( url=self.base_url, headers=request_headers, timeout=self.timeout, verify=False ) else: raise ValueError(f"Unsupported HTTP method: {method}") # Check response status if response.status_code != 200: error_msg = f"EdgeLake returned status {response.status_code}" if hasattr(response, 'text'): error_msg += f": {response.text}" raise Exception(error_msg) # Parse response try: return response.json() except ValueError: # Not JSON, return text return response.text except HTTPError as e: logger.error(f"HTTP error executing command '{command}': {e}") raise Exception(f"HTTP error: {str(e)}") except RequestException as e: logger.error(f"Request error executing command '{command}': {e}") raise Exception(f"Request error: {str(e)}") except Exception as e: logger.error(f"Unexpected error executing command '{command}': {e}") raise async def _async_request(self, method: str, command: str, headers: Optional[Dict[str, str]] = None) -> Any: """ Execute HTTP request asynchronously using thread pool. Args: method: HTTP method command: EdgeLake command headers: Optional headers Returns: Response data """ loop = asyncio.get_event_loop() return await loop.run_in_executor( self.executor, self._execute_request, method, command, headers ) async def get_databases(self) -> List[str]: """ Get list of all user-accessible databases from blockchain. Returns: List of database names Example EdgeLake command: blockchain get table """ logger.info("Fetching databases from blockchain") try: result = await self._async_request("GET", "blockchain get table") # Log the raw response for debugging logger.info(f"blockchain get table response type: {type(result)}") if isinstance(result, list) and len(result) > 0: logger.info(f"First item sample: {result[0]}") elif isinstance(result, dict): logger.info(f"Dict keys: {result.keys()}") # Parse result to extract unique database names if isinstance(result, str): databases = self._parse_databases_from_blockchain_table(result) elif isinstance(result, dict): databases = self._parse_databases_from_json(result) elif isinstance(result, list): databases = self._parse_databases_from_list(result) else: logger.warning(f"Unexpected response type: {type(result)}") databases = [] logger.info(f"Found {len(databases)} databases") return databases except Exception as e: logger.error(f"Failed to get databases: {e}") raise def _parse_databases_from_blockchain_table(self, text: str) -> List[str]: """ Parse unique database names from 'blockchain get table' response. This command returns a table with columns including 'Database' and 'Table name'. We extract unique database names from the response. """ databases = set() lines = text.strip().split('\n') for line in lines: line = line.strip() if not line or line.startswith('-') or line.startswith('|'): continue # Skip header lines if 'database' in line.lower() and 'table' in line.lower(): continue # Parse table format with | if '|' in line: parts = [p.strip() for p in line.split('|') if p.strip()] if parts and len(parts) >= 1: # First column is the database name db_name = parts[0] # Skip if it looks like a header if db_name and not any(kw in db_name.lower() for kw in ['database', 'table', 'local', 'dbms']): databases.add(db_name) else: # Simple format - first word is database parts = line.split() if parts: databases.add(parts[0]) return sorted(list(databases)) def _parse_databases_from_text(self, text: str) -> List[str]: """Parse database names from text table response""" databases = [] lines = text.strip().split('\n') # Common header keywords to skip skip_keywords = ['active', 'dbms', 'connections', 'logical', 'database', 'type', 'owner', 'configuration', 'storage', 'ip:port', 'name'] for line in lines: line = line.strip() if not line or line.startswith('-') or line.startswith('|'): continue # Extract database name if '|' in line: # Table format - extract first column parts = [p.strip() for p in line.split('|') if p.strip()] if parts: db_name = parts[0] else: continue else: # Simple list format db_name = line # Skip if it looks like a header (contains multiple keywords or spaces suggesting it's a header) db_lower = db_name.lower() if any(keyword in db_lower for keyword in skip_keywords): continue # Skip if it's obviously a multi-word header line if ' ' in db_name and len(db_name.split()) > 2: continue databases.append(db_name) return databases def _parse_databases_from_json(self, data: dict) -> List[str]: """Parse database names from JSON response""" # Handle different possible JSON structures if "databases" in data: return data["databases"] elif "Database" in data: return data["Database"] else: logger.warning(f"Unexpected JSON structure: {data}") return [] def _parse_databases_from_list(self, data: list) -> List[str]: """ Parse database names from list of JSON objects. Expected format from blockchain get table: [ {"table": {"name": "rand_data", "dbms": "new_company", ...}}, {"table": {"name": "ping_sensor", "dbms": "new_company", ...}}, ... ] """ databases = set() for item in data: if isinstance(item, dict): # Check if nested under "table" key if "table" in item and isinstance(item["table"], dict): db_name = item["table"].get("dbms") if db_name: databases.add(db_name) else: # Try top-level keys as fallback db_name = item.get("dbms") or item.get("database") or item.get("db") if db_name: databases.add(db_name) return sorted(list(databases)) async def get_tables(self, database: str) -> List[str]: """ Get list of tables in a database using blockchain get table. Args: database: Database name Returns: List of table names Example EdgeLake command: blockchain get table """ logger.info(f"Fetching tables for database '{database}'") try: # Use blockchain get table to get all tables, then filter for this database result = await self._async_request("GET", "blockchain get table") # Parse result if isinstance(result, str): tables = self._parse_tables_from_blockchain(result, database) elif isinstance(result, dict): tables = self._parse_tables_from_json(result) elif isinstance(result, list): tables = self._parse_tables_from_list(result, database) else: tables = [] logger.info(f"Found {len(tables)} tables in '{database}'") return tables except Exception as e: logger.error(f"Failed to get tables for '{database}': {e}") raise def _parse_tables_from_blockchain(self, text: str, database: str) -> List[str]: """ Parse table names for a specific database from 'blockchain get table' response. Args: text: Response text from blockchain get table database: Database name to filter for Returns: List of table names for the specified database """ tables = [] lines = text.strip().split('\n') for line in lines: line = line.strip() if not line or line.startswith('-') or line.startswith('|'): continue # Skip header lines if 'database' in line.lower() and 'table' in line.lower(): continue # Parse table format with | (Database | Table name | ...) if '|' in line: parts = [p.strip() for p in line.split('|') if p.strip()] if len(parts) >= 2: db_name = parts[0] table_name = parts[1] # Only include tables for the requested database if db_name == database: tables.append(table_name) else: # Simple format - first word is database, second is table parts = line.split() if len(parts) >= 2 and parts[0] == database: tables.append(parts[1]) return tables def _parse_tables_from_text(self, text: str) -> List[str]: """Parse table names from text response""" tables = [] lines = text.strip().split('\n') for line in lines: line = line.strip() if line and not line.startswith('-') and not line.startswith('|') and line.lower() not in ['table name', 'table_name']: if '|' in line: parts = [p.strip() for p in line.split('|') if p.strip()] if parts and parts[0].lower() not in ['table name', 'table_name']: tables.append(parts[0]) else: tables.append(line) return tables def _parse_tables_from_json(self, data: dict) -> List[str]: """Parse table names from JSON response""" if "tables" in data: return data["tables"] elif "Table" in data: return data["Table"] else: return [] def _parse_tables_from_list(self, data: list, database: str) -> List[str]: """ Parse table names for a specific database from list of JSON objects. Args: data: List of JSON objects from blockchain get table database: Database name to filter for Returns: List of table names for the specified database """ tables = [] for item in data: if isinstance(item, dict): # Check if nested under "table" key if "table" in item and isinstance(item["table"], dict): db_name = item["table"].get("dbms") table_name = item["table"].get("name") # Only include tables for the requested database if db_name == database and table_name: tables.append(table_name) else: # Try top-level keys as fallback db_name = item.get("dbms") or item.get("database") or item.get("db") table_name = item.get("table") or item.get("table_name") or item.get("name") # Only include tables for the requested database if db_name == database and table_name: tables.append(table_name) return tables async def get_table_schema(self, database: str, table: str) -> str: """ Get schema for a specific table. Args: database: Database name table: Table name Returns: JSON string containing table schema Example EdgeLake command: get columns where dbms="new_company" and table="rand_data" and format=json """ logger.info(f"Fetching schema for '{database}.{table}'") try: command = f'get columns where dbms="{database}" and table="{table}" and format=json' result = await self._async_request("GET", command) # Format result as JSON string if isinstance(result, dict): return json.dumps(result, indent=2) elif isinstance(result, str): return result else: return json.dumps({"schema": result}, indent=2) except Exception as e: logger.error(f"Failed to get schema for '{database}.{table}': {e}") raise async def execute_query(self, database: str, query: str, output_format: str = "json") -> str: """ Execute SQL query against EdgeLake. Args: database: Database name query: SQL query to execute output_format: Output format ('json' or 'table') Returns: Query results as formatted string Example EdgeLake command: sql {database} format = {format} "{query}" Note: Uses 'destination: network' header to route query to Operator nodes instead of executing locally on Query node. """ logger.info(f"Executing query on '{database}': {query}") try: # Build EdgeLake SQL command command = f"sql {database} format = {output_format} \"{query}\"" # Add destination: network header to route to Operator nodes headers = {"destination": "network"} result = await self._async_request("GET", command, headers) # Format result if isinstance(result, dict): return json.dumps(result, indent=2) elif isinstance(result, str): return result else: return json.dumps({"result": result}, indent=2) except Exception as e: logger.error(f"Query execution failed: {e}") raise async def get_node_status(self) -> str: """ Get EdgeLake node status. Returns: Node status information as JSON string Example EdgeLake command: get status """ logger.info("Fetching node status") try: result = await self._async_request("GET", "get status") if isinstance(result, dict): return json.dumps(result, indent=2) elif isinstance(result, str): return result else: return json.dumps({"status": result}, indent=2) except Exception as e: logger.error(f"Failed to get node status: {e}") raise def close(self): """Shutdown the thread pool executor""" logger.info("Shutting down EdgeLake client") self.executor.shutdown(wait=True)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tom342178/edgelake-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server