# query.py (10.6 kB)
"""
SQL Query Builder for EdgeLake
Constructs SQL queries from MCP tool arguments with support for:
- WHERE clauses with AND/OR operators
- GROUP BY aggregations
- ORDER BY with ASC/DESC
- JOINs with include_tables
- Extended fields (metadata fields)
- LIMIT clause
License: Mozilla Public License 2.0
"""
import logging
from typing import Dict, List, Any, Optional
logger = logging.getLogger('edgelake-mcp-server.query_builder')


class QueryBuilder:
    """Build SQL query strings from structured (dict) arguments.

    NOTE(security): every fragment (table names, column expressions, the
    WHERE string, GROUP BY / ORDER BY columns) is interpolated into the SQL
    text verbatim. Nothing is escaped or parameterized here; only the
    ORDER BY direction is checked against a whitelist. Callers that accept
    untrusted input must sanitize it before calling ``build_query``.
    """

    # Only these sort directions are ever emitted in an ORDER BY clause.
    _VALID_DIRECTIONS = ("ASC", "DESC")

    # Row cap applied by build_query when the caller supplies no "limit" key.
    _DEFAULT_LIMIT = 100

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Normalize an optional list-like argument to a new list.

        ``None`` and ``""`` become ``[]``; a bare string becomes a
        one-element list (instead of being iterated character by character);
        any other iterable is copied into a fresh list.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value] if value else []
        return list(value)

    def build_query(self, arguments: Dict[str, Any]) -> str:
        """Build a SQL query string from MCP tool arguments.

        Args:
            arguments: Query parameters dict with keys:
                - database: str. Not read by this method (``validate_query``
                  requires it).
                - table: str (required)
                - select: List[str] (optional, default ['*'])
                - where: str (optional)
                - group_by: List[str] (optional)
                - order_by: List[Dict] (optional)
                - include_tables: List[str] (optional)
                - extend_fields: List[str] (optional)
                - limit: int (optional, default 100; None or <= 0 omits
                  the LIMIT clause)

        Returns:
            SQL query string.

        Raises:
            KeyError: If ``table`` is missing from ``arguments``.
            TypeError: If ``limit`` is a bool.

        Example:
            >>> builder = QueryBuilder()
            >>> builder.build_query({
            ...     "database": "mydb",
            ...     "table": "users",
            ...     "select": ["id", "name", "COUNT(*) as count"],
            ...     "where": "is_active = true",
            ...     "group_by": ["id", "name"],
            ...     "order_by": [{"column": "count", "direction": "DESC"}],
            ...     "limit": 10
            ... })
            'SELECT id, name, COUNT(*) as count FROM users WHERE is_active = true GROUP BY id, name ORDER BY count DESC LIMIT 10'
        """
        table = arguments["table"]

        # An explicit None means "not provided", same as a missing key.
        select_columns = arguments.get("select")
        if select_columns is None:
            select_columns = ["*"]
        select_columns = self._as_list(select_columns)

        where_clause = arguments.get("where")
        group_by = arguments.get("group_by", [])
        order_by = arguments.get("order_by", [])
        include_tables = arguments.get("include_tables", [])
        extend_fields = arguments.get("extend_fields", [])
        limit = arguments.get("limit", self._DEFAULT_LIMIT)

        # Extended (metadata) fields go first, then the regular columns.
        select_parts: List[str] = []
        if extend_fields:
            select_parts.extend(self._process_extend_fields(extend_fields))
        select_parts.extend(select_columns)
        if not select_parts:
            # An empty projection would yield the invalid "SELECT  FROM ...".
            select_parts = ["*"]

        query_parts = [
            f"SELECT {', '.join(select_parts)}",
            f"FROM {self._build_from_clause(table, include_tables)}",
        ]

        # Optional clauses, in SQL order; each helper returns "" when unused.
        optional_clauses = (
            self._build_where_clause(where_clause),
            self._build_group_by_clause(group_by),
            self._build_order_by_clause(order_by),
            self._build_limit_clause(limit),
        )
        query_parts.extend(clause for clause in optional_clauses if clause)

        query = " ".join(query_parts)
        logger.debug("Built query: %s", query)
        return query

    def _process_extend_fields(self, extend_fields: List[str]) -> List[str]:
        """Process extended fields (EdgeLake metadata fields).

        Extended fields use special prefixes:
            - '+' prefix: Metadata fields (e.g., +ip, +overlay_ip, +hostname)
            - '@' prefix: Special fields (e.g., @table_name)

        The fields are passed through unchanged; a new list is returned so
        the caller's list is never aliased.

        Args:
            extend_fields: List of extended field names (a single string is
                treated as a one-element list).

        Returns:
            List of processed field expressions.

        Examples:
            >>> QueryBuilder()._process_extend_fields(['+ip', '+hostname', '@table_name'])
            ['+ip', '+hostname', '@table_name']
        """
        return self._as_list(extend_fields)

    def _build_from_clause(self, table: str, include_tables: Optional[List[str]] = None) -> str:
        """Build the body of the FROM clause (without the FROM keyword).

        Args:
            table: Main table name.
            include_tables: Additional tables to include (JOIN).
                Format: 'table_name' or 'db_name.table_name'. A single
                string is treated as a one-element list.

        Returns:
            Comma-separated table list.

        Examples:
            >>> QueryBuilder()._build_from_clause("users", ["orders", "other_db.products"])
            'users, orders, other_db.products'

        Note:
            EdgeLake uses comma-separated table syntax for joins.
            For cross-database tables, use 'db_name.table_name' format.
        """
        from_parts = [table]
        from_parts.extend(self._as_list(include_tables))
        return ", ".join(from_parts)

    def _build_where_clause(self, where: Optional[str]) -> str:
        """Build the WHERE clause.

        Args:
            where: WHERE conditions string (e.g., "is_active = true AND
                age > 18"). The text is emitted verbatim after ``WHERE``.

        Returns:
            WHERE clause SQL, or an empty string when ``where`` is None,
            empty, or whitespace-only.

        Examples:
            >>> QueryBuilder()._build_where_clause("is_active = true")
            'WHERE is_active = true'
            >>> QueryBuilder()._build_where_clause("age > 18 AND status = 'active'")
            "WHERE age > 18 AND status = 'active'"
            >>> QueryBuilder()._build_where_clause(None)
            ''
        """
        if not where:
            return ""
        # A blank condition would leave a dangling "WHERE" in the query.
        if isinstance(where, str) and not where.strip():
            return ""
        return f"WHERE {where}"

    def _build_group_by_clause(self, group_by: Optional[List[str]]) -> str:
        """Build the GROUP BY clause.

        Args:
            group_by: List of column names to group by (a single string is
                treated as a one-element list).

        Returns:
            GROUP BY clause SQL or empty string.

        Note:
            GROUP BY is required when using aggregations (COUNT, AVG, SUM,
            etc.) with non-aggregated columns.

        Examples:
            >>> QueryBuilder()._build_group_by_clause(["category", "status"])
            'GROUP BY category, status'
            >>> QueryBuilder()._build_group_by_clause([])
            ''
        """
        columns = self._as_list(group_by)
        if not columns:
            return ""
        return f"GROUP BY {', '.join(columns)}"

    def _normalize_direction(self, direction: Any) -> str:
        """Return 'ASC' or 'DESC' for an order_by direction value.

        None means "not specified" and yields 'ASC'. Anything that is not a
        case-insensitive 'ASC'/'DESC' string is logged and replaced by 'ASC'.
        """
        if direction is None:
            return "ASC"
        shown = direction
        if isinstance(direction, str):
            shown = direction.upper()
            if shown in self._VALID_DIRECTIONS:
                return shown
        logger.warning("Invalid direction '%s', using ASC", shown)
        return "ASC"

    def _build_order_by_clause(self, order_by: Optional[List[Dict[str, str]]]) -> str:
        """Build the ORDER BY clause.

        Args:
            order_by: List of ordering specifications, each with:
                - column: Column name to order by
                - direction: 'ASC' or 'DESC' (optional, defaults to ASC)

        Specs without a column are skipped with a warning; an invalid,
        non-string or None direction falls back to ASC.

        Returns:
            ORDER BY clause SQL, or an empty string when no usable spec
            remains.

        Examples:
            >>> QueryBuilder()._build_order_by_clause([
            ...     {"column": "created_at", "direction": "DESC"},
            ...     {"column": "name"}
            ... ])
            'ORDER BY created_at DESC, name ASC'
            >>> QueryBuilder()._build_order_by_clause([])
            ''
        """
        if not order_by:
            return ""

        order_parts = []
        for order_spec in order_by:
            column = order_spec.get("column")
            # Check the column first so a column-less spec is skipped
            # without ever touching its direction.
            if not column:
                logger.warning("Skipping order_by spec without column: %s", order_spec)
                continue
            direction = self._normalize_direction(order_spec.get("direction"))
            order_parts.append(f"{column} {direction}")

        if not order_parts:
            return ""
        return f"ORDER BY {', '.join(order_parts)}"

    def _build_limit_clause(self, limit: Optional[int]) -> str:
        """Build the LIMIT clause.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            LIMIT clause SQL, or an empty string when ``limit`` is None or
            not positive.

        Raises:
            TypeError: If ``limit`` is a bool (``True`` would otherwise be
                rendered as the invalid ``LIMIT True``).

        Examples:
            >>> QueryBuilder()._build_limit_clause(100)
            'LIMIT 100'
            >>> QueryBuilder()._build_limit_clause(None)
            ''
        """
        if limit is None:
            return ""
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(limit, bool):
            raise TypeError(f"limit must be an integer, got {type(limit).__name__}")
        if limit <= 0:
            return ""
        return f"LIMIT {limit}"

    def validate_query(self, arguments: Dict[str, Any]) -> tuple[bool, Optional[str]]:
        """Validate query arguments before building.

        Checks that ``database`` and ``table`` are present, that ``table``
        is a non-empty string, that ``limit`` (if given) is a positive
        non-bool integer, and that ``order_by`` (if given) is a list of
        dicts with a ``column`` key and an optional ASC/DESC direction.

        Args:
            arguments: Query arguments to validate.

        Returns:
            Tuple of (is_valid, error_message)
                is_valid: True if valid, False otherwise
                error_message: Error description if invalid, None if valid

        Examples:
            >>> builder = QueryBuilder()
            >>> builder.validate_query({"database": "mydb", "table": "users"})
            (True, None)
            >>> builder.validate_query({"database": "mydb"})
            (False, 'Missing required field: table')
        """
        # Required fields
        if "database" not in arguments:
            return False, "Missing required field: database"
        if "table" not in arguments:
            return False, "Missing required field: table"

        # build_query interpolates the table name directly into FROM.
        table = arguments["table"]
        if not isinstance(table, str) or not table.strip():
            return False, "table must be a non-empty string"

        # limit: bool passes isinstance(..., int), so exclude it explicitly.
        limit = arguments.get("limit")
        if limit is not None:
            if isinstance(limit, bool) or not isinstance(limit, int):
                return False, f"limit must be an integer, got {type(limit).__name__}"
            if limit <= 0:
                return False, f"limit must be positive, got {limit}"

        # order_by structure
        order_by = arguments.get("order_by", [])
        if order_by:
            if not isinstance(order_by, list):
                return False, f"order_by must be a list, got {type(order_by).__name__}"
            for idx, order_spec in enumerate(order_by):
                if not isinstance(order_spec, dict):
                    return False, f"order_by[{idx}] must be a dict, got {type(order_spec).__name__}"
                if "column" not in order_spec:
                    return False, f"order_by[{idx}] missing required field: column"
                direction = order_spec.get("direction")
                if direction is None:
                    # Omitted (or null) direction defaults to ASC.
                    continue
                if not isinstance(direction, str):
                    return (
                        False,
                        f"order_by[{idx}] direction must be a string, got {type(direction).__name__}",
                    )
                direction = direction.upper()
                if direction not in self._VALID_DIRECTIONS:
                    return False, f"order_by[{idx}] invalid direction: {direction} (must be ASC or DESC)"

        return True, None