stream_query
Execute SQL queries on Vertica databases and retrieve results in manageable batches, optimizing memory usage and performance for large datasets.
Instructions
Execute a SQL query and return the results in batches as a single string.
Args:
ctx: FastMCP context for progress reporting and logging
query: SQL query to execute
batch_size: Number of rows to fetch at once
Returns:
Query results as a concatenated string
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| batch_size | No | ||
| query | Yes |
Implementation Reference
- src/mcp_vertica/mcp.py:248-306 (handler)Implementation of the stream_query tool handler. This async function executes a SQL query on Vertica in batches, checks permissions, reports progress via ctx, and returns results as a string. Registered via @mcp.tool() decorator.@mcp.tool() async def stream_query( ctx: Context, query: str, batch_size: int = 1000 ) -> str: """Execute a SQL query and return the results in batches as a single string. Args: ctx: FastMCP context for progress reporting and logging query: SQL query to execute batch_size: Number of rows to fetch at once Returns: Query results as a concatenated string """ await ctx.info(f"Executing query with batching: {query}") # Get or create connection manager manager = await get_or_create_manager(ctx) if not manager: return "Error: Failed to initialize database connection. Check configuration." # Extract schema from query if not provided schema = extract_schema_from_query(query) # Check operation permissions operation = extract_operation_type(query) if operation and not manager.is_operation_allowed(schema or "default", operation): error_msg = f"Operation {operation.name} not allowed for schema {schema}" await ctx.error(error_msg) return error_msg conn = None cursor = None try: conn = manager.get_connection() # Always use default DB connection cursor = conn.cursor() cursor.execute(query) all_results = [] total_rows = 0 while True: batch = cursor.fetchmany(batch_size) if not batch: break total_rows += len(batch) await ctx.debug(f"Fetched {total_rows} rows") all_results.extend(batch) await ctx.info(f"Query completed, total rows: {total_rows}") return str(all_results) except Exception as e: error_msg = f"Error executing query: {str(e)}" await ctx.error(error_msg) return error_msg finally: if cursor: cursor.close() if conn: manager.release_connection(conn)