"""Database query execution operations."""
from typing import Any
from .credentials import Credentials
from .connection import ConnectionManager
from .dependencies import ensure_deps_once
try:
from sqlalchemy import Engine, MetaData, Table, select
except ImportError:
ensure_deps_once()
from sqlalchemy import Engine, MetaData, Table, select
class QueryExecutor:
"""Handles database query operations."""
def __init__(self, connection_manager: ConnectionManager):
self.conn_manager = connection_manager
def explore_table(self, creds: Credentials, table: str, limit: int = 5) -> list[dict[str, Any]]:
"""Get sample data from table."""
limit = min(limit, 100)
engine = self.conn_manager.get_engine_with_credentials(creds)
if not engine:
return [{"error": "Could not create connection"}]
try:
table_obj = self._reflect_table(engine, table)
query = select(table_obj).limit(limit)
with engine.connect() as connection:
result = connection.execute(query)
columns = result.keys()
return [dict(zip(columns, row)) for row in result]
except Exception as e:
return [{"error": f"Error exploring table: {str(e)}"}]
def query_columns(self, creds: Credentials, table: str, columns: list[str], limit: int = 10) -> list[dict[str, Any]]:
"""Query specific columns from table."""
limit = min(limit, 100)
if not columns:
return [{"error": "Column list cannot be empty"}]
engine = self.conn_manager.get_engine_with_credentials(creds)
if not engine:
return [{"error": "Could not create connection"}]
try:
table_obj = self._reflect_table(engine, table)
# Validate columns
table_columns = {col.name for col in table_obj.columns}
invalid_columns = [col for col in columns if col not in table_columns]
if invalid_columns:
return [{"error": f"Invalid column(s): {', '.join(invalid_columns)}"}]
# Build query
column_objs = [table_obj.c[col] for col in columns]
query = select(*column_objs).limit(limit)
with engine.connect() as connection:
result = connection.execute(query)
return [dict(zip(columns, row)) for row in result]
except Exception as e:
return [{"error": f"Error querying table: {str(e)}"}]
def execute_select_query(self, creds: Credentials, query: str, limit: int = 100) -> dict[str, Any]:
"""Execute a SELECT query safely.
Args:
creds: Database credentials
query: SQL query to execute (must be SELECT only)
limit: Maximum number of rows to return
Returns:
dict: Query results with rows and metadata, or error dictionary
"""
limit = min(limit, 1000) # Maximum safety limit
# Validate query is safe (only SELECT)
query_upper = query.strip().upper()
# Remove comments
import re
query_clean = re.sub(r'--.*?$', '', query_upper, flags=re.MULTILINE)
query_clean = re.sub(r'/\*.*?\*/', '', query_clean, flags=re.DOTALL)
query_clean = query_clean.strip()
# Check for dangerous keywords
dangerous_keywords = [
'ALTER', 'CREATE', 'DROP', 'DELETE', 'INSERT',
'UPDATE', 'TRUNCATE', 'EXEC', 'EXECUTE', 'GRANT',
'REVOKE', 'BACKUP', 'RESTORE'
]
for keyword in dangerous_keywords:
if re.search(rf'\b{keyword}\b', query_clean):
return {
"error": f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed."
}
# Ensure query starts with SELECT
if not query_clean.startswith('SELECT'):
return {
"error": "Query must start with SELECT. Only SELECT queries are allowed."
}
engine = self.conn_manager.get_engine_with_credentials(creds)
if not engine:
return {"error": "Could not create connection"}
try:
# Add LIMIT if not present (for safety)
from sqlalchemy import text
with engine.connect() as connection:
result = connection.execute(text(query))
columns = list(result.keys())
rows = [dict(zip(columns, row)) for row in result.fetchmany(limit)]
return {
"columns": columns,
"rows": rows,
"row_count": len(rows),
"query": query
}
except Exception as e:
return {"error": f"Error executing query: {str(e)}"}
def _reflect_table(self, engine: Engine, table: str) -> Table:
"""Reflect table structure from database."""
metadata = MetaData()
schema = 'dbo' if engine.dialect.name == 'mssql' else None
try:
return Table(table, metadata, autoload_with=engine, schema=schema)
except Exception:
if schema:
return Table(table, metadata, autoload_with=engine)
raise