Skip to main content
Glama
by tom342178
server.py19.9 kB
#!/usr/bin/env python3
"""
EdgeLake MCP Server

This MCP server provides access to EdgeLake distributed database functionality,
including resource discovery (databases/tables) and SQL query execution.

Architecture:
- Standalone server using stdio transport (JSON-RPC over stdin/stdout)
- Multi-threaded query execution for parallel EdgeLake node requests
- Stateless design - no session management required

License: Mozilla Public License 2.0
"""

import asyncio
import json
import sys
import logging
from typing import Any, Optional

from mcp.server import Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)
from pydantic import AnyUrl

# Local imports
from edgelake_mcp import __version__
from edgelake_mcp.client import EdgeLakeClient
from edgelake_mcp.query import QueryBuilder
from edgelake_mcp.config import Config

# Configure logging
import os
import tempfile

log_dir = os.path.expanduser('~/Library/Logs')
log_file = os.path.join(log_dir, 'edgelake_mcp.log')

# stdout carries the JSON-RPC stream of the stdio transport, so console
# logging goes to stderr only.
_log_handlers: list[logging.Handler] = [logging.StreamHandler(sys.stderr)]
_log_file_error: Optional[OSError] = None
try:
    os.makedirs(log_dir, exist_ok=True)
    _log_handlers.insert(0, logging.FileHandler(log_file))
except OSError as exc:
    # An unwritable log directory must not stop the server from starting;
    # keep going with stderr logging and report the problem below.
    _log_file_error = exc

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger('edgelake-mcp-server')

if _log_file_error is not None:
    logger.warning("File logging to %s disabled: %s", log_file, _log_file_error)

# Prefix shared by every resource URI this server publishes.
_RESOURCE_SCHEME = "database://"


def _parse_resource_uri(uri_str: str) -> tuple[str, Optional[str]]:
    """
    Split a resource URI into its database and (optional) table name.

    Args:
        uri_str: URI in the form database://{database} or
            database://{database}/{table}. A trailing slash is tolerated.

    Returns:
        Tuple (database, table); table is None for a database-level URI.

    Raises:
        ValueError: If the scheme is not "database://", a path segment is
            empty, or there are more than two segments.
    """
    if not uri_str.startswith(_RESOURCE_SCHEME):
        raise ValueError(f"Invalid URI scheme: {uri_str}")

    path = uri_str[len(_RESOURCE_SCHEME):].rstrip("/")
    parts = path.split("/")

    # Reject "database://", "database://db//table", "database://a/b/c", ...
    # instead of forwarding an empty database/table name to EdgeLake.
    if len(parts) > 2 or not all(parts):
        raise ValueError(f"Invalid resource URI format: {uri_str}")

    table_name = parts[1] if len(parts) == 2 else None
    return parts[0], table_name


def _no_args_schema() -> dict:
    """Return a fresh JSON schema for a tool that takes no arguments."""
    return {
        "type": "object",
        "properties": {},
        "required": []
    }


class EdgeLakeMCPServer:
    """EdgeLake MCP Server implementation"""

    def __init__(self, config: Config):
        """
        Create the MCP server, the EdgeLake client and the query builder,
        then register the protocol handlers.

        Args:
            config: Server configuration; edgelake_host, edgelake_port and
                request_timeout are passed to the EdgeLake client.
        """
        self.config = config
        self.server = Server("edgelake-mcp-server")
        self.client = EdgeLakeClient(
            host=config.edgelake_host,
            port=config.edgelake_port,
            timeout=config.request_timeout
        )
        self.query_builder = QueryBuilder()

        # Register handlers
        self._register_handlers()

    def _register_handlers(self) -> None:
        """Register all MCP protocol handlers"""

        # Tool name -> coroutine method implementing it. Must stay in sync
        # with the Tool definitions returned by list_tools() below.
        tool_handlers = {
            "query": self._execute_query,
            "node_status": self._get_node_status,
            "server_info": self._get_server_info,
            "list_databases": self._list_databases,
            "list_tables": self._list_tables,
            "get_schema": self._get_schema,
        }

        @self.server.list_resources()
        async def list_resources() -> list[Resource]:
            """
            List all available resources (databases and tables).

            Returns resources in the format:
            - database://{database_name}/{table_name} - Individual table
            - database://{database_name} - All tables in database

            Any error is logged and an empty list is returned.
            """
            logger.info("Listing resources")

            try:
                # Get all databases
                databases = await self.client.get_databases()
                logger.info("Found %d databases: %s", len(databases), databases)

                resources = []

                # For each database, get its tables and create resources
                for db_name in databases:
                    # Add database-level resource
                    resources.append(Resource(
                        uri=AnyUrl(f"database://{db_name}"),
                        name=f"Database: {db_name}",
                        description=f"All tables in database '{db_name}'",
                        mimeType="application/json"
                    ))

                    # Get tables for this database
                    tables = await self.client.get_tables(db_name)
                    logger.info("Database '%s' has %d tables", db_name, len(tables))

                    # Add table-level resources
                    resources.extend(
                        Resource(
                            uri=AnyUrl(f"database://{db_name}/{table_name}"),
                            name=f"{db_name}.{table_name}",
                            description=f"Table '{table_name}' in database '{db_name}'",
                            mimeType="application/json"
                        )
                        for table_name in tables
                    )

                logger.info("Returning %d resources", len(resources))
                return resources

            except Exception as e:
                logger.error("Error listing resources: %s", e, exc_info=True)
                # Return empty list on error rather than failing
                return []

        @self.server.read_resource()
        async def read_resource(uri: AnyUrl) -> str:
            """
            Read a resource (table list or table schema).

            Args:
                uri: Resource URI in format database://{database} or
                    database://{database}/{table}

            Returns:
                For a database URI, a text listing of its tables; for a table
                URI, the schema returned by the EdgeLake client. On failure,
                an "Error reading resource: ..." message is returned instead
                of raising.
            """
            logger.info("Reading resource: %s", uri)

            try:
                # Parse the URI
                db_name, table_name = _parse_resource_uri(str(uri))

                if table_name is None:
                    # Database-level resource - return list of tables
                    tables = await self.client.get_tables(db_name)
                    return f"Tables in database '{db_name}':\n" + "\n".join(f"  - {t}" for t in tables)

                # Table-level resource - return schema
                return await self.client.get_table_schema(db_name, table_name)

            except Exception as e:
                logger.error("Error reading resource %s: %s", uri, e, exc_info=True)
                return f"Error reading resource: {str(e)}"

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """
            List available tools.

            Returns:
                - query: Execute SQL queries against EdgeLake
                - node_status: Get EdgeLake node status
                - server_info: Get MCP server version and configuration
                - list_databases: List available databases
                - list_tables: List tables in a database
                - get_schema: Get the schema of a table
            """
            logger.info("Listing tools")

            return [
                Tool(
                    name="query",
                    description="Execute SQL query against EdgeLake database tables with advanced filtering, grouping, and ordering options",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name to query"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name to query"
                            },
                            "select": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Columns to select (default: all columns with *)",
                                "default": ["*"]
                            },
                            "where": {
                                "type": "string",
                                "description": "WHERE clause conditions (e.g., 'is_active = true AND age > 18'). Supports AND/OR operators"
                            },
                            "group_by": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Columns to group by (required when using aggregations with non-aggregated columns)"
                            },
                            "order_by": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "column": {"type": "string"},
                                        "direction": {
                                            "type": "string",
                                            "enum": ["ASC", "DESC"],
                                            "default": "ASC"
                                        }
                                    },
                                    "required": ["column"]
                                },
                                "description": "Order results by columns with optional ASC/DESC direction"
                            },
                            "include_tables": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Additional tables to include (JOIN). Use 'db_name.table_name' format for cross-database joins"
                            },
                            "extend_fields": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Additional fields to add to query (e.g., '+ip', '+overlay_ip', '+hostname', '@table_name')"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Maximum number of rows to return",
                                "minimum": 1,
                                "default": 100
                            },
                            "format": {
                                "type": "string",
                                "enum": ["json", "table"],
                                "description": "Output format",
                                "default": "json"
                            }
                        },
                        "required": ["database", "table"]
                    }
                ),
                Tool(
                    name="node_status",
                    description="Get the status and health information of the EdgeLake node",
                    inputSchema=_no_args_schema()
                ),
                Tool(
                    name="server_info",
                    description="Get EdgeLake MCP Server version and configuration information",
                    inputSchema=_no_args_schema()
                ),
                Tool(
                    name="list_databases",
                    description="List all available databases in EdgeLake. Use this to discover what databases are available before querying.",
                    inputSchema=_no_args_schema()
                ),
                Tool(
                    name="list_tables",
                    description="List all tables in a specific database. Use this to discover what tables are available in a database before querying.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name to list tables from"
                            }
                        },
                        "required": ["database"]
                    }
                ),
                Tool(
                    name="get_schema",
                    description="Get the schema (column definitions) for a specific table. Use this to understand what columns are available before querying.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name"
                            }
                        },
                        "required": ["database", "table"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> list[TextContent]:
            """
            Execute a tool.

            Args:
                name: Tool name; one of 'query', 'node_status', 'server_info',
                    'list_databases', 'list_tables' or 'get_schema'
                arguments: Tool-specific arguments; a missing (None) value is
                    treated as an empty dict

            Returns:
                List of TextContent with results. Failures, including an
                unknown tool name, are logged and reported as a single
                "Error executing ..." TextContent rather than raised.
            """
            logger.info("Calling tool '%s' with arguments: %s", name, arguments)

            try:
                handler = tool_handlers.get(name)
                if handler is None:
                    raise ValueError(f"Unknown tool: {name}")

                # The handlers call arguments.get(), so never hand them None.
                return await handler(arguments if arguments is not None else {})

            except Exception as e:
                logger.error("Error executing tool '%s': %s", name, e, exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error executing {name}: {str(e)}"
                )]

    async def _execute_query(self, arguments: dict) -> list[TextContent]:
        """
        Execute SQL query against EdgeLake.

        Args:
            arguments: Query parameters including database, table, filters, etc.
                'format' selects the output format and defaults to "json".

        Returns:
            List containing TextContent with query results

        Raises:
            Exception: Any error from query building or execution is logged
                and re-raised.
        """
        try:
            # Build SQL query from arguments
            sql_query = self.query_builder.build_query(arguments)
            logger.info("Executing SQL: %s", sql_query)

            # Execute query
            result = await self.client.execute_query(
                database=arguments.get("database"),
                query=sql_query,
                output_format=arguments.get("format", "json")
            )

            return [TextContent(
                type="text",
                text=result
            )]

        except Exception as e:
            logger.error("Query execution failed: %s", e, exc_info=True)
            raise

    async def _get_node_status(self, arguments: dict) -> list[TextContent]:
        """
        Get EdgeLake node status.

        Args:
            arguments: Unused; accepted so all tool handlers share a signature.

        Returns:
            List containing TextContent with node status

        Raises:
            Exception: Any client error is logged and re-raised.
        """
        try:
            status = await self.client.get_node_status()

            return [TextContent(
                type="text",
                text=status
            )]

        except Exception as e:
            logger.error("Failed to get node status: %s", e, exc_info=True)
            raise

    async def _get_server_info(self, arguments: dict) -> list[TextContent]:
        """
        Get MCP server version and configuration information.

        Args:
            arguments: Unused; accepted so all tool handlers share a signature.

        Returns:
            List containing TextContent with server info as indented JSON
        """
        info = {
            "version": __version__,
            "server_name": "edgelake-mcp-server",
            "configuration": {
                "edgelake_host": self.config.edgelake_host,
                "edgelake_port": self.config.edgelake_port,
                "request_timeout": self.config.request_timeout,
                "max_workers": self.config.max_workers,
                "log_level": self.config.log_level
            }
        }

        return [TextContent(
            type="text",
            text=json.dumps(info, indent=2)
        )]

    async def _list_databases(self, arguments: dict) -> list[TextContent]:
        """
        List all available databases.

        Args:
            arguments: Unused; accepted so all tool handlers share a signature.

        Returns:
            List containing TextContent with database names and their count
            as indented JSON

        Raises:
            Exception: Any client error is logged and re-raised.
        """
        try:
            databases = await self.client.get_databases()

            result = {
                "databases": databases,
                "count": len(databases)
            }

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2)
            )]

        except Exception as e:
            logger.error("Failed to list databases: %s", e, exc_info=True)
            raise

    async def _list_tables(self, arguments: dict) -> list[TextContent]:
        """
        List all tables in a specific database.

        Args:
            arguments: Must contain 'database' key

        Returns:
            List containing TextContent with table names and their count
            as indented JSON

        Raises:
            ValueError: If 'database' is missing or empty.
            Exception: Any client error is logged and re-raised.
        """
        # Read the argument before the try so the except handler below can
        # always reference it.
        database = arguments.get("database")

        try:
            if not database:
                raise ValueError("database parameter is required")

            tables = await self.client.get_tables(database)

            result = {
                "database": database,
                "tables": tables,
                "count": len(tables)
            }

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2)
            )]

        except Exception as e:
            logger.error("Failed to list tables for '%s': %s", database, e, exc_info=True)
            raise

    async def _get_schema(self, arguments: dict) -> list[TextContent]:
        """
        Get the schema (column definitions) for a specific table.

        Args:
            arguments: Must contain 'database' and 'table' keys

        Returns:
            List containing TextContent with table schema

        Raises:
            ValueError: If 'database' or 'table' is missing or empty.
            Exception: Any client error is logged and re-raised.
        """
        # Read the arguments before the try so the except handler below can
        # always reference them.
        database = arguments.get("database")
        table = arguments.get("table")

        try:
            if not database:
                raise ValueError("database parameter is required")
            if not table:
                raise ValueError("table parameter is required")

            schema = await self.client.get_table_schema(database, table)

            return [TextContent(
                type="text",
                text=schema
            )]

        except Exception as e:
            logger.error("Failed to get schema for '%s.%s': %s", database, table, e, exc_info=True)
            raise

    async def run(self) -> None:
        """Run the MCP server using stdio transport"""
        logger.info(
            "Starting EdgeLake MCP Server (host=%s, port=%s)",
            self.config.edgelake_host,
            self.config.edgelake_port,
        )

        from mcp.server.stdio import stdio_server

        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )


async def async_main() -> None:
    """Async main entry point"""
    # Load configuration
    config = Config.from_env()

    # Create and run server
    server = EdgeLakeMCPServer(config)
    await server.run()


def main() -> None:
    """
    Synchronous entry point for console scripts.

    Prints the version and exits with status 0 when the first argument is
    --version or -v; otherwise runs the server until it stops. Exits with
    status 1 on an unexpected error.
    """
    # Handle --version flag
    if len(sys.argv) > 1 and sys.argv[1] in ('--version', '-v'):
        print(f"edgelake-mcp-server version {__version__}")
        sys.exit(0)

    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error("Server error: %s", e, exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/tom342178/edgelake-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.