by nolleh

stream_query

Execute SQL queries on Vertica databases and retrieve results in manageable batches for efficient data processing.

Instructions

Execute a SQL query and return the results in batches as a single string.

Args:
    ctx: FastMCP context for progress reporting and logging
    query: SQL query to execute
    batch_size: Number of rows to fetch at once

Returns:
    Query results as a concatenated string
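Judging from the handler shown in the implementation reference, the "concatenated string" is the Python `str()` of the accumulated row list, not CSV or JSON. The following is a minimal sketch of what a caller might receive and how it could be parsed back. The sample rows are made up, and the parsing step assumes every value is a plain literal (numbers, strings, `None`); values such as `Decimal` or `datetime` would not survive `ast.literal_eval`.

```python
import ast

# Hypothetical rows, standing in for what the cursor might return.
all_results = [(1, "alice", None), (2, "bob", 3.5)]

# The handler returns str(all_results), i.e. a Python-repr string.
payload = str(all_results)
print(payload)

# For simple literal values the string can be parsed back into rows.
rows = ast.literal_eval(payload)
print(rows[1][1])
```

Because the format is a Python repr, clients written in other languages would likely need their own parsing strategy.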

Input Schema

Name        Required  Description                      Default
query       Yes       SQL query to execute
batch_size  No        Number of rows to fetch at once  1000
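To illustrate what `batch_size` controls, here is a small sketch of the same `fetchmany` loop run against an in-memory SQLite database. SQLite is only a stand-in for Vertica (both expose DB-API style cursors), and the `fetch_in_batches` helper and the `batch_sizes` list are hypothetical names added for the demonstration.

```python
import sqlite3


def fetch_in_batches(cursor, batch_size=1000):
    """Collect all rows from an executed DB-API cursor, batch_size rows at a time."""
    all_results = []
    batch_sizes = []  # recorded only to show how the rows were chunked
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        batch_sizes.append(len(batch))
        all_results.extend(batch)
    return all_results, batch_sizes


# sqlite3 stands in for Vertica here; the table and data are made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(10)])

cursor = conn.cursor()
cursor.execute("SELECT n FROM t ORDER BY n")
rows, sizes = fetch_in_batches(cursor, batch_size=4)
print(len(rows), sizes)  # 10 rows, fetched as batches of 4, 4 and 2
conn.close()
```

Note that in this loop `batch_size` only changes how many rows each fetch pulls; every row is still accumulated in memory before anything is returned.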

Implementation Reference

  • The handler function for the 'stream_query' tool. It checks operation permissions, executes the SQL query on Vertica, fetches rows in batches using fetchmany, and returns all rows as a single string. Registered via the @mcp.tool() decorator.
    @mcp.tool()
    async def stream_query(
        ctx: Context, query: str, batch_size: int = 1000
    ) -> str:
        """Execute a SQL query and return the results in batches as a single string.
    
        Args:
            ctx: FastMCP context for progress reporting and logging
            query: SQL query to execute
            batch_size: Number of rows to fetch at once
    
        Returns:
            Query results as a concatenated string
        """
        await ctx.info(f"Executing query with batching: {query}")
    
        # Get or create connection manager
        manager = await get_or_create_manager(ctx)
        if not manager:
            return "Error: Failed to initialize database connection. Check configuration."
    
        # Extract the schema name from the query text
        schema = extract_schema_from_query(query)
        # Check operation permissions
        operation = extract_operation_type(query)
        if operation and not manager.is_operation_allowed(schema or "default", operation):
            error_msg = f"Operation {operation.name} not allowed for schema {schema}"
            await ctx.error(error_msg)
            return error_msg
    
        conn = None
        cursor = None
        try:
            conn = manager.get_connection()  # Always use default DB connection
            cursor = conn.cursor()
            cursor.execute(query)
    
            all_results = []
            total_rows = 0
            while True:
                batch = cursor.fetchmany(batch_size)
                if not batch:
                    break
                total_rows += len(batch)
                await ctx.debug(f"Fetched {total_rows} rows")
                all_results.extend(batch)
    
            await ctx.info(f"Query completed, total rows: {total_rows}")
            return str(all_results)
        except Exception as e:
            error_msg = f"Error executing query: {str(e)}"
            await ctx.error(error_msg)
            return error_msg
        finally:
            if cursor:
                cursor.close()
            if conn:
                manager.release_connection(conn)
  • Docstring defining the input parameters and return type for the stream_query tool, used by FastMCP for schema generation.
    """Execute a SQL query and return the results in batches as a single string.
    
    Args:
        ctx: FastMCP context for progress reporting and logging
        query: SQL query to execute
        batch_size: Number of rows to fetch at once
    
    Returns:
        Query results as a concatenated string
    """
  • Helper function to get or lazily create the VerticaConnectionManager used by stream_query.
    async def get_or_create_manager(ctx: Context) -> VerticaConnectionManager | None:
        """Get connection manager from context or create it lazily.
    
        Args:
            ctx: FastMCP context
    
        Returns:
            VerticaConnectionManager instance or None if creation fails
        """
        manager = ctx.request_context.lifespan_context.get("vertica_manager")
        if not manager:
            try:
                manager = VerticaConnectionManager()
                config = VerticaConfig.from_env()
                manager.initialize_default(config)
                await ctx.info("Connection manager initialized from request config")
            except Exception as e:
                await ctx.error(f"Failed to initialize database connection: {str(e)}")
                return None
        return manager
  • Helper to determine the SQL operation type (INSERT, UPDATE, DELETE, or DDL) for permission checks in stream_query. Returns None for any other statement, such as SELECT.
    def extract_operation_type(query: str) -> OperationType | None:
        """Extract the operation type from a SQL query."""
        query = query.strip().upper()
    
        if query.startswith("INSERT"):
            return OperationType.INSERT
        elif query.startswith("UPDATE"):
            return OperationType.UPDATE
        elif query.startswith("DELETE"):
            return OperationType.DELETE
        elif any(query.startswith(op) for op in ["CREATE", "ALTER", "DROP", "TRUNCATE"]):
            return OperationType.DDL
        return None
  • Helper to extract schema name from SQL query for permission checks in stream_query.
    def extract_schema_from_query(query: str) -> str | None:
        """Extract schema name from a SQL query."""
        # Extract the schema from a database.table or schema.table pattern
        match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
        if match:
            return match.group(1)
        return None
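The two permission helpers above rely on simple string matching, so it can help to see how they behave on a few inputs. The sketch below uses condensed copies of both functions; the `OperationType` enum is a hypothetical stand-in for the server's real one, and the sample queries are made up.

```python
import re
from enum import Enum, auto


class OperationType(Enum):
    # Hypothetical stand-in; the real enum lives in the server's own code.
    INSERT = auto()
    UPDATE = auto()
    DELETE = auto()
    DDL = auto()


def extract_operation_type(query):
    """Condensed copy of the helper: classify by the leading keyword."""
    query = query.strip().upper()
    if query.startswith("INSERT"):
        return OperationType.INSERT
    if query.startswith("UPDATE"):
        return OperationType.UPDATE
    if query.startswith("DELETE"):
        return OperationType.DELETE
    if query.startswith(("CREATE", "ALTER", "DROP", "TRUNCATE")):
        return OperationType.DDL
    return None


def extract_schema_from_query(query):
    """Condensed copy of the helper: first name that precedes a dot."""
    match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
    return match.group(1) if match else None


examples = [
    "INSERT INTO sales.orders VALUES (1)",
    "select * from users",
    "SELECT u.id FROM public.users u",
    "/* note */ DELETE FROM sales.orders",
]
for q in examples:
    print(extract_operation_type(q), extract_schema_from_query(q))
```

With these copies, a plain SELECT yields no operation type, so the handler's permission check is skipped for it. A leading comment hides the DELETE keyword, and a table alias such as `u` in `u.id` is picked up as the schema because it is the first dotted name in the query.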

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nolleh/mcp-vertica'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.