Skip to main content
Glama

stream_query

Execute SQL queries on Vertica databases and retrieve results in manageable batches for efficient data processing.

Instructions

Execute a SQL query and return the results in batches as a single string.

Args: ctx: FastMCP context for progress reporting and logging query: SQL query to execute batch_size: Number of rows to fetch at once Returns: Query results as a concatenated string

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
queryYes
batch_sizeNo

Implementation Reference

  • The handler function for the 'stream_query' tool. It executes a SQL query on Vertica in batches using fetchmany, checks permissions, and returns results as a string. Registered via @mcp.tool() decorator.
    @mcp.tool() async def stream_query( ctx: Context, query: str, batch_size: int = 1000 ) -> str: """Execute a SQL query and return the results in batches as a single string. Args: ctx: FastMCP context for progress reporting and logging query: SQL query to execute batch_size: Number of rows to fetch at once Returns: Query results as a concatenated string """ await ctx.info(f"Executing query with batching: {query}") # Get or create connection manager manager = await get_or_create_manager(ctx) if not manager: return "Error: Failed to initialize database connection. Check configuration." # Extract schema from query if not provided schema = extract_schema_from_query(query) # Check operation permissions operation = extract_operation_type(query) if operation and not manager.is_operation_allowed(schema or "default", operation): error_msg = f"Operation {operation.name} not allowed for schema {schema}" await ctx.error(error_msg) return error_msg conn = None cursor = None try: conn = manager.get_connection() # Always use default DB connection cursor = conn.cursor() cursor.execute(query) all_results = [] total_rows = 0 while True: batch = cursor.fetchmany(batch_size) if not batch: break total_rows += len(batch) await ctx.debug(f"Fetched {total_rows} rows") all_results.extend(batch) await ctx.info(f"Query completed, total rows: {total_rows}") return str(all_results) except Exception as e: error_msg = f"Error executing query: {str(e)}" await ctx.error(error_msg) return error_msg finally: if cursor: cursor.close() if conn: manager.release_connection(conn)
  • Docstring defining the input parameters and return type for the stream_query tool, used by FastMCP for schema generation.
    """Execute a SQL query and return the results in batches as a single string. Args: ctx: FastMCP context for progress reporting and logging query: SQL query to execute batch_size: Number of rows to fetch at once Returns: Query results as a concatenated string """
  • Helper function to get or lazily create the VerticaConnectionManager used by stream_query.
    async def get_or_create_manager(ctx: Context) -> VerticaConnectionManager | None: """Get connection manager from context or create it lazily. Args: ctx: FastMCP context Returns: VerticaConnectionManager instance or None if creation fails """ manager = ctx.request_context.lifespan_context.get("vertica_manager") if not manager: try: manager = VerticaConnectionManager() config = VerticaConfig.from_env() manager.initialize_default(config) await ctx.info("Connection manager initialized from request config") except Exception as e: await ctx.error(f"Failed to initialize database connection: {str(e)}") return None return manager
  • Helper to determine the SQL operation type (INSERT, UPDATE, etc.) for permission checks in stream_query.
    def extract_operation_type(query: str) -> OperationType | None: """Extract the operation type from a SQL query.""" query = query.strip().upper() if query.startswith("INSERT"): return OperationType.INSERT elif query.startswith("UPDATE"): return OperationType.UPDATE elif query.startswith("DELETE"): return OperationType.DELETE elif any(query.startswith(op) for op in ["CREATE", "ALTER", "DROP", "TRUNCATE"]): return OperationType.DDL return None
  • Helper to extract schema name from SQL query for permission checks in stream_query.
    def extract_schema_from_query(query: str) -> str | None: """Extract schema name from a SQL query.""" # database.table 또는 schema.table 패턴에서 schema 추출 match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query) if match: return match.group(1) return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nolleh/mcp-vertica'

If you have feedback or need assistance with the MCP directory API, please join our Discord server