#!/usr/bin/env python3
"""
EdgeLake MCP Server

This MCP server provides access to EdgeLake distributed-database functionality,
including resource discovery (databases and tables) and SQL query execution.

Architecture:
- Standalone server using the stdio transport (JSON-RPC over stdin/stdout)
- Multi-threaded query execution for parallel EdgeLake node requests
- Stateless design: no session management is required

License: Mozilla Public License 2.0
"""
import asyncio
import json
import logging
import os
import sys
import tempfile
from typing import Any

from mcp.server import Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)
from pydantic import AnyUrl

# Local imports
from edgelake_mcp import __version__
from edgelake_mcp.client import EdgeLakeClient
from edgelake_mcp.query import QueryBuilder
from edgelake_mcp.config import Config

# Configure logging
# Logging goes to a log file and to stderr only: stdout carries the JSON-RPC
# messages of the stdio transport and must not receive log output.


def build_log_handlers(candidate_dirs=None, filename='edgelake_mcp.log'):
    """Create the handlers used to configure the root logger.

    Each candidate directory is tried in order. A FileHandler is attached in
    the first directory that can be created and whose log file can be opened.
    A stderr handler is always included. If no directory is usable, logging
    degrades to stderr only instead of raising, so an unwritable home
    directory cannot abort module import.

    Args:
        candidate_dirs: Directories to try, in order. Defaults to
            ``~/Library/Logs`` followed by the system temp directory.
        filename: Name of the log file inside the chosen directory.

    Returns:
        A ``(log_dir, log_file, handlers)`` tuple. ``log_dir`` and ``log_file``
        are ``None`` when no file handler could be created. ``handlers`` lists
        the file handler (if any) before the stderr handler.
    """
    if candidate_dirs is None:
        candidate_dirs = (
            os.path.expanduser('~/Library/Logs'),
            tempfile.gettempdir(),
        )
    stderr_handler = logging.StreamHandler(sys.stderr)
    for directory in candidate_dirs:
        path = os.path.join(directory, filename)
        try:
            os.makedirs(directory, exist_ok=True)
            file_handler = logging.FileHandler(path)
        except OSError:
            # Not creatable/writable (read-only FS, path is a file, ...):
            # try the next candidate.
            continue
        return directory, path, [file_handler, stderr_handler]
    return None, None, [stderr_handler]


log_dir, log_file, _log_handlers = build_log_handlers()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger('edgelake-mcp-server')
class EdgeLakeMCPServer:
    """EdgeLake MCP Server implementation.

    Wraps an ``mcp.server.Server`` and exposes EdgeLake databases/tables as MCP
    resources, plus the tools ``query``, ``node_status``, ``server_info``,
    ``list_databases``, ``list_tables`` and ``get_schema``. All EdgeLake
    access goes through one ``EdgeLakeClient`` built from ``Config``.
    """

    # Name registered with the MCP server and reported by ``server_info``.
    SERVER_NAME = "edgelake-mcp-server"

    # Scheme prefix of every resource URI this server emits and accepts.
    RESOURCE_URI_PREFIX = "database://"

    def __init__(self, config: Config):
        """Create the MCP server, the EdgeLake client and the query builder.

        Args:
            config: Settings providing host, port and request timeout for the
                EdgeLake client (plus values reported by ``server_info``).
        """
        self.config = config
        self.server = Server(self.SERVER_NAME)
        self.client = EdgeLakeClient(
            host=config.edgelake_host,
            port=config.edgelake_port,
            timeout=config.request_timeout,
        )
        self.query_builder = QueryBuilder()
        # Register handlers
        self._register_handlers()

    @classmethod
    def _parse_resource_uri(cls, uri_str: str) -> list[str]:
        """Split a ``database://{database}[/{table}]`` URI into path segments.

        One trailing slash is tolerated, so ``database://db/`` is read as the
        database ``db`` rather than as table ``""``. Empty segments (e.g.
        ``database://`` or ``database://db//t``) and more than two segments
        are rejected.

        Returns:
            ``[database]`` or ``[database, table]``.

        Raises:
            ValueError: If the scheme is wrong or the path is malformed.
        """
        if not uri_str.startswith(cls.RESOURCE_URI_PREFIX):
            raise ValueError(f"Invalid URI scheme: {uri_str}")
        path = uri_str[len(cls.RESOURCE_URI_PREFIX):]
        if path.endswith("/"):
            path = path[:-1]
        parts = path.split("/")
        if len(parts) > 2 or not all(parts):
            raise ValueError(f"Invalid resource URI format: {uri_str}")
        return parts

    async def _database_resources(self, db_name: str) -> list[Resource]:
        """Build the database-level resource plus one resource per table.

        Raises whatever ``get_tables`` or URI construction raises; the caller
        decides how to handle a failing database.
        """
        resources = [Resource(
            uri=AnyUrl(f"{self.RESOURCE_URI_PREFIX}{db_name}"),
            name=f"Database: {db_name}",
            description=f"All tables in database '{db_name}'",
            mimeType="application/json",
        )]
        tables = await self.client.get_tables(db_name)
        logger.info(f"Database '{db_name}' has {len(tables)} tables")
        resources.extend(
            Resource(
                uri=AnyUrl(f"{self.RESOURCE_URI_PREFIX}{db_name}/{table_name}"),
                name=f"{db_name}.{table_name}",
                description=f"Table '{table_name}' in database '{db_name}'",
                mimeType="application/json",
            )
            for table_name in tables
        )
        return resources

    def _register_handlers(self):
        """Register all MCP protocol handlers on ``self.server``."""

        # Tool name -> coroutine method taking the tool's argument dict and
        # returning a list of TextContent. Must stay in sync with list_tools.
        tool_handlers = {
            "query": self._execute_query,
            "node_status": self._get_node_status,
            "server_info": self._get_server_info,
            "list_databases": self._list_databases,
            "list_tables": self._list_tables,
            "get_schema": self._get_schema,
        }

        @self.server.list_resources()
        async def list_resources() -> list[Resource]:
            """
            List all available resources (databases and tables).

            Returns resources in the format:
            - database://{database_name}/{table_name} - Individual table
            - database://{database_name} - All tables in database

            Best effort: if the database list cannot be fetched, an empty list
            is returned; if a single database fails, it is skipped and logged.
            """
            logger.info("Listing resources")
            try:
                databases = await self.client.get_databases()
                logger.info(f"Found {len(databases)} databases: {databases}")
            except Exception as e:
                logger.error(f"Error listing resources: {e}", exc_info=True)
                # Return empty list on error rather than failing
                return []

            resources = []
            for db_name in databases:
                try:
                    resources.extend(await self._database_resources(db_name))
                except Exception as e:
                    # One unreachable/malformed database must not hide the
                    # resources of all the others.
                    logger.error(
                        f"Error listing resources for database '{db_name}': {e}",
                        exc_info=True,
                    )
            logger.info(f"Returning {len(resources)} resources")
            return resources

        @self.server.read_resource()
        async def read_resource(uri: AnyUrl) -> str:
            """
            Read a resource.

            Args:
                uri: ``database://{database}`` (returns a text list of tables)
                    or ``database://{database}/{table}`` (returns the table
                    schema as provided by the client).

            Returns:
                The resource text, or an error description if reading failed
                (errors are logged, not raised).
            """
            logger.info(f"Reading resource: {uri}")
            try:
                parts = self._parse_resource_uri(str(uri))
                if len(parts) == 1:
                    # Database-level resource - return list of tables
                    db_name = parts[0]
                    tables = await self.client.get_tables(db_name)
                    return f"Tables in database '{db_name}':\n" + "\n".join(f" - {t}" for t in tables)
                # Table-level resource - return schema
                db_name, table_name = parts
                return await self.client.get_table_schema(db_name, table_name)
            except Exception as e:
                logger.error(f"Error reading resource {uri}: {e}", exc_info=True)
                return f"Error reading resource: {str(e)}"

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            """
            List available tools.

            Returns:
            - query: Execute SQL queries against EdgeLake
            - node_status: Get EdgeLake node status
            - server_info: Get MCP server version and configuration
            - list_databases: List available databases
            - list_tables: List tables in a database
            - get_schema: Get column definitions of a table
            """
            logger.info("Listing tools")
            return [
                Tool(
                    name="query",
                    description="Execute SQL query against EdgeLake database tables with advanced filtering, grouping, and ordering options",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name to query"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name to query"
                            },
                            "select": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Columns to select (default: all columns with *)",
                                "default": ["*"]
                            },
                            "where": {
                                "type": "string",
                                "description": "WHERE clause conditions (e.g., 'is_active = true AND age > 18'). Supports AND/OR operators"
                            },
                            "group_by": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Columns to group by (required when using aggregations with non-aggregated columns)"
                            },
                            "order_by": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "column": {"type": "string"},
                                        "direction": {
                                            "type": "string",
                                            "enum": ["ASC", "DESC"],
                                            "default": "ASC"
                                        }
                                    },
                                    "required": ["column"]
                                },
                                "description": "Order results by columns with optional ASC/DESC direction"
                            },
                            "include_tables": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Additional tables to include (JOIN). Use 'db_name.table_name' format for cross-database joins"
                            },
                            "extend_fields": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Additional fields to add to query (e.g., '+ip', '+overlay_ip', '+hostname', '@table_name')"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Maximum number of rows to return",
                                "minimum": 1,
                                "default": 100
                            },
                            "format": {
                                "type": "string",
                                "enum": ["json", "table"],
                                "description": "Output format",
                                "default": "json"
                            }
                        },
                        "required": ["database", "table"]
                    }
                ),
                Tool(
                    name="node_status",
                    description="Get the status and health information of the EdgeLake node",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                ),
                Tool(
                    name="server_info",
                    description="Get EdgeLake MCP Server version and configuration information",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                ),
                Tool(
                    name="list_databases",
                    description="List all available databases in EdgeLake. Use this to discover what databases are available before querying.",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                        "required": []
                    }
                ),
                Tool(
                    name="list_tables",
                    description="List all tables in a specific database. Use this to discover what tables are available in a database before querying.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name to list tables from"
                            }
                        },
                        "required": ["database"]
                    }
                ),
                Tool(
                    name="get_schema",
                    description="Get the schema (column definitions) for a specific table. Use this to understand what columns are available before querying.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "database": {
                                "type": "string",
                                "description": "Database name"
                            },
                            "table": {
                                "type": "string",
                                "description": "Table name"
                            }
                        },
                        "required": ["database", "table"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> list[TextContent]:
            """
            Execute a tool by name.

            Args:
                name: One of the tool names advertised by ``list_tools``.
                arguments: Tool-specific argument dict; ``None`` is treated
                    as an empty dict.

            Returns:
                List of TextContent with results. Any failure (including an
                unknown tool name) is logged and returned as a single error
                TextContent instead of being raised.
            """
            logger.info(f"Calling tool '{name}' with arguments: {arguments}")
            try:
                handler = tool_handlers.get(name)
                if handler is None:
                    raise ValueError(f"Unknown tool: {name}")
                return await handler(arguments or {})
            except Exception as e:
                logger.error(f"Error executing tool '{name}': {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error executing {name}: {str(e)}"
                )]

    async def _execute_query(self, arguments: dict) -> list[TextContent]:
        """
        Execute SQL query against EdgeLake.

        Args:
            arguments: Query parameters (database, table, filters, ...) passed
                to ``QueryBuilder.build_query``; ``format`` defaults to "json".

        Returns:
            List containing TextContent with query results.

        Raises:
            Whatever query building or execution raises (logged first).
        """
        try:
            # Build SQL query from arguments
            sql_query = self.query_builder.build_query(arguments)
            logger.info(f"Executing SQL: {sql_query}")
            database = arguments.get("database")
            output_format = arguments.get("format", "json")
            result = await self.client.execute_query(
                database=database,
                query=sql_query,
                output_format=output_format
            )
            return [TextContent(type="text", text=result)]
        except Exception as e:
            logger.error(f"Query execution failed: {e}", exc_info=True)
            raise

    async def _get_node_status(self, arguments: dict) -> list[TextContent]:
        """
        Get EdgeLake node status. ``arguments`` is unused.

        Returns:
            List containing TextContent with node status.
        """
        try:
            status = await self.client.get_node_status()
            return [TextContent(type="text", text=status)]
        except Exception as e:
            logger.error(f"Failed to get node status: {e}", exc_info=True)
            raise

    async def _get_server_info(self, arguments: dict) -> list[TextContent]:
        """
        Get MCP server version and configuration information.
        ``arguments`` is unused.

        Returns:
            List containing TextContent with pretty-printed JSON server info.
        """
        info = {
            "version": __version__,
            "server_name": self.SERVER_NAME,
            "configuration": {
                "edgelake_host": self.config.edgelake_host,
                "edgelake_port": self.config.edgelake_port,
                "request_timeout": self.config.request_timeout,
                "max_workers": self.config.max_workers,
                "log_level": self.config.log_level
            }
        }
        return [TextContent(type="text", text=json.dumps(info, indent=2))]

    async def _list_databases(self, arguments: dict) -> list[TextContent]:
        """
        List all available databases. ``arguments`` is unused.

        Returns:
            List containing TextContent with JSON ``{"databases", "count"}``.
        """
        try:
            databases = await self.client.get_databases()
            result = {
                "databases": databases,
                "count": len(databases)
            }
            return [TextContent(type="text", text=json.dumps(result, indent=2))]
        except Exception as e:
            logger.error(f"Failed to list databases: {e}", exc_info=True)
            raise

    async def _list_tables(self, arguments: dict) -> list[TextContent]:
        """
        List all tables in a specific database.

        Args:
            arguments: Must contain a non-empty 'database' key.

        Returns:
            List containing TextContent with JSON ``{"database", "tables", "count"}``.

        Raises:
            ValueError: If 'database' is missing or empty.
        """
        # Resolve arguments before the try so the error log below can always
        # reference `database` (previously an UnboundLocalError could mask
        # the real failure).
        database = (arguments or {}).get("database")
        if not database:
            raise ValueError("database parameter is required")
        try:
            tables = await self.client.get_tables(database)
        except Exception as e:
            logger.error(f"Failed to list tables for '{database}': {e}", exc_info=True)
            raise
        result = {
            "database": database,
            "tables": tables,
            "count": len(tables)
        }
        return [TextContent(type="text", text=json.dumps(result, indent=2))]

    async def _get_schema(self, arguments: dict) -> list[TextContent]:
        """
        Get the schema (column definitions) for a specific table.

        Args:
            arguments: Must contain non-empty 'database' and 'table' keys.

        Returns:
            List containing TextContent with the schema text from the client.

        Raises:
            ValueError: If 'database' or 'table' is missing or empty.
        """
        # Resolve arguments before the try so the error log below never hits
        # unbound `database`/`table` names.
        arguments = arguments or {}
        database = arguments.get("database")
        table = arguments.get("table")
        if not database:
            raise ValueError("database parameter is required")
        if not table:
            raise ValueError("table parameter is required")
        try:
            schema = await self.client.get_table_schema(database, table)
        except Exception as e:
            logger.error(f"Failed to get schema for '{database}.{table}': {e}", exc_info=True)
            raise
        return [TextContent(type="text", text=schema)]

    async def run(self):
        """Run the MCP server over stdio until the transport closes."""
        logger.info(f"Starting EdgeLake MCP Server (host={self.config.edgelake_host}, port={self.config.edgelake_port})")
        from mcp.server.stdio import stdio_server
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
async def async_main():
    """Build the server from environment configuration and serve until it stops."""
    await EdgeLakeMCPServer(Config.from_env()).run()
def main():
    """Console-script entry point.

    If the first CLI argument is ``--version`` or ``-v``, print the version and
    exit with status 0. Otherwise run the async server. A KeyboardInterrupt is
    logged and the function returns normally. Any other exception is logged
    and the process exits with status 1.
    """
    cli_args = sys.argv[1:]
    wants_version = bool(cli_args) and cli_args[0] in ('--version', '-v')
    if wants_version:
        print(f"edgelake-mcp-server version {__version__}")
        sys.exit(0)

    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as err:
        logger.error(f"Server error: {err}", exc_info=True)
        sys.exit(1)
# Allow running this file directly as a script, in addition to the
# console-script entry point `main`.
if __name__ == "__main__":
    main()