query_sql

Execute SQL queries against downloaded blockchain data files. Pass the file paths returned by query_dataset to analyze transaction or block data. Queries run on DuckDB, which supports both direct table references and read_parquet().

Instructions

Run a SQL query against downloaded blockchain data files

IMPORTANT WORKFLOW: This function should be used after calling query_dataset
to download data. Use the file paths returned by query_dataset as input to this function.

Workflow steps:
1. Download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
2. Get file paths: files = result.get('files', [])
3. Execute SQL using either:
   - Direct table references: query_sql("SELECT * FROM transactions", files=files)
   - Or read_parquet(): query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)

To see the schema of a file, use get_sql_table_schema(file_path) before writing your query.

DuckDB supports both approaches:
1. Direct table references (simpler): "SELECT * FROM blocks"
2. read_parquet function (explicit): "SELECT * FROM read_parquet('/path/to/file.parquet')"

Args:
    query: SQL query to execute - can use simple table names or read_parquet()
    files: List of parquet file paths to query (typically from query_dataset results)
    include_schema: Whether to include schema information in the result
    
Returns:
    Query results and metadata
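
Putting the steps above together, a minimal sketch of the full workflow (the block range, file paths, and column names are illustrative assumptions, not output from a real run):

    # 1. Download data and collect the returned file paths
    result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
    files = result.get('files', [])

    # 2. Inspect a file's schema before writing the query
    schema = get_sql_table_schema(files[0])
    print(schema['columns'])

    # 3a. Direct table reference: the server registers a view named after the dataset
    out = query_sql("SELECT COUNT(*) AS tx_count FROM transactions", files=files)

    # 3b. Or explicit read_parquet() over a downloaded file
    out = query_sql(f"SELECT * FROM read_parquet('{files[0]}') LIMIT 5", files=files)
    print(out['result'])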

Input Schema

Name            Required  Default  Description
files           No        None     Parquet file paths to query (typically from query_dataset results)
include_schema  No        true     Whether to include schema information in the result
query           Yes                SQL query to execute; can use table names or read_parquet()
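
For example, a call supplying all three parameters (the file path and column names are illustrative):

    query_sql(
        query="SELECT block_number, gas_used FROM blocks LIMIT 10",
        files=["/path/to/ethereum__blocks__1000_to_1009.parquet"],  # from query_dataset
        include_schema=True,  # the default
    )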

Implementation Reference

  • Registration and handler stub for the 'query_sql' MCP tool. Delegates execution to execute_sql_query in sql.py. Input schema defined by function parameters and docstring.
    @mcp.tool()
    def query_sql(
        query: str,
        files: Optional[List[str]] = None,
        include_schema: bool = True
    ) -> Dict[str, Any]:
        """
        Run a SQL query against downloaded blockchain data files
        
        IMPORTANT WORKFLOW: This function should be used after calling query_dataset
        to download data. Use the file paths returned by query_dataset as input to this function.
        
        Workflow steps:
        1. Download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
        2. Get file paths: files = result.get('files', [])
        3. Execute SQL using either:
           - Direct table references: query_sql("SELECT * FROM transactions", files=files)
           - Or read_parquet(): query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)
        
        To see the schema of a file, use get_sql_table_schema(file_path) before writing your query.
        
        DuckDB supports both approaches:
        1. Direct table references (simpler): "SELECT * FROM blocks"
        2. read_parquet function (explicit): "SELECT * FROM read_parquet('/path/to/file.parquet')"
        
        Args:
            query: SQL query to execute - can use simple table names or read_parquet()
            files: List of parquet file paths to query (typically from query_dataset results)
            include_schema: Whether to include schema information in the result
            
        Returns:
            Query results and metadata
        """
        from cryo_mcp.sql import execute_sql_query
        return execute_sql_query(query, files, include_schema)
  • Core handler function that executes the SQL query using DuckDB. Handles file registration as views, query execution, schema extraction, and result formatting.
    def execute_sql_query(
        query: str,
        files: Optional[List[str]] = None,
        include_schema: bool = True
    ) -> Dict[str, Any]:
        """
        Execute a SQL query against specified parquet files.
        
        Args:
            query: SQL query to execute
            files: List of parquet file paths to query. If None, will use all files in the data directory.
            include_schema: Whether to include schema information in the result
            
        Returns:
            Dictionary with query results and metadata
        """
        data_dir = get_data_directory()
        conn = create_connection()
        # Defined before the try block so the finally clause can safely reference
        # it, even when the function returns early (e.g., no parquet files found)
        has_registered_views = False

        try:
            # Determine which parquet files to use
            parquet_files = []
            if files:
                for file_path in files:
                    path = Path(file_path)
                    if path.exists() and path.suffix == '.parquet':
                        parquet_files.append(path)
                    else:
                        print(f"Warning: File not found or not a parquet file: {file_path}")
            else:
                # If no files provided, use all parquet files in the data directory
                parquet_files = list(data_dir.glob("**/*.parquet"))
            
            if not parquet_files:
                return {
                    "success": False,
                    "error": "No parquet files available. Download data first with query_dataset."
                }
            
            # Register temporary views for datasets if needed
            try:
                # Check if the query might be using direct table references without read_parquet()
                potential_tables = extract_tables_from_sql(query)
                
                # Create views for potential table names that aren't using read_parquet
                for table_name in potential_tables:
                    if not ("read_parquet" in query.lower() and table_name.lower() in query.lower()):
                        # Match files to table name more precisely
                        # First, look for an exact dataset name match (e.g., "blocks" in ethereum__blocks__*.parquet)
                        dataset_pattern = f"__{table_name.lower()}__"
                        exact_matches = [f for f in parquet_files if dataset_pattern in str(f).lower()]
                        
                        # If no exact matches, try looser matching
                        if not exact_matches:
                            # Try matching at word boundaries to avoid partial matches
                            matching_files = []
                            for f in parquet_files:
                                file_lower = str(f).lower()
                                # Match dataset name patterns like ethereum__blocks_* or *_blocks_*
                                if f"__{table_name.lower()}__" in file_lower or f"_{table_name.lower()}_" in file_lower:
                                    matching_files.append(f)
                                # Also match if it's just the table name at the start of the filename
                                elif f"/{table_name.lower()}_" in file_lower or f"/{table_name.lower()}." in file_lower:
                                    matching_files.append(f)
                        else:
                            matching_files = exact_matches
                        
                        if matching_files:
                            # Create a combined view from all matching files
                            conn.execute(f"DROP VIEW IF EXISTS {table_name}")
                            
                            if len(matching_files) == 1:
                                # If only one file, create a simple view
                                conn.execute(f"CREATE VIEW {table_name} AS SELECT * FROM '{matching_files[0]}'")
                                print(f"Registered view '{table_name}' for file: {matching_files[0]}")
                            else:
                            # If multiple files, create a UNION ALL view to combine all files
                                union_query = " UNION ALL ".join([f"SELECT * FROM '{file}'" for file in matching_files])
                                conn.execute(f"CREATE VIEW {table_name} AS {union_query}")
                                print(f"Registered view '{table_name}' for {len(matching_files)} files using UNION ALL")
                            
                            has_registered_views = True
                
                # Execute the query
                print(f"Executing SQL query: {query}")
                result = conn.execute(query).fetchdf()
                
                # Convert to records format for easier JSON serialization
                records = result.to_dict(orient="records")
                
                # Get schema information if requested
                schema_info = None
                if include_schema and not result.empty:
                    schema_info = {
                        "columns": list(result.columns),
                        "dtypes": {col: str(dtype) for col, dtype in result.dtypes.items()}
                    }
                
                # Track how the files were used
                file_usage = {}
                if has_registered_views:
                    for table_name in extract_tables_from_sql(query):
                        # Use the same matching logic as above
                        dataset_pattern = f"__{table_name.lower()}__"
                        exact_matches = [f for f in parquet_files if dataset_pattern in str(f).lower()]
                        
                        if not exact_matches:
                            matching_files = []
                            for f in parquet_files:
                                file_lower = str(f).lower()
                                if f"__{table_name.lower()}__" in file_lower or f"_{table_name.lower()}_" in file_lower:
                                    matching_files.append(f)
                                elif f"/{table_name.lower()}_" in file_lower or f"/{table_name.lower()}." in file_lower:
                                    matching_files.append(f)
                        else:
                            matching_files = exact_matches
                        if matching_files:
                            file_usage[table_name] = {
                                "files": [str(f) for f in matching_files],
                                "combined": len(matching_files) > 1
                            }
                
                return {
                    "success": True,
                    "result": records,
                    "row_count": len(records),
                    "schema": schema_info,
                    "files_used": [str(f) for f in parquet_files],
                    "used_direct_references": has_registered_views,
                    "table_mappings": file_usage if file_usage else None
                }
            except Exception as e:
                # Handle query-specific errors
                error_msg = str(e)
                print(f"SQL query error: {error_msg}")
                return {
                    "success": False,
                    "error": error_msg,
                    "files_available": [str(f) for f in parquet_files]
                }
        except Exception as e:
            # Handle connection and setup errors
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            # Clean up any registered views
            if has_registered_views:
                for table_name in extract_tables_from_sql(query):
                    try:
                        conn.execute(f"DROP VIEW IF EXISTS {table_name}")
                    except Exception:
                        pass
            conn.close()
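
    The view-registration step above can be reproduced in plain DuckDB. A standalone sketch (the file paths are hypothetical; in practice they come from query_dataset):

        import duckdb

        conn = duckdb.connect(database=":memory:")

        # Hypothetical downloaded files for the 'blocks' dataset
        files = [
            "/data/ethereum__blocks__1000_to_1004.parquet",
            "/data/ethereum__blocks__1005_to_1009.parquet",
        ]

        # Multiple files become a single UNION ALL view, mirroring the handler above
        union_sql = " UNION ALL ".join(f"SELECT * FROM '{f}'" for f in files)
        conn.execute(f"CREATE VIEW blocks AS {union_sql}")

        # Queries can now reference the dataset by name
        print(conn.execute("SELECT COUNT(*) AS n FROM blocks").fetchdf())
        conn.close()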
  • Helper function to list available parquet tables/files for SQL querying, used by the list_available_sql_tables tool.
    def list_available_tables() -> List[Dict[str, Any]]:
        """List all available tables from downloaded data files."""
        data_dir = get_data_directory()
        
        # Find all parquet files in the data directory (including the latest subdirectory)
        parquet_files = list(data_dir.glob("**/*.parquet"))
        
        tables = []
        for file_path in parquet_files:
            # Extract dataset name from filename
            if "__" in file_path.stem:
                name = file_path.stem.split("__")[0]
            else:
                # Try to extract from other naming patterns
                name_match = re.match(r'([a-z_]+)_', file_path.stem)
                if name_match:
                    name = name_match.group(1)
                else:
                    name = file_path.stem
            
            # Get file stats
            stats = file_path.stat()
            
            # Try to extract block range from filename
            block_range = ""
            blocks_match = re.search(r'blocks__(\d+)_to_(\d+)', str(file_path))
            if blocks_match:
                block_range = f"{blocks_match.group(1)}:{blocks_match.group(2)}"
            
            tables.append({
                "name": name,
                "path": str(file_path),
                "size_bytes": stats.st_size,
                "modified": stats.st_mtime,
                "block_range": block_range,
                "is_latest": "latest" in str(file_path)
            })
        
        return tables
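
    As a quick check of the filename conventions assumed above (the path is a hypothetical cryo output file):

        import re

        path = "/data/ethereum__blocks__18000000_to_18000009.parquet"
        m = re.search(r'blocks__(\d+)_to_(\d+)', path)
        if m:
            print(f"{m.group(1)}:{m.group(2)}")  # 18000000:18000009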
  • Helper that creates the DuckDB connection used by SQL execution.
    def create_connection(read_only: bool = False) -> duckdb.DuckDBPyConnection:
        """Create a DuckDB connection with appropriate settings."""
        # In-memory database can't be read-only, so we always use read_only=False
        conn = duckdb.connect(database=":memory:", read_only=False)
        
        # Configure DuckDB settings for performance and safety
        conn.execute("SET memory_limit='4GB'")
        conn.execute("SET max_expression_depth=10000")
        
        # Note: query_timeout_ms setting might not be available in all DuckDB versions
        try:
            conn.execute(f"SET query_timeout_ms={DEFAULT_QUERY_TIMEOUT * 1000}")
        except Exception:
            pass  # Ignore if setting doesn't exist
        
        return conn
  • Helper function to get schema and sample data for a table/file, used by the get_sql_table_schema tool.
    def get_table_schema(file_path: str) -> Dict[str, Any]:
        """
        Get schema information for a parquet file.
        
        Args:
            file_path: Path to the parquet file
            
        Returns:
            Dictionary with schema information
        """
        conn = create_connection()
        
        try:
            path = Path(file_path)
            if not path.exists() or path.suffix != '.parquet':
                return {
                    "success": False,
                    "error": f"File not found or not a parquet file: {file_path}"
                }
            
            # Register a temporary view for the file
            conn.execute(f"CREATE VIEW temp_view AS SELECT * FROM '{file_path}'")
            
            # Get schema info
            schema_result = conn.execute("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='temp_view'").fetchdf()
            
            # Get sample data
            sample_data = conn.execute("SELECT * FROM temp_view LIMIT 5").fetchdf()
            
            # Get row count (might be expensive for large files)
            row_count = conn.execute("SELECT COUNT(*) as count FROM temp_view").fetchone()[0]
            
            return {
                "success": True,
                "file_path": file_path,
                "columns": schema_result.to_dict(orient="records"),
                "sample_data": sample_data.to_dict(orient="records"),
                "row_count": row_count
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            conn.close()
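
    A quick usage sketch (the path is illustrative):

        info = get_table_schema("/data/ethereum__blocks__1000_to_1009.parquet")
        if info["success"]:
            print(info["columns"])    # [{'column_name': ..., 'data_type': ...}, ...]
            print(info["row_count"])  # total rows in the file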
