Skip to main content
Glama
executor.py (5.45 kB)
"""Database query execution operations.""" from typing import Any from .credentials import Credentials from .connection import ConnectionManager from .dependencies import ensure_deps_once try: from sqlalchemy import Engine, MetaData, Table, select except ImportError: ensure_deps_once() from sqlalchemy import Engine, MetaData, Table, select class QueryExecutor: """Handles database query operations.""" def __init__(self, connection_manager: ConnectionManager): self.conn_manager = connection_manager def explore_table(self, creds: Credentials, table: str, limit: int = 5) -> list[dict[str, Any]]: """Get sample data from table.""" limit = min(limit, 100) engine = self.conn_manager.get_engine_with_credentials(creds) if not engine: return [{"error": "Could not create connection"}] try: table_obj = self._reflect_table(engine, table) query = select(table_obj).limit(limit) with engine.connect() as connection: result = connection.execute(query) columns = result.keys() return [dict(zip(columns, row)) for row in result] except Exception as e: return [{"error": f"Error exploring table: {str(e)}"}] def query_columns(self, creds: Credentials, table: str, columns: list[str], limit: int = 10) -> list[dict[str, Any]]: """Query specific columns from table.""" limit = min(limit, 100) if not columns: return [{"error": "Column list cannot be empty"}] engine = self.conn_manager.get_engine_with_credentials(creds) if not engine: return [{"error": "Could not create connection"}] try: table_obj = self._reflect_table(engine, table) # Validate columns table_columns = {col.name for col in table_obj.columns} invalid_columns = [col for col in columns if col not in table_columns] if invalid_columns: return [{"error": f"Invalid column(s): {', '.join(invalid_columns)}"}] # Build query column_objs = [table_obj.c[col] for col in columns] query = select(*column_objs).limit(limit) with engine.connect() as connection: result = connection.execute(query) return [dict(zip(columns, row)) for row in result] 
except Exception as e: return [{"error": f"Error querying table: {str(e)}"}] def execute_select_query(self, creds: Credentials, query: str, limit: int = 100) -> dict[str, Any]: """Execute a SELECT query safely. Args: creds: Database credentials query: SQL query to execute (must be SELECT only) limit: Maximum number of rows to return Returns: dict: Query results with rows and metadata, or error dictionary """ limit = min(limit, 1000) # Maximum safety limit # Validate query is safe (only SELECT) query_upper = query.strip().upper() # Remove comments import re query_clean = re.sub(r'--.*?$', '', query_upper, flags=re.MULTILINE) query_clean = re.sub(r'/\*.*?\*/', '', query_clean, flags=re.DOTALL) query_clean = query_clean.strip() # Check for dangerous keywords dangerous_keywords = [ 'ALTER', 'CREATE', 'DROP', 'DELETE', 'INSERT', 'UPDATE', 'TRUNCATE', 'EXEC', 'EXECUTE', 'GRANT', 'REVOKE', 'BACKUP', 'RESTORE' ] for keyword in dangerous_keywords: if re.search(rf'\b{keyword}\b', query_clean): return { "error": f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed." } # Ensure query starts with SELECT if not query_clean.startswith('SELECT'): return { "error": "Query must start with SELECT. Only SELECT queries are allowed." 
} engine = self.conn_manager.get_engine_with_credentials(creds) if not engine: return {"error": "Could not create connection"} try: # Add LIMIT if not present (for safety) from sqlalchemy import text with engine.connect() as connection: result = connection.execute(text(query)) columns = list(result.keys()) rows = [dict(zip(columns, row)) for row in result.fetchmany(limit)] return { "columns": columns, "rows": rows, "row_count": len(rows), "query": query } except Exception as e: return {"error": f"Error executing query: {str(e)}"} def _reflect_table(self, engine: Engine, table: str) -> Table: """Reflect table structure from database.""" metadata = MetaData() schema = 'dbo' if engine.dialect.name == 'mssql' else None try: return Table(table, metadata, autoload_with=engine, schema=schema) except Exception: if schema: return Table(table, metadata, autoload_with=engine) raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/yuuues/mcp-sql'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.