
query_dataset

Download blockchain datasets such as transactions or logs for a specified block range, optionally filtered by contract address. Returns the file paths of the downloaded data for use in SQL queries or further processing on the Cryo MCP Server.

Instructions

Download blockchain data and return the file paths where the data is stored.

IMPORTANT WORKFLOW NOTE: When running SQL queries, use this function first to download data, then use the returned file paths with query_sql() to execute SQL on those files.

Example workflow for SQL:

1. First download data:
   result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
2. Get file paths:
   files = result.get('files', [])
3. Run SQL query:
   query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)

DATASET-SPECIFIC PARAMETERS: For datasets that require specific address parameters (like 'balances', 'erc20_transfers', etc.), ALWAYS use the 'contract' parameter to pass ANY Ethereum address. For example:

- For the 'balances' dataset, use the contract parameter for the address you want balances for:
  query_dataset('balances', blocks='1000:1010', contract='0x123...')
- For 'logs' or 'erc20_transfers', use the contract parameter for the contract address:
  query_dataset('logs', blocks='1000:1010', contract='0x123...')

To check what parameters a dataset requires, always use lookup_dataset() first:

  lookup_dataset('balances')  # Will show required parameters

Args:
    dataset: The name of the dataset to query (e.g., 'logs', 'transactions', 'balances')
    blocks: Block range specification as a string (e.g., '1000:1010')
    start_block: Start block number as integer (alternative to blocks)
    end_block: End block number as integer (alternative to blocks)
    use_latest: If True, query the latest block
    blocks_from_latest: Number of blocks before the latest to include (e.g., 10 = latest-10 to latest)
    contract: Contract address to filter by. IMPORTANT: use this parameter for ALL address-based filtering, regardless of the parameter name in the native cryo command (address, contract, etc.)
    output_format: Output format (json, csv, parquet); use 'parquet' for SQL queries
    include_columns: Columns to include alongside the defaults
    exclude_columns: Columns to exclude from the defaults

Returns:
    Dictionary containing file paths where the downloaded data is stored
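As a concrete illustration of the workflow above, here is a minimal end-to-end sketch. It assumes query_dataset and query_sql are invoked as ordinary callables, as in the docstring's own examples; the block range and the queried column are illustrative, not prescriptive.

# Minimal sketch of the download-then-query workflow, assuming
# query_dataset and query_sql are callable as in the examples above.
# The block range is illustrative.
result = query_dataset(
    'transactions',
    blocks='18000000:18000010',
    output_format='parquet',  # parquet is the format query_sql expects
)

files = result.get('files', [])
if not files:
    raise RuntimeError(f"Download failed: {result.get('error', 'no files returned')}")

# Count transactions per block across the downloaded parquet files.
rows = query_sql(
    f"SELECT block_number, COUNT(*) AS tx_count "
    f"FROM read_parquet('{files[0]}') GROUP BY block_number",
    files=files,
)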

Input Schema

Name                 Required   Description                                                                   Default
blocks               No         Block range specification as a string (e.g., '1000:1010')
blocks_from_latest   No         Number of blocks before the latest to include
contract             No         Address to filter by; used for ALL address-based filtering
dataset              Yes        The name of the dataset to query (e.g., 'logs', 'transactions', 'balances')
end_block            No         End block number (alternative to blocks)
exclude_columns      No         Columns to exclude from the defaults
include_columns      No         Columns to include alongside the defaults
output_format        No         Output format (json, csv, parquet); use 'parquet' for SQL queries            json
start_block          No         Start block number (alternative to blocks)
use_latest           No         If True, query the latest block                                               false

Input Schema (JSON Schema)

{ "properties": { "blocks": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Blocks" }, "blocks_from_latest": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Blocks From Latest" }, "contract": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Contract" }, "dataset": { "title": "Dataset", "type": "string" }, "end_block": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "End Block" }, "exclude_columns": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Exclude Columns" }, "include_columns": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Include Columns" }, "output_format": { "default": "json", "title": "Output Format", "type": "string" }, "start_block": { "anyOf": [ { "type": "integer" }, { "type": "null" } ], "default": null, "title": "Start Block" }, "use_latest": { "default": false, "title": "Use Latest", "type": "boolean" } }, "required": [ "dataset" ], "title": "query_datasetArguments", "type": "object" }

Implementation Reference

  • The query_dataset tool handler function, registered via the @mcp.tool() decorator. It executes the cryo CLI to download blockchain data for the specified parameters and returns the paths of the generated files.
@mcp.tool()
def query_dataset(
    dataset: str,
    blocks: Optional[str] = None,
    start_block: Optional[int] = None,
    end_block: Optional[int] = None,
    use_latest: bool = False,
    blocks_from_latest: Optional[int] = None,
    contract: Optional[str] = None,
    output_format: str = "json",
    include_columns: Optional[List[str]] = None,
    exclude_columns: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Download blockchain data and return the file paths where the data is stored.

    IMPORTANT WORKFLOW NOTE: When running SQL queries, use this function first
    to download data, then use the returned file paths with query_sql() to
    execute SQL on those files.

    Example workflow for SQL:
    1. First download data:
       result = query_dataset('transactions', blocks='1000:1010', output_format='parquet')
    2. Get file paths:
       files = result.get('files', [])
    3. Run SQL query:
       query_sql("SELECT * FROM read_parquet('/path/to/file.parquet')", files=files)

    DATASET-SPECIFIC PARAMETERS: For datasets that require specific address
    parameters (like 'balances', 'erc20_transfers', etc.), ALWAYS use the
    'contract' parameter to pass ANY Ethereum address. For example:
    - For 'balances' dataset: use the contract parameter for the address you
      want balances for:
      query_dataset('balances', blocks='1000:1010', contract='0x123...')
    - For 'logs' or 'erc20_transfers': use the contract parameter for the
      contract address:
      query_dataset('logs', blocks='1000:1010', contract='0x123...')

    To check what parameters a dataset requires, always use lookup_dataset() first:
    lookup_dataset('balances')  # Will show required parameters

    Args:
        dataset: The name of the dataset to query (e.g., 'logs', 'transactions', 'balances')
        blocks: Block range specification as a string (e.g., '1000:1010')
        start_block: Start block number as integer (alternative to blocks)
        end_block: End block number as integer (alternative to blocks)
        use_latest: If True, query the latest block
        blocks_from_latest: Number of blocks before the latest to include
            (e.g., 10 = latest-10 to latest)
        contract: Contract address to filter by - IMPORTANT: use this parameter
            for ALL address-based filtering regardless of the parameter name in
            the native cryo command (address, contract, etc.)
        output_format: Output format (json, csv, parquet) - use 'parquet' for SQL queries
        include_columns: Columns to include alongside the defaults
        exclude_columns: Columns to exclude from the defaults

    Returns:
        Dictionary containing file paths where the downloaded data is stored
    """
    # Ensure we have the RPC URL
    rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL)

    # Build the cryo command
    cmd = ["cryo", dataset, "-r", rpc_url]

    # Handle block range (priority: blocks > use_latest > start/end_block > default)
    if blocks:
        # Use specified block range string directly
        cmd.extend(["-b", blocks])
    elif use_latest or blocks_from_latest is not None:
        # Get the latest block number
        latest_block = get_latest_block_number()
        if latest_block is None:
            return {"error": "Failed to get the latest block number from the RPC endpoint"}

        if blocks_from_latest is not None:
            # Use a range of blocks up to the latest
            start = latest_block - blocks_from_latest
            block_range = f"{start}:{latest_block+1}"  # +1 to make it inclusive
        else:
            # Just the latest block
            block_range = f"{latest_block}:{latest_block+1}"  # +1 to make it inclusive

        print(f"Using latest block range: {block_range}")
        cmd.extend(["-b", block_range])
    elif start_block is not None:
        # Convert integer block numbers to string range
        if end_block is not None:
            # Note: cryo uses [start:end) range (inclusive start, exclusive end)
            # Add 1 to end_block to include it in the range
            block_range = f"{start_block}:{end_block+1}"
        else:
            # If only start_block is provided, get 10 blocks starting from there
            block_range = f"{start_block}:{start_block+10}"

        print(f"Using block range: {block_range}")
        cmd.extend(["-b", block_range])
    else:
        # Default to a reasonable block range if none specified
        cmd.extend(["-b", "1000:1010"])

    # Handle dataset-specific address parameters
    # For all address-based filters, we use the contract parameter
    # but map it to the correct flag based on the dataset
    if contract:
        # Check if this is a dataset that requires a different parameter name
        if dataset == 'balances':
            # For balances dataset, contract parameter maps to --address
            cmd.extend(["--address", contract])
        else:
            # For other datasets like logs, transactions, etc. use --contract
            cmd.extend(["--contract", contract])

    if output_format == "json":
        cmd.append("--json")
    elif output_format == "csv":
        cmd.append("--csv")

    if include_columns:
        cmd.append("--include-columns")
        cmd.extend(include_columns)

    if exclude_columns:
        cmd.append("--exclude-columns")
        cmd.extend(exclude_columns)

    # Get the base data directory
    data_dir = Path(os.environ.get("CRYO_DATA_DIR", DEFAULT_DATA_DIR))

    # Choose output directory based on whether we're querying latest blocks
    if use_latest or blocks_from_latest is not None:
        output_dir = data_dir / "latest"
        output_dir.mkdir(parents=True, exist_ok=True)

        # Clean up the latest directory before new query
        print("Cleaning latest directory for current block query")
        existing_files = list(output_dir.glob(f"*{dataset}*.*"))
        for file in existing_files:
            try:
                file.unlink()
                print(f"Removed existing file: {file}")
            except Exception as e:
                print(f"Warning: Could not remove file {file}: {e}")
    else:
        # For historical queries, use the main data directory
        output_dir = data_dir
        output_dir.mkdir(parents=True, exist_ok=True)

    cmd.extend(["-o", str(output_dir)])

    # Print the command for debugging
    print(f"Running query command: {' '.join(cmd)}")

    # Execute the command
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        return {
            "error": result.stderr,
            "stdout": result.stdout,
            "command": " ".join(cmd)
        }

    # Try to find the report file which contains info about generated files
    report_dir = output_dir / ".cryo" / "reports"
    if report_dir.exists():
        # Get the most recent report file (should be the one we just created)
        report_files = sorted(report_dir.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True)
        if report_files:
            with open(report_files[0], 'r') as f:
                report_data = json.load(f)

            # Get the list of completed files from the report
            if "results" in report_data and "completed_paths" in report_data["results"]:
                completed_files = report_data["results"]["completed_paths"]
                print(f"Found {len(completed_files)} files in Cryo report: {completed_files}")

                # Return the list of files and their count
                return {
                    "files": completed_files,
                    "count": len(completed_files),
                    "format": output_format
                }

    # Fallback to glob search if report file not found or doesn't contain the expected data
    output_files = list(output_dir.glob(f"*{dataset}*.{output_format}"))
    print(f"Output files found via glob: {output_files}")

    if not output_files:
        return {"error": "No output files generated", "command": " ".join(cmd)}

    # Convert Path objects to strings for JSON serialization
    file_paths = [str(file_path) for file_path in output_files]

    return {
        "files": file_paths,
        "count": len(file_paths),
        "format": output_format
    }
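The block-range handling above follows a fixed priority order (blocks, then use_latest/blocks_from_latest, then start_block/end_block, then a hard-coded default) and converts the caller's inclusive intent into cryo's half-open [start:end) notation. A condensed sketch of just that resolution logic, using a hypothetical resolve_block_range helper name:

from typing import Optional

def resolve_block_range(
    blocks: Optional[str] = None,
    use_latest: bool = False,
    blocks_from_latest: Optional[int] = None,
    start_block: Optional[int] = None,
    end_block: Optional[int] = None,
    latest_block: Optional[int] = None,
) -> str:
    """Hypothetical helper mirroring the handler's block-range priority.

    cryo reads 'a:b' as the half-open range [a, b), so +1 is added
    wherever the caller's intent is inclusive of the end block.
    """
    if blocks:  # 1. An explicit range string always wins.
        return blocks
    if use_latest or blocks_from_latest is not None:  # 2. Latest-relative range.
        if latest_block is None:
            raise ValueError("latest block number unavailable")
        start = latest_block - (blocks_from_latest or 0)
        return f"{start}:{latest_block + 1}"  # +1 so the latest block is included
    if start_block is not None:  # 3. Integer start/end pair.
        end = end_block + 1 if end_block is not None else start_block + 10
        return f"{start_block}:{end}"
    return "1000:1010"  # 4. Fallback default used by the handler.

Isolating the resolution this way also makes the inclusive-end (+1) behavior easy to unit-test independently of the cryo subprocess call.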

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/z80dev/cryo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.