by nolleh

stream_query

Execute SQL queries on Vertica databases, fetching rows in fixed-size batches and returning the combined results as a single string.

Instructions

Execute a SQL query and return the results in batches as a single string.

Args:
    ctx: FastMCP context for progress reporting and logging
    query: SQL query to execute
    batch_size: Number of rows to fetch at once

Returns:
    Query results as a concatenated string
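
The batching behaviour described above can be sketched with any DB-API 2.0 cursor. The example below is a minimal illustration, not the server's implementation: it uses the standard-library sqlite3 module as a stand-in for a Vertica connection, and the helper name fetch_in_batches and the table numbers are hypothetical.

```python
import sqlite3


def fetch_in_batches(cursor, batch_size=1000):
    """Collect all rows from an executed cursor, fetching batch_size rows at a time."""
    all_rows = []
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:  # an empty list means the result set is exhausted
            break
        all_rows.extend(batch)
    return all_rows


# sqlite3 stands in for Vertica here (assumption); both expose DB-API 2.0 cursors.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE numbers (n INTEGER)")
cursor.executemany("INSERT INTO numbers VALUES (?)", [(i,) for i in range(5)])
cursor.execute("SELECT n FROM numbers ORDER BY n")

rows = fetch_in_batches(cursor, batch_size=2)
result = str(rows)  # the accumulated rows rendered as one string
print(result)

cursor.close()
conn.close()
```

Because every batch is appended to one list before the string is built, batch_size limits the size of each fetch but not the total memory held for the result.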

Input Schema

Name        Required  Description  Default
query       Yes
batch_size  No
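
To illustrate the schema above, the arguments for a call might be assembled as follows. This is a sketch under assumptions: build_arguments is a hypothetical helper that is not part of the server, the query text and table name are made up, and the validation rules are illustrative only. The default of 1000 for batch_size is taken from the handler signature shown under Implementation Reference.

```python
import json


def build_arguments(query, batch_size=1000):
    """Build the argument object for a stream_query call (hypothetical helper)."""
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query is required and must be a non-empty string")
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    return {"query": query, "batch_size": batch_size}


# query is required; batch_size is optional and falls back to the default.
args = build_arguments("SELECT id, name FROM public.customers", batch_size=500)
print(json.dumps(args, indent=2))
```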

Implementation Reference

  • The handler function for the 'stream_query' tool. It checks operation permissions, executes the SQL query on Vertica, fetches the rows in batches with fetchmany, and returns the accumulated results as a string. Registered via the @mcp.tool() decorator.
    @mcp.tool()
    async def stream_query(
        ctx: Context, query: str, batch_size: int = 1000
    ) -> str:
        """Execute a SQL query and return the results in batches as a single string.
    
        Args:
            ctx: FastMCP context for progress reporting and logging
            query: SQL query to execute
            batch_size: Number of rows to fetch at once
    
        Returns:
            Query results as a concatenated string
        """
        await ctx.info(f"Executing query with batching: {query}")
    
        # Get or create connection manager
        manager = await get_or_create_manager(ctx)
        if not manager:
            return "Error: Failed to initialize database connection. Check configuration."
    
        # Extract the schema name from the query text (None if no schema.table pattern is found)
        schema = extract_schema_from_query(query)
        # Check operation permissions
        operation = extract_operation_type(query)
        if operation and not manager.is_operation_allowed(schema or "default", operation):
            error_msg = f"Operation {operation.name} not allowed for schema {schema or 'default'}"
            await ctx.error(error_msg)
            return error_msg
    
        conn = None
        cursor = None
        try:
            conn = manager.get_connection()  # Always use default DB connection
            cursor = conn.cursor()
            cursor.execute(query)
    
            all_results = []
            total_rows = 0
            while True:
                batch = cursor.fetchmany(batch_size)
                if not batch:
                    break
                total_rows += len(batch)
                await ctx.debug(f"Fetched {total_rows} rows")
                all_results.extend(batch)
    
            await ctx.info(f"Query completed, total rows: {total_rows}")
            return str(all_results)
        except Exception as e:
            error_msg = f"Error executing query: {str(e)}"
            await ctx.error(error_msg)
            return error_msg
        finally:
            if cursor:
                cursor.close()
            if conn:
                manager.release_connection(conn)
  • Docstring describing the input parameters and return value of the stream_query tool. FastMCP uses it as the tool description; the input schema itself is derived from the function signature.
    """Execute a SQL query and return the results in batches as a single string.
    
    Args:
        ctx: FastMCP context for progress reporting and logging
        query: SQL query to execute
        batch_size: Number of rows to fetch at once
    
    Returns:
        Query results as a concatenated string
    """
  • Helper function to get or lazily create the VerticaConnectionManager used by stream_query.
    async def get_or_create_manager(ctx: Context) -> VerticaConnectionManager | None:
        """Get connection manager from context or create it lazily.
    
        Args:
            ctx: FastMCP context
    
        Returns:
            VerticaConnectionManager instance or None if creation fails
        """
        manager = ctx.request_context.lifespan_context.get("vertica_manager")
        if not manager:
            try:
                manager = VerticaConnectionManager()
                config = VerticaConfig.from_env()
                manager.initialize_default(config)
                await ctx.info("Connection manager initialized from request config")
            except Exception as e:
                await ctx.error(f"Failed to initialize database connection: {str(e)}")
                return None
        return manager
  • Helper to determine the SQL operation type (INSERT, UPDATE, etc.) for permission checks in stream_query.
    def extract_operation_type(query: str) -> OperationType | None:
        """Extract the operation type from a SQL query."""
        query = query.strip().upper()
    
        if query.startswith("INSERT"):
            return OperationType.INSERT
        elif query.startswith("UPDATE"):
            return OperationType.UPDATE
        elif query.startswith("DELETE"):
            return OperationType.DELETE
        elif any(query.startswith(op) for op in ["CREATE", "ALTER", "DROP", "TRUNCATE"]):
            return OperationType.DDL
        return None
  • Helper to extract schema name from SQL query for permission checks in stream_query.
    def extract_schema_from_query(query: str) -> str | None:
        """Extract schema name from a SQL query."""
        # Extract the schema from a database.table or schema.table pattern
        match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
        if match:
            return match.group(1)
        return None
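
The behaviour of the two parsing helpers above can be checked on a few sample queries. The sketch below copies their logic so that it runs on its own; the OperationType enum is an assumed stand-in for the server's definition, and the sample queries are made up.

```python
import re
from enum import Enum, auto


class OperationType(Enum):
    """Stand-in for the server's OperationType (assumed definition)."""
    INSERT = auto()
    UPDATE = auto()
    DELETE = auto()
    DDL = auto()


def extract_operation_type(query):
    """Same prefix-matching logic as the helper listed above."""
    query = query.strip().upper()
    if query.startswith("INSERT"):
        return OperationType.INSERT
    elif query.startswith("UPDATE"):
        return OperationType.UPDATE
    elif query.startswith("DELETE"):
        return OperationType.DELETE
    elif any(query.startswith(op) for op in ["CREATE", "ALTER", "DROP", "TRUNCATE"]):
        return OperationType.DDL
    return None


def extract_schema_from_query(query):
    """Return the left side of the first dotted identifier, or None."""
    match = re.search(r"([a-zA-Z0-9_]+)\.[a-zA-Z0-9_]+", query)
    return match.group(1) if match else None


samples = [
    "SELECT * FROM sales.orders",
    "  delete from sales.orders WHERE id = 1",
    "DROP TABLE staging.tmp",
    "UPDATE orders SET total = 1.5",
]
for sql in samples:
    print(extract_operation_type(sql), extract_schema_from_query(sql))
```

Two consequences are worth noting. A SELECT yields no operation type, so the handler's permission check is skipped for it. The regex takes the first dotted token it finds, so in the last sample the numeric literal 1.5 is reported as schema "1" even though the query names no schema.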

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nolleh/mcp-vertica'

If you have feedback or need assistance with the MCP directory API, please join our Discord server