stream_query
Execute a SQL query on a Vertica database, fetching rows in batches of batch_size and returning all results as a single string.
Instructions
Execute a SQL query and return the results in batches as a single string.
Args:
ctx: FastMCP context for progress reporting and logging
query: SQL query to execute
batch_size: Number of rows to fetch at once
Returns:
Query results as a concatenated string
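
The batching described above can be sketched with any DB-API cursor. This is a minimal illustration, assuming the standard-library sqlite3 module as a stand-in for a Vertica connection; fetch_in_batches, run_query, and the table t are hypothetical names that do not appear in the project.

```python
import sqlite3
from typing import Any, Iterator, List, Tuple


def fetch_in_batches(
    cursor: sqlite3.Cursor, batch_size: int
) -> Iterator[List[Tuple[Any, ...]]]:
    """Yield lists of at most batch_size rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:  # an empty list means no rows remain
            break
        yield batch


def run_query(conn: sqlite3.Connection, query: str, batch_size: int = 1000) -> str:
    """Run the query, collect every batch, and return all rows as one string."""
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        all_rows: List[Tuple[Any, ...]] = []
        for batch in fetch_in_batches(cursor, batch_size):
            all_rows.extend(batch)
        return str(all_rows)
    finally:
        cursor.close()


# Stand-in data: an in-memory SQLite table instead of a Vertica table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])

# Observe the size of each batch when five rows are fetched two at a time.
cursor = conn.cursor()
cursor.execute("SELECT id FROM t ORDER BY id")
batch_sizes = [len(batch) for batch in fetch_in_batches(cursor, 2)]
cursor.close()

result = run_query(conn, "SELECT id FROM t ORDER BY id", batch_size=2)
print(batch_sizes)
print(result)
```

Because every batch is appended to one list before the string is built, batch_size bounds the size of each fetch, not the size of the returned string.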
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | Yes | SQL query to execute | |
| batch_size | No | Number of rows to fetch at once | 1000 |
Implementation Reference
- src/mcp_vertica/mcp.py:248-306 (handler): The handler function for the 'stream_query' tool. It executes a SQL query on Vertica in batches using fetchmany, checks permissions, and returns results as a string. Registered via the @mcp.tool() decorator.

  ```python
  @mcp.tool()
  async def stream_query(
      ctx: Context, query: str, batch_size: int = 1000
  ) -> str:
      """Execute a SQL query and return the results in batches as a single string.

      Args:
          ctx: FastMCP context for progress reporting and logging
          query: SQL query to execute
          batch_size: Number of rows to fetch at once

      Returns:
          Query results as a concatenated string
      """
      await ctx.info(f"Executing query with batching: {query}")

      # Get or create connection manager
      manager = await get_or_create_manager(ctx)
      if not manager:
          return "Error: Failed to initialize database connection. Check configuration."

      # Extract schema from query if not provided
      schema = extract_schema_from_query(query)

      # Check operation permissions
      operation = extract_operation_type(query)
      if operation and not manager.is_operation_allowed(schema or "default", operation):
          error_msg = f"Operation {operation.name} not allowed for schema {schema}"
          await ctx.error(error_msg)
          return error_msg

      conn = None
      cursor = None
      try:
          conn = manager.get_connection()  # Always use default DB connection
          cursor = conn.cursor()
          cursor.execute(query)

          all_results = []
          total_rows = 0
          while True:
              batch = cursor.fetchmany(batch_size)
              if not batch:
                  break
              total_rows += len(batch)
              await ctx.debug(f"Fetched {total_rows} rows")
              all_results.extend(batch)

          await ctx.info(f"Query completed, total rows: {total_rows}")
          return str(all_results)
      except Exception as e:
          error_msg = f"Error executing query: {str(e)}"
          await ctx.error(error_msg)
          return error_msg
      finally:
          if cursor:
              cursor.close()
          if conn:
              manager.release_connection(conn)
  ```
- src/mcp_vertica/mcp.py:252-261 (schema): Docstring defining the input parameters and return type for the stream_query tool, used by FastMCP for schema generation.

  ```python
  """Execute a SQL query and return the results in batches as a single string.

  Args:
      ctx: FastMCP context for progress reporting and logging
      query: SQL query to execute
      batch_size: Number of rows to fetch at once

  Returns:
      Query results as a concatenated string
  """
  ```
- src/mcp_vertica/mcp.py:61-81 (helper): Helper function to get or lazily create the VerticaConnectionManager used by stream_query.

  ```python
  async def get_or_create_manager(ctx: Context) -> VerticaConnectionManager | None:
      """Get connection manager from context or create it lazily.

      Args:
          ctx: FastMCP context

      Returns:
          VerticaConnectionManager instance or None if creation fails
      """
      manager = ctx.request_context.lifespan_context.get("vertica_manager")
      if not manager:
          try:
              manager = VerticaConnectionManager()
              config = VerticaConfig.from_env()
              manager.initialize_default(config)
              await ctx.info("Connection manager initialized from request config")
          except Exception as e:
              await ctx.error(f"Failed to initialize database connection: {str(e)}")
              return None
      return manager
  ```
- src/mcp_vertica/mcp.py:83-96 (helper): Helper to determine the SQL operation type (INSERT, UPDATE, etc.) for permission checks in stream_query.

  ```python
  def extract_operation_type(query: str) -> OperationType | None:
      """Extract the operation type from a SQL query."""
      query = query.strip().upper()

      if query.startswith("INSERT"):
          return OperationType.INSERT
      elif query.startswith("UPDATE"):
          return OperationType.UPDATE
      elif query.startswith("DELETE"):
          return OperationType.DELETE
      elif any(query.startswith(op) for op in ["CREATE", "ALTER", "DROP", "TRUNCATE"]):
          return OperationType.DDL
      return None
  ```
- src/mcp_vertica/mcp.py:98-105 (helper): Helper to extract the schema name from a SQL query for permission checks in stream_query.

  ```python
  def extract_schema_from_query(query: str) -> str | None:
      """Extract schema name from a SQL query."""
      # Extract the schema from a database.table or schema.table pattern
      match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
      if match:
          return match.group(1)
      return None
  ```
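
To see which inputs the permission check in stream_query receives, the two extraction helpers can be exercised on sample queries. This is a sketch under stated assumptions: OperationType here is a hypothetical stand-in that assumes only the member names used by the helper, permission_inputs is a hypothetical wrapper, and the sample queries and table names are made up.

```python
import re
from enum import Enum, auto
from typing import Optional, Tuple


class OperationType(Enum):
    """Hypothetical stand-in for the project's enum (member names assumed)."""
    INSERT = auto()
    UPDATE = auto()
    DELETE = auto()
    DDL = auto()


def extract_operation_type(query: str) -> Optional[OperationType]:
    # Same prefix test as the helper: look only at the start of the statement.
    query = query.strip().upper()
    if query.startswith("INSERT"):
        return OperationType.INSERT
    if query.startswith("UPDATE"):
        return OperationType.UPDATE
    if query.startswith("DELETE"):
        return OperationType.DELETE
    if query.startswith(("CREATE", "ALTER", "DROP", "TRUNCATE")):
        return OperationType.DDL
    return None


def extract_schema_from_query(query: str) -> Optional[str]:
    # Same regex as the helper: the first "name.name" pair anywhere in the text.
    match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
    return match.group(1) if match else None


def permission_inputs(query: str) -> Optional[Tuple[str, OperationType]]:
    """Return the (schema, operation) pair the handler would check,
    or None when no operation is recognized and the check is skipped."""
    operation = extract_operation_type(query)
    if operation is None:
        return None
    return (extract_schema_from_query(query) or "default", operation)


samples = [
    "SELECT * FROM public.users",             # not a checked operation
    "  delete from store.orders WHERE id = 1",
    "DROP TABLE staging",                     # no schema prefix, falls back to "default"
    "/* cleanup */ DELETE FROM store.orders", # leading comment hides the keyword
    "UPDATE orders SET price = 1.5",          # the regex matches the decimal literal
]
for sample in samples:
    print(repr(sample), "->", permission_inputs(sample))
```

The last two samples show limits of prefix and regex matching: a leading comment means no operation is recognized, and a decimal literal such as 1.5 is read as a schema named "1" when the table has no schema prefix.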
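
The lazy initialization performed by get_or_create_manager follows a get-or-create pattern, which can be sketched without a database. FakeManager and get_or_create are hypothetical names, and a plain dict stands in for the lifespan context. Note one deliberate difference: this sketch stores the new manager back into the dict so later calls reuse it, which the quoted helper does not do.

```python
from typing import Any, Dict


class FakeManager:
    """Hypothetical stand-in for VerticaConnectionManager."""

    def __init__(self) -> None:
        self.initialized = False

    def initialize_default(self, config: Dict[str, Any]) -> None:
        # The real manager would set up connections here; this only records the call.
        self.config = config
        self.initialized = True


def get_or_create(lifespan_context: Dict[str, Any], config: Dict[str, Any]) -> FakeManager:
    """Return the manager held in the context, creating it on first use."""
    manager = lifespan_context.get("vertica_manager")
    if manager is None:
        manager = FakeManager()
        manager.initialize_default(config)
        # Assumption of this sketch: cache the manager for subsequent calls.
        lifespan_context["vertica_manager"] = manager
    return manager


context: Dict[str, Any] = {}
first = get_or_create(context, {"host": "localhost"})
second = get_or_create(context, {"host": "ignored-on-second-call"})
print(first is second)
```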