query_dataset

Download blockchain datasets like transactions or logs using specified block ranges and contract addresses. Returns file paths for use in SQL queries or further processing on the Cryo MCP Server.

Instructions

Download blockchain data and return the file paths where the data is stored.

IMPORTANT WORKFLOW NOTE: When running SQL queries, use this function first to download
data, then use the returned file paths with query_sql() to execute SQL on those files.

Example workflow for SQL:
1. First download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
2. Get file paths: files = result.get('files', [])
3. Run SQL query: query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)
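The three steps above can be sketched as a small Python function. This is a hedged illustration of the control flow only: the real query_dataset and query_sql are MCP tools, so they are injected here as plain callables, and sql_over_dataset is a hypothetical name.

```python
# Hedged sketch of the download-then-query workflow. query_dataset and
# query_sql stand in for the MCP tool calls and are passed as callables.
def sql_over_dataset(query_dataset, query_sql, blocks='1000:1010'):
    # Step 1: download the data as parquet files.
    result = query_dataset('transactions', blocks=blocks,
                           output_format='parquet')
    # Step 2: collect the returned file paths.
    files = result.get('files', [])
    if not files:
        return {'error': result.get('error', 'no files downloaded')}
    # Step 3: run SQL against the first downloaded parquet file.
    sql = f"SELECT * FROM read_parquet('{files[0]}')"
    return query_sql(sql, files=files)
```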

DATASET-SPECIFIC PARAMETERS:
For datasets that require specific address parameters (like 'balances', 'erc20_transfers', etc.),
ALWAYS use the 'contract' parameter to pass ANY Ethereum address. For example:

- For 'balances' dataset: Use contract parameter for the address you want balances for
  query_dataset('balances', blocks='1000:1010', contract='0x123...')

- For 'logs' or 'erc20_transfers': Use contract parameter for contract address
  query_dataset('logs', blocks='1000:1010', contract='0x123...')

To check what parameters a dataset requires, always use lookup_dataset() first:
lookup_dataset('balances')  # Will show required parameters
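The flag translation described above can be sketched as a small helper (contract_flag is a hypothetical name; it mirrors the mapping shown in the implementation further down, where 'balances' takes --address and other datasets take --contract):

```python
# Hypothetical helper: translate the single 'contract' parameter into
# the dataset-appropriate cryo CLI flag.
def contract_flag(dataset, contract):
    if dataset == 'balances':
        # The balances dataset filters on an account address.
        return ['--address', contract]
    # Logs, erc20_transfers, etc. filter on a contract address.
    return ['--contract', contract]
```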

Args:
    dataset: The name of the dataset to query (e.g., 'logs', 'transactions', 'balances')
    blocks: Block range specification as a string (e.g., '1000:1010')
    start_block: Start block number as integer (alternative to blocks)
    end_block: End block number as integer (alternative to blocks)
    use_latest: If True, query the latest block
    blocks_from_latest: Number of blocks before the latest to include (e.g., 10 = blocks latest-10 through latest, inclusive)
    contract: Contract address to filter by - IMPORTANT: Use this parameter for ALL address-based filtering
      regardless of the parameter name in the native cryo command (address, contract, etc.)
    output_format: Output format (json, csv, parquet) - use 'parquet' for SQL queries
    include_columns: Columns to include alongside the defaults
    exclude_columns: Columns to exclude from the defaults

Returns:
    Dictionary containing file paths where the downloaded data is stored
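Since cryo treats a range 'S:E' as half-open (start inclusive, end exclusive), the integer start_block/end_block parameters are converted with a +1 on the end. A hedged sketch of that conversion (build_block_range is a hypothetical name; the logic matches the implementation below):

```python
# Hypothetical helper: map start_block/end_block onto cryo's half-open
# "start:end" range string. The end is exclusive, so +1 makes the
# requested end_block inclusive.
def build_block_range(start_block, end_block=None):
    if end_block is not None:
        return f"{start_block}:{end_block + 1}"
    # Only a start given: default to a 10-block window.
    return f"{start_block}:{start_block + 10}"
```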

Input Schema

Name               | Required | Description                                    | Default
-------------------|----------|------------------------------------------------|--------
blocks             | No       | Block range string, e.g. '1000:1010'           |
blocks_from_latest | No       | Number of blocks before the latest to include  |
contract           | No       | Address to filter by                           |
dataset            | Yes      | Dataset name, e.g. 'logs', 'transactions'      |
end_block          | No       | End block number (alternative to blocks)       |
exclude_columns    | No       | Columns to exclude from the defaults           |
include_columns    | No       | Columns to include alongside the defaults      |
output_format      | No       | 'json', 'csv', or 'parquet'                    | json
start_block        | No       | Start block number (alternative to blocks)     |
use_latest         | No       | If True, query only the latest block           |
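A hedged example of an arguments object that satisfies this schema (the block range and token address are illustrative values, not recommendations):

```json
{
  "dataset": "erc20_transfers",
  "blocks": "19000000:19000010",
  "contract": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
  "output_format": "parquet"
}
```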

Implementation Reference

  • The query_dataset tool handler, registered via the @mcp.tool() decorator. It runs the cryo CLI to download the requested blockchain dataset for the specified parameters and returns the paths of the generated files.
    # Standard-library imports used by this excerpt; mcp, DEFAULT_RPC_URL,
    # DEFAULT_DATA_DIR, and get_latest_block_number are defined elsewhere
    # in the module.
    import json
    import os
    import subprocess
    from pathlib import Path
    from typing import Any, Dict, List, Optional

    @mcp.tool()
    def query_dataset(
        dataset: str,
        blocks: Optional[str] = None,
        start_block: Optional[int] = None,
        end_block: Optional[int] = None,
        use_latest: bool = False,
        blocks_from_latest: Optional[int] = None,
        contract: Optional[str] = None,
        output_format: str = "json",
        include_columns: Optional[List[str]] = None,
        exclude_columns: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Download blockchain data and return the file paths where the data is stored.
        
        IMPORTANT WORKFLOW NOTE: When running SQL queries, use this function first to download
        data, then use the returned file paths with query_sql() to execute SQL on those files.
        
        Example workflow for SQL:
        1. First download data: result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
        2. Get file paths: files = result.get('files', [])
        3. Run SQL query: query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)
    
        DATASET-SPECIFIC PARAMETERS:
        For datasets that require specific address parameters (like 'balances', 'erc20_transfers', etc.),
        ALWAYS use the 'contract' parameter to pass ANY Ethereum address. For example:
        
        - For 'balances' dataset: Use contract parameter for the address you want balances for
          query_dataset('balances', blocks='1000:1010', contract='0x123...')
        
        - For 'logs' or 'erc20_transfers': Use contract parameter for contract address
          query_dataset('logs', blocks='1000:1010', contract='0x123...')
        
        To check what parameters a dataset requires, always use lookup_dataset() first:
        lookup_dataset('balances')  # Will show required parameters
    
        Args:
            dataset: The name of the dataset to query (e.g., 'logs', 'transactions', 'balances')
            blocks: Block range specification as a string (e.g., '1000:1010')
            start_block: Start block number as integer (alternative to blocks)
            end_block: End block number as integer (alternative to blocks)
            use_latest: If True, query the latest block
            blocks_from_latest: Number of blocks before the latest to include (e.g., 10 = blocks latest-10 through latest, inclusive)
            contract: Contract address to filter by - IMPORTANT: Use this parameter for ALL address-based filtering
              regardless of the parameter name in the native cryo command (address, contract, etc.)
            output_format: Output format (json, csv, parquet) - use 'parquet' for SQL queries
            include_columns: Columns to include alongside the defaults
            exclude_columns: Columns to exclude from the defaults
    
        Returns:
            Dictionary containing file paths where the downloaded data is stored
        """
        # Ensure we have the RPC URL
        rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL)
        
        # Build the cryo command
        cmd = ["cryo", dataset, "-r", rpc_url]
    
        # Handle block range (priority: blocks > use_latest > start/end_block > default)
        if blocks:
            # Use specified block range string directly
            cmd.extend(["-b", blocks])
        elif use_latest or blocks_from_latest is not None:
            # Get the latest block number
            latest_block = get_latest_block_number()
            
            if latest_block is None:
                return {"error": "Failed to get the latest block number from the RPC endpoint"}
            
            if blocks_from_latest is not None:
                # Use a range of blocks up to the latest
                start = latest_block - blocks_from_latest
                block_range = f"{start}:{latest_block+1}"  # +1 to make it inclusive
            else:
                # Just the latest block
                block_range = f"{latest_block}:{latest_block+1}"  # +1 to make it inclusive
            
            print(f"Using latest block range: {block_range}")
            cmd.extend(["-b", block_range])
        elif start_block is not None:
            # Convert integer block numbers to string range
            if end_block is not None:
                # Note: cryo uses [start:end) range (inclusive start, exclusive end)
                # Add 1 to end_block to include it in the range
                block_range = f"{start_block}:{end_block+1}"
            else:
                # If only start_block is provided, get 10 blocks starting from there
                block_range = f"{start_block}:{start_block+10}"
            
            print(f"Using block range: {block_range}")
            cmd.extend(["-b", block_range])
        else:
            # Default to a reasonable block range if none specified
            cmd.extend(["-b", "1000:1010"])
    
        # Handle dataset-specific address parameters
        # For all address-based filters, we use the contract parameter
        # but map it to the correct flag based on the dataset
        if contract:
            # Check if this is a dataset that requires a different parameter name
            if dataset == 'balances':
                # For balances dataset, contract parameter maps to --address
                cmd.extend(["--address", contract])
            else:
                # For other datasets like logs, transactions, etc. use --contract
                cmd.extend(["--contract", contract])
    
        if output_format == "json":
            cmd.append("--json")
        elif output_format == "csv":
            cmd.append("--csv")
    
        if include_columns:
            cmd.append("--include-columns")
            cmd.extend(include_columns)
    
        if exclude_columns:
            cmd.append("--exclude-columns")
            cmd.extend(exclude_columns)
    
        # Get the base data directory
        data_dir = Path(os.environ.get("CRYO_DATA_DIR", DEFAULT_DATA_DIR))
        
        # Choose output directory based on whether we're querying latest blocks
        if use_latest or blocks_from_latest is not None:
            output_dir = data_dir / "latest"
            output_dir.mkdir(parents=True, exist_ok=True)
            
            # Clean up the latest directory before new query
            print("Cleaning latest directory for current block query")
            existing_files = list(output_dir.glob(f"*{dataset}*.*"))
            for file in existing_files:
                try:
                    file.unlink()
                    print(f"Removed existing file: {file}")
                except Exception as e:
                    print(f"Warning: Could not remove file {file}: {e}")
        else:
            # For historical queries, use the main data directory
            output_dir = data_dir
            output_dir.mkdir(parents=True, exist_ok=True)
    
        cmd.extend(["-o", str(output_dir)])
    
        # Print the command for debugging
        print(f"Running query command: {' '.join(cmd)}")
        
        # Execute the command
        result = subprocess.run(cmd, capture_output=True, text=True)
    
        if result.returncode != 0:
            return {
                "error": result.stderr,
                "stdout": result.stdout,
                "command": " ".join(cmd)
            }
    
        # Try to find the report file which contains info about generated files
        report_dir = output_dir / ".cryo" / "reports"
        if report_dir.exists():
            # Get the most recent report file (should be the one we just created)
            report_files = sorted(report_dir.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
            if report_files:
                with open(report_files[0], 'r') as f:
                    report_data = json.load(f)
                    # Get the list of completed files from the report
                    if "results" in report_data and "completed_paths" in report_data["results"]:
                        completed_files = report_data["results"]["completed_paths"]
                        print(f"Found {len(completed_files)} files in Cryo report: {completed_files}")
                        
                        # Return the list of files and their count
                        return {
                            "files": completed_files,
                            "count": len(completed_files),
                            "format": output_format
                        }
        
        # Fallback to glob search if report file not found or doesn't contain the expected data
        output_files = list(output_dir.glob(f"*{dataset}*.{output_format}"))
        print(f"Output files found via glob: {output_files}")
    
        if not output_files:
            return {"error": "No output files generated", "command": " ".join(cmd)}
    
        # Convert Path objects to strings for JSON serialization
        file_paths = [str(file_path) for file_path in output_files]
        
        return {
            "files": file_paths,
            "count": len(file_paths),
            "format": output_format
        }
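As the code above shows, the return value takes one of two shapes: a success dict with "files", "count", and "format", or an error dict with "error" (plus "stdout" and "command" on a failed cryo run). A hedged sketch of consuming it (summarize_result is a hypothetical name):

```python
# Sketch: handle the two return shapes of query_dataset.
def summarize_result(result):
    if 'error' in result:
        # Failure shape: error text, optionally stdout and the command run.
        return f"query failed: {result['error']}"
    # Success shape: file paths, count, and output format.
    return f"{result['count']} {result['format']} file(s) at {result['files']}"
```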
