query_sql

Execute SQL queries on downloaded blockchain data files. Pass file paths from query_dataset results to analyze transaction or block data. Queries run on DuckDB and may use direct table references or read_parquet().

Instructions

Run a SQL query against downloaded blockchain data files

IMPORTANT WORKFLOW: This function should be used after calling query_dataset
to download data. Use the file paths returned by query_dataset as input to this function.

Workflow steps:
1. Download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
2. Get file paths: files = result.get('files', [])
3. Execute SQL using either:
   - Direct table references: query_sql("SELECT * FROM transactions", files=files)
   - Or read_parquet(): query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)

To see the schema of a file, use get_sql_table_schema(file_path) before writing your query.

DuckDB supports both approaches:
1. Direct table references (simpler): "SELECT * FROM blocks"
2. read_parquet function (explicit): "SELECT * FROM read_parquet('/path/to/file.parquet')"

Args:
    query: SQL query to execute - can use simple table names or read_parquet()
    files: List of parquet file paths to query (typically from query_dataset results)
    include_schema: Whether to include schema information in the result
    
Returns:
    Query results and metadata
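The docstring only says "Query results and metadata"; judging from the return statement in the implementation excerpt further down, a successful response carries roughly this shape (field values here are illustrative, not real data):

```python
# Illustrative payload only; keys taken from execute_sql_query's return
# statement in the implementation reference, values made up for the example.
example_result = {
    "success": True,
    "result": [{"block_number": 1000, "gas_used": 12345}],  # rows as dicts
    "row_count": 1,
    "schema": {
        "columns": ["block_number", "gas_used"],
        "dtypes": {"block_number": "int64", "gas_used": "int64"},
    },
    "files_used": ["/data/ethereum__blocks__00001000_to_00001010.parquet"],
    "used_direct_references": True,
    "table_mappings": None,
}
print(example_result["row_count"])
```

On failure the payload instead carries `"success": False` plus an `"error"` string.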

Input Schema

Name            Required  Description  Default
files           No
include_schema  No
query           Yes

Implementation Reference

  • Registration and handler stub for the 'query_sql' MCP tool. Delegates execution to execute_sql_query in sql.py. Input schema defined by function parameters and docstring.
    @mcp.tool()
    def query_sql(
        query: str,
        files: Optional[List[str]] = None,
        include_schema: bool = True
    ) -> Dict[str, Any]:
        """
        Run a SQL query against downloaded blockchain data files
        
        IMPORTANT WORKFLOW: This function should be used after calling query_dataset
        to download data. Use the file paths returned by query_dataset as input to this function.
        
        Workflow steps:
        1. Download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
        2. Get file paths: files = result.get('files', [])
        3. Execute SQL using either:
           - Direct table references: query_sql("SELECT * FROM transactions", files=files)
           - Or read_parquet(): query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)
        
        To see the schema of a file, use get_sql_table_schema(file_path) before writing your query.
        
        DuckDB supports both approaches:
        1. Direct table references (simpler): "SELECT * FROM blocks"
        2. read_parquet function (explicit): "SELECT * FROM read_parquet('/path/to/file.parquet')"
        
        Args:
            query: SQL query to execute - can use simple table names or read_parquet()
            files: List of parquet file paths to query (typically from query_dataset results)
            include_schema: Whether to include schema information in the result
            
        Returns:
            Query results and metadata
        """
        from cryo_mcp.sql import execute_sql_query
        return execute_sql_query(query, files, include_schema)
  • Core handler function that executes the SQL query using DuckDB. Handles file registration as views, query execution, schema extraction, and result formatting.
    def execute_sql_query(
        query: str,
        files: Optional[List[str]] = None,
        include_schema: bool = True
    ) -> Dict[str, Any]:
        """
        Execute a SQL query against specified parquet files.
        
        Args:
            query: SQL query to execute
            files: List of parquet file paths to query. If None, will use all files in the data directory.
            include_schema: Whether to include schema information in the result
            
        Returns:
            Dictionary with query results and metadata
        """
        data_dir = get_data_directory()
        conn = create_connection()
        # Initialize before the try block so the finally clause can always
        # reference it, even on early return (e.g. no parquet files found).
        has_registered_views = False

        try:
            # Determine which parquet files to use
            parquet_files = []
            if files:
                for file_path in files:
                    path = Path(file_path)
                    if path.exists() and path.suffix == '.parquet':
                        parquet_files.append(path)
                    else:
                        print(f"Warning: File not found or not a parquet file: {file_path}")
            else:
                # If no files provided, use all parquet files in the data directory
                parquet_files = list(data_dir.glob("**/*.parquet"))
            
            if not parquet_files:
                return {
                    "success": False,
                    "error": "No parquet files available. Download data first with query_dataset."
                }
            
            # Register temporary views for datasets if needed
            has_registered_views = False
            try:
                # Check if the query might be using direct table references without read_parquet()
                potential_tables = extract_tables_from_sql(query)
                
                # Create views for potential table names that aren't using read_parquet
                for table_name in potential_tables:
                    if not ("read_parquet" in query.lower() and table_name.lower() in query.lower()):
                        # Match files to table name more precisely
                        # First, look for exact dataset name match (e.g., "blocks" in ethereum__blocks_*.parquet)
                        dataset_pattern = f"__{table_name.lower()}__"
                        exact_matches = [f for f in parquet_files if dataset_pattern in str(f).lower()]
                        
                        # If no exact matches, try looser matching
                        if not exact_matches:
                            # Try matching at word boundaries to avoid partial matches
                            matching_files = []
                            for f in parquet_files:
                                file_lower = str(f).lower()
                                # Match dataset name patterns like ethereum__blocks_* or *_blocks_*
                                if f"__{table_name.lower()}__" in file_lower or f"_{table_name.lower()}_" in file_lower:
                                    matching_files.append(f)
                                # Also match if it's just the table name at the start of the filename
                                elif f"/{table_name.lower()}_" in file_lower or f"/{table_name.lower()}." in file_lower:
                                    matching_files.append(f)
                        else:
                            matching_files = exact_matches
                        
                        if matching_files:
                            # Create a combined view from all matching files
                            conn.execute(f"DROP VIEW IF EXISTS {table_name}")
                            
                            if len(matching_files) == 1:
                                # If only one file, create a simple view
                                conn.execute(f"CREATE VIEW {table_name} AS SELECT * FROM '{matching_files[0]}'")
                                print(f"Registered view '{table_name}' for file: {matching_files[0]}")
                            else:
                                # If multiple files, create a UNION ALL view to join all files
                                union_query = " UNION ALL ".join([f"SELECT * FROM '{file}'" for file in matching_files])
                                conn.execute(f"CREATE VIEW {table_name} AS {union_query}")
                                print(f"Registered view '{table_name}' for {len(matching_files)} files using UNION ALL")
                            
                            has_registered_views = True
                
                # Execute the query
                print(f"Executing SQL query: {query}")
                result = conn.execute(query).fetchdf()
                
                # Convert to records format for easier JSON serialization
                records = result.to_dict(orient="records")
                
                # Get schema information if requested
                schema_info = None
                if include_schema and not result.empty:
                    schema_info = {
                        "columns": list(result.columns),
                        "dtypes": {col: str(dtype) for col, dtype in result.dtypes.items()}
                    }
                
                # Track how the files were used
                file_usage = {}
                if has_registered_views:
                    for table_name in extract_tables_from_sql(query):
                        # Use the same matching logic as above
                        dataset_pattern = f"__{table_name.lower()}__"
                        exact_matches = [f for f in parquet_files if dataset_pattern in str(f).lower()]
                        
                        if not exact_matches:
                            matching_files = []
                            for f in parquet_files:
                                file_lower = str(f).lower()
                                if f"__{table_name.lower()}__" in file_lower or f"_{table_name.lower()}_" in file_lower:
                                    matching_files.append(f)
                                elif f"/{table_name.lower()}_" in file_lower or f"/{table_name.lower()}." in file_lower:
                                    matching_files.append(f)
                        else:
                            matching_files = exact_matches
                        if matching_files:
                            file_usage[table_name] = {
                                "files": [str(f) for f in matching_files],
                                "combined": len(matching_files) > 1
                            }
                
                return {
                    "success": True,
                    "result": records,
                    "row_count": len(records),
                    "schema": schema_info,
                    "files_used": [str(f) for f in parquet_files],
                    "used_direct_references": has_registered_views,
                    "table_mappings": file_usage if file_usage else None
                }
            except Exception as e:
                # Handle query-specific errors
                error_msg = str(e)
                print(f"SQL query error: {error_msg}")
                return {
                    "success": False,
                    "error": error_msg,
                    "files_available": [str(f) for f in parquet_files]
                }
        except Exception as e:
            # Handle connection and setup errors
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            # Clean up any registered views
            if has_registered_views:
                for table_name in extract_tables_from_sql(query):
                    try:
                        conn.execute(f"DROP VIEW IF EXISTS {table_name}")
                    except Exception:
                        pass
            conn.close()
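The handler above calls `extract_tables_from_sql` but the excerpt does not include it. A minimal regex-based sketch of what such a helper could look like (an assumption for illustration, not the project's actual implementation):

```python
import re

# Hypothetical sketch: collect identifiers that follow FROM or JOIN,
# then drop anything that is the read_parquet() function name.
_NOT_TABLES = {"read_parquet"}

def extract_tables_from_sql(query: str) -> list[str]:
    candidates = re.findall(
        r"(?:from|join)\s+([A-Za-z_][A-Za-z0-9_]*)", query, flags=re.IGNORECASE
    )
    return [c for c in candidates if c.lower() not in _NOT_TABLES]

print(extract_tables_from_sql(
    "SELECT * FROM blocks JOIN transactions ON blocks.number = transactions.block_number"
))  # → ['blocks', 'transactions']
```

A query using `read_parquet('/path.parquet')` yields no candidate tables, which matches the handler's behavior of only registering views for plain table references.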
  • Helper function to list available parquet tables/files for SQL querying, used by list_available_sql_tables tool.
    def list_available_tables() -> List[Dict[str, Any]]:
        """List all available tables from downloaded data files."""
        data_dir = get_data_directory()
        
        # Find all parquet files in the data directory (including the latest subdirectory)
        parquet_files = list(data_dir.glob("**/*.parquet"))
        
        tables = []
        for file_path in parquet_files:
            # Extract dataset name from filename
            if "__" in file_path.stem:
                name = file_path.stem.split("__")[0]
            else:
                # Try to extract from other naming patterns
                name_match = re.match(r'([a-z_]+)_', file_path.stem)
                if name_match:
                    name = name_match.group(1)
                else:
                    name = file_path.stem
            
            # Get file stats
            stats = file_path.stat()
            
            # Try to extract block range from filename
            block_range = ""
            blocks_match = re.search(r'blocks__(\d+)_to_(\d+)', str(file_path))
            if blocks_match:
                block_range = f"{blocks_match.group(1)}:{blocks_match.group(2)}"
            
            tables.append({
                "name": name,
                "path": str(file_path),
                "size_bytes": stats.st_size,
                "modified": stats.st_mtime,
                "block_range": block_range,
                "is_latest": "latest" in str(file_path)
            })
        
        return tables
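The block-range parsing above can be exercised on its own; the filename below is illustrative of cryo's `dataset__START_to_END` naming convention:

```python
import re

# Illustrative filename following the dataset__START_to_END convention
fname = "ethereum__blocks__00001000_to_00001010.parquet"

blocks_match = re.search(r"blocks__(\d+)_to_(\d+)", fname)
block_range = f"{blocks_match.group(1)}:{blocks_match.group(2)}" if blocks_match else ""
print(block_range)  # → 00001000:00001010
```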
  • Helper to create DuckDB connection used by SQL execution.
    def create_connection(read_only: bool = False) -> duckdb.DuckDBPyConnection:
        """Create a DuckDB connection with appropriate settings."""
        # In-memory database can't be read-only, so we always use read_only=False
        conn = duckdb.connect(database=":memory:", read_only=False)
        
        # Configure DuckDB settings for performance and safety
        conn.execute("SET memory_limit='4GB'")
        conn.execute("SET max_expression_depth=10000")
        
        # Note: query_timeout_ms setting might not be available in all DuckDB versions
        try:
            conn.execute(f"SET query_timeout_ms={DEFAULT_QUERY_TIMEOUT * 1000}")
        except Exception:
            pass  # Ignore if setting doesn't exist
        
        return conn
  • Helper function to get schema and sample data for a table/file, used by get_sql_table_schema tool.
    def get_table_schema(file_path: str) -> Dict[str, Any]:
        """
        Get schema information for a parquet file.
        
        Args:
            file_path: Path to the parquet file
            
        Returns:
            Dictionary with schema information
        """
        conn = create_connection()
        
        try:
            path = Path(file_path)
            if not path.exists() or path.suffix != '.parquet':
                return {
                    "success": False,
                    "error": f"File not found or not a parquet file: {file_path}"
                }
            
            # Register a temporary view for the file
            conn.execute(f"CREATE VIEW temp_view AS SELECT * FROM '{file_path}'")
            
            # Get schema info
            schema_result = conn.execute("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='temp_view'").fetchdf()
            
            # Get sample data
            sample_data = conn.execute("SELECT * FROM temp_view LIMIT 5").fetchdf()
            
            # Get row count (might be expensive for large files)
            row_count = conn.execute("SELECT COUNT(*) as count FROM temp_view").fetchone()[0]
            
            return {
                "success": True,
                "file_path": file_path,
                "columns": schema_result.to_dict(orient="records"),
                "sample_data": sample_data.to_dict(orient="records"),
                "row_count": row_count
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            conn.close()
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It effectively explains the tool's behavior: it executes SQL queries against parquet files, supports two query approaches (direct table references or read_parquet()), and mentions DuckDB as the underlying engine. It also notes that results include query results and metadata. The main gap is lack of information about error handling, performance characteristics, or limitations.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured with clear sections (purpose, workflow, examples, parameter explanations, return value). While comprehensive, it could be more concise - some information is repeated (e.g., both workflow steps and DuckDB approaches mention the two query methods). Every sentence earns its place, but some tightening is possible.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (SQL execution with file dependencies), no annotations, and no output schema, the description provides substantial context. It explains the workflow, parameter usage, and return values. The main gap is lack of output format details - while it mentions 'Query results and metadata', it doesn't specify the structure. For a SQL execution tool, more detail on result format would be helpful.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage, the description fully compensates by explaining all three parameters. It clarifies that 'query' is the SQL to execute with syntax examples, 'files' are typically from query_dataset results, and 'include_schema' controls whether schema information is included in results. The description adds substantial value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose as 'Run a SQL query against downloaded blockchain data files', specifying both the action (run SQL query) and the target resource (downloaded blockchain data files). It distinguishes this from sibling tools like query_dataset (which downloads data) and get_sql_table_schema (which shows schema).

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit workflow guidance: 'This function should be used after calling query_dataset to download data.' It names the specific prerequisite tool (query_dataset) and explains the sequence of operations. The 'IMPORTANT WORKFLOW' section clearly defines when to use this tool versus alternatives.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
