Cryo MCP Server

  • cryo_mcp
# cryo_mcp/server.py """ Cryo MCP - A Model Completion Protocol server for the Cryo blockchain data extraction tool. This module provides a server that exposes Cryo's functionality through the MCP protocol, allowing blockchain data querying through an API interface geared at usage by LLMs. """ import json import os import subprocess import requests import argparse import sys from pathlib import Path from typing import List, Optional, Dict, Any from mcp.server.fastmcp import FastMCP # Get the default RPC URL from environment or use fallback DEFAULT_RPC_URL = "http://localhost:8545" # Create an MCP server mcp = FastMCP("Cryo Data Server") def get_latest_block_number() -> Optional[int]: """Get the latest block number from the Ethereum node""" rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) payload = { "jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1 } try: response = requests.post(rpc_url, json=payload) response_data = response.json() if 'result' in response_data: # Convert hex to int latest_block = int(response_data['result'], 16) print(f"Latest block number: {latest_block}") return latest_block else: print(f"Error fetching latest block: {response_data.get('error', 'Unknown error')}") return None except Exception as e: print(f"Exception when fetching latest block: {e}") return None @mcp.tool() def list_datasets() -> List[str]: """Return a list of all available cryo datasets""" # Ensure we have the RPC URL rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) result = subprocess.run( ["cryo", "help", "datasets", "-r", rpc_url], capture_output=True, text=True ) # Parse the output to extract dataset names lines = result.stdout.split('\n') datasets = [] for line in lines: if line.startswith('- ') and not line.startswith('- blocks_and_transactions:'): # Extract dataset name, removing any aliases dataset = line[2:].split(' (alias')[0].strip() datasets.append(dataset) if line == 'dataset group names': break return datasets @mcp.tool() def query_dataset( dataset: str, blocks: Optional[str] = None, start_block: Optional[int] = None, end_block: Optional[int] = None, use_latest: bool = False, blocks_from_latest: Optional[int] = None, contract: Optional[str] = None, output_format: str = "json", include_columns: Optional[List[str]] = None, exclude_columns: Optional[List[str]] = None ) -> Dict[str, Any]: """ Query a cryo dataset and return the results Args: dataset: The name of the dataset to query (e.g., 'logs', 'transactions') blocks: Block range specification as a string (e.g., '1000:1010') start_block: Start block number as integer (alternative to blocks) end_block: End block number as integer (alternative to blocks) use_latest: If True, query the latest block blocks_from_latest: Number of blocks before the latest to include (e.g., 10 = latest-10 to latest) contract: Contract address to filter by output_format: Output format (json, csv, parquet) include_columns: Columns to include alongside the defaults exclude_columns: Columns to exclude from the defaults Returns: The dataset results """ # Ensure we have the RPC URL rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) # Build the cryo command cmd = ["cryo", dataset, "-r", rpc_url] # Handle block range (priority: blocks > use_latest > start/end_block > default) if blocks: # Use specified block range string directly cmd.extend(["-b", blocks]) elif use_latest or blocks_from_latest is not None: # Get the latest block number latest_block = get_latest_block_number() if latest_block is None: return {"error": "Failed to get the latest block number from the RPC endpoint"} if blocks_from_latest is not None: # Use a range of blocks up to the latest start = latest_block - blocks_from_latest block_range = f"{start}:{latest_block+1}" # +1 to make it inclusive else: # Just the latest block block_range = f"{latest_block}:{latest_block+1}" # +1 to make it inclusive print(f"Using latest block range: {block_range}") cmd.extend(["-b", block_range]) elif start_block is not None: # Convert integer block numbers to string range if end_block is not None: # Note: cryo uses [start:end) range (inclusive start, exclusive end) # Add 1 to end_block to include it in the range block_range = f"{start_block}:{end_block+1}" else: # If only start_block is provided, get 10 blocks starting from there block_range = f"{start_block}:{start_block+10}" print(f"Using block range: {block_range}") cmd.extend(["-b", block_range]) else: # Default to a reasonable block range if none specified cmd.extend(["-b", "1000:1010"]) if contract: cmd.extend(["--contract", contract]) if output_format == "json": cmd.append("--json") elif output_format == "csv": cmd.append("--csv") if include_columns: cmd.append("--include-columns") cmd.extend(include_columns) if exclude_columns: cmd.append("--exclude-columns") cmd.extend(exclude_columns) # Create a temporary directory for output temp_dir = Path("/tmp/cryo_output") temp_dir.mkdir(exist_ok=True) cmd.extend(["-o", str(temp_dir)]) # Print the command for debugging print(f"Running query command: {' '.join(cmd)}") # Execute the command result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return { "error": result.stderr, "stdout": result.stdout, "command": " ".join(cmd) } # Find the output file(s) output_files = list(temp_dir.glob(f"*{dataset}*.{output_format}")) print(f"Output files found: {output_files}") if not output_files: return {"error": "No output files generated", "command": " ".join(cmd)} # Read and return the data if output_format == "json": with open(output_files[0], 'r') as f: data = json.load(f) return { "data": data, "count": len(data) if isinstance(data, list) else 1, } else: # For other formats, just return the file path return {"file_path": str(output_files[0])} @mcp.resource("dataset://{name}") def get_dataset_info(name: str) -> Dict[str, Any]: """Get information about a specific dataset""" # Ensure we have the RPC URL rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) result = subprocess.run( ["cryo", "help", name, "-r", rpc_url], capture_output=True, text=True ) # Get the latest block number for examples latest_block = get_latest_block_number() latest_example = "" if latest_block: latest_example = f"query_dataset('{name}', blocks_from_latest=10) # Gets latest-10 to latest blocks" return { "name": name, "description": result.stdout, "example_queries": [ f"query_dataset('{name}', blocks='1000:1010')", f"query_dataset('{name}', start_block=1000, end_block=1009)", f"query_dataset('{name}', use_latest=True) # Gets just the latest block", latest_example ], "notes": [ "Block ranges are inclusive for start_block and end_block when using integer parameters.", "Use 'use_latest=True' to query only the latest block.", "Use 'blocks_from_latest=N' to query the latest N blocks." ] } @mcp.tool() def lookup_dataset( name: str, sample_start_block: Optional[int] = None, sample_end_block: Optional[int] = None, use_latest_sample: bool = False, sample_blocks_from_latest: Optional[int] = None ) -> Dict[str, Any]: """ Look up a specific dataset and return detailed information about it Args: name: The name of the dataset to look up sample_start_block: Optional start block for sample data (integer) sample_end_block: Optional end block for sample data (integer) use_latest_sample: If True, use the latest block for sample data sample_blocks_from_latest: Number of blocks before the latest to include in sample Returns: Detailed information about the dataset including schema and available fields """ # Get basic dataset info info = get_dataset_info(name) # Ensure we have the RPC URL rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) # Get schema information by running the dataset with --dry-run schema_result = subprocess.run( ["cryo", name, "--dry-run", "-r", rpc_url], capture_output=True, text=True ) if schema_result.returncode == 0: info["schema"] = schema_result.stdout else: info["schema_error"] = schema_result.stderr # Try to get a sample of the dataset (first 5 records) try: temp_dir = Path("/tmp/cryo_samples") temp_dir.mkdir(exist_ok=True) # Determine block range for sample (priority: latest > specified blocks > default) if use_latest_sample or sample_blocks_from_latest is not None: # Get the latest block number latest_block = get_latest_block_number() if latest_block is None: info["sample_error"] = "Failed to get the latest block number from the RPC endpoint" return info if sample_blocks_from_latest is not None: # Use a range of blocks from latest-n to latest block_range = f"{latest_block - sample_blocks_from_latest}:{latest_block+1}" else: # Just the latest 5 blocks block_range = f"{latest_block-4}:{latest_block+1}" info["sample_block_range"] = block_range elif sample_start_block is not None: if sample_end_block is not None: # Note: cryo uses [start:end) range (inclusive start, exclusive end) # Add 1 to end_block to include it in the range block_range = f"{sample_start_block}:{sample_end_block+1}" else: # Use start block and get 5 blocks block_range = f"{sample_start_block}:{sample_start_block+5}" else: # Default to a known good block range block_range = "1000:1005" # Use the block range for the sample sample_cmd = [ "cryo", name, "-b", block_range, "-r", rpc_url, "--json", "-o", str(temp_dir) ] print(f"Running sample command: {' '.join(sample_cmd)}") sample_result = subprocess.run( sample_cmd, capture_output=True, text=True, timeout=30 # Add timeout to prevent hanging ) if sample_result.returncode == 0: # Find the output file output_files = list(temp_dir.glob(f"*{name}*.json")) print(f"Output files found: {output_files}") if output_files: with open(output_files[0], 'r') as f: data = json.load(f) # Only include a few records as a sample info["sample_data"] = data[:5] if isinstance(data, list) else data else: info["sample_error"] = "No output files generated" else: info["sample_error"] = sample_result.stderr info["sample_stdout"] = sample_result.stdout # Include stdout for debugging except (subprocess.TimeoutExpired, Exception) as e: info["sample_error"] = str(e) return info @mcp.tool() def get_transaction_by_hash( tx_hash: str ) -> Dict[str, Any]: """ Get detailed information about a transaction by its hash Args: tx_hash: The transaction hash to look up Returns: Detailed information about the transaction """ # Ensure we have the RPC URL rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) # Use RPC directly to get the transaction payload = { "jsonrpc": "2.0", "method": "eth_getTransactionByHash", "params": [tx_hash], "id": 1 } try: response = requests.post(rpc_url, json=payload) response_data = response.json() if 'result' in response_data and response_data['result']: tx_data = response_data['result'] # Get the receipt as well for additional information (gas used, status) receipt_payload = { "jsonrpc": "2.0", "method": "eth_getTransactionReceipt", "params": [tx_hash], "id": 2 } receipt_response = requests.post(rpc_url, json=receipt_payload) receipt_data = receipt_response.json() if 'result' in receipt_data and receipt_data['result']: receipt = receipt_data['result'] # Combine transaction and receipt data result = { "transaction_hash": tx_hash, "block_number": int(tx_data.get("blockNumber", "0x0"), 16), "block_hash": tx_data.get("blockHash"), "from_address": tx_data.get("from"), "to_address": tx_data.get("to"), "value": tx_data.get("value"), "value_decimal": int(tx_data.get("value", "0x0"), 16), "gas_limit": int(tx_data.get("gas", "0x0"), 16), "gas_price": int(tx_data.get("gasPrice", "0x0"), 16), "nonce": int(tx_data.get("nonce", "0x0"), 16), "input": tx_data.get("input"), "transaction_index": int(tx_data.get("transactionIndex", "0x0"), 16), "gas_used": int(receipt.get("gasUsed", "0x0"), 16), "status": int(receipt.get("status", "0x0"), 16), "logs_count": len(receipt.get("logs", [])), "contract_address": receipt.get("contractAddress") } # Handle EIP-1559 transactions if "maxFeePerGas" in tx_data: result["max_fee_per_gas"] = int(tx_data.get("maxFeePerGas", "0x0"), 16) result["max_priority_fee_per_gas"] = int(tx_data.get("maxPriorityFeePerGas", "0x0"), 16) result["transaction_type"] = int(tx_data.get("type", "0x0"), 16) return result else: # Return just the transaction data if receipt is not available return { "transaction_hash": tx_hash, "block_number": int(tx_data.get("blockNumber", "0x0"), 16), "block_hash": tx_data.get("blockHash"), "from_address": tx_data.get("from"), "to_address": tx_data.get("to"), "value": tx_data.get("value"), "value_decimal": int(tx_data.get("value", "0x0"), 16), "gas_limit": int(tx_data.get("gas", "0x0"), 16), "gas_price": int(tx_data.get("gasPrice", "0x0"), 16), "nonce": int(tx_data.get("nonce", "0x0"), 16), "input": tx_data.get("input"), "transaction_index": int(tx_data.get("transactionIndex", "0x0"), 16), "error": "Failed to retrieve transaction receipt" } else: return {"error": f"Transaction not found: {tx_hash}"} except Exception as e: return {"error": f"Exception when fetching transaction: {e}"} @mcp.tool() def get_latest_ethereum_block() -> Dict[str, Any]: """ Get information about the latest Ethereum block Returns: Information about the latest block including block number """ latest_block = get_latest_block_number() if latest_block is None: return {"error": "Failed to get the latest block number from the RPC endpoint"} # Get block data using cryo rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) block_range = f"{latest_block}:{latest_block+1}" # +1 to make it inclusive temp_dir = Path("/tmp/cryo_latest") temp_dir.mkdir(exist_ok=True) cmd = [ "cryo", "blocks", "-b", block_range, "-r", rpc_url, "--json", "-o", str(temp_dir) ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return { "block_number": latest_block, "error": "Failed to get detailed block data", "stderr": result.stderr } # Find the output file output_files = list(temp_dir.glob("*blocks*.json")) if not output_files: return { "block_number": latest_block, "error": "No output files generated" } # Read the block data with open(output_files[0], 'r') as f: data = json.load(f) if data and len(data) > 0: block_data = data[0] return { "block_number": latest_block, "timestamp": block_data.get("timestamp"), "hash": block_data.get("block_hash"), "author": block_data.get("author"), "gas_used": block_data.get("gas_used"), "base_fee_per_gas": block_data.get("base_fee_per_gas") } else: return { "block_number": latest_block, "error": "Empty block data" } def parse_args(args=None): """Parse command line arguments""" parser = argparse.ArgumentParser(description="Cryo Data Server") parser.add_argument( "--rpc-url", type=str, help="Ethereum RPC URL to use for requests" ) return parser.parse_args(args) def main(): """Main entry point for the command-line script""" args = parse_args() # Set RPC URL with priority: command line > environment variable > default if args.rpc_url: rpc_url = args.rpc_url os.environ["ETH_RPC_URL"] = rpc_url print(f"Using RPC URL from command line: {rpc_url}") elif os.environ.get("ETH_RPC_URL"): rpc_url = os.environ["ETH_RPC_URL"] print(f"Using RPC URL from environment: {rpc_url}") else: rpc_url = DEFAULT_RPC_URL os.environ["ETH_RPC_URL"] = rpc_url print(f"Using default RPC URL: {rpc_url}") mcp.run() return 0 if __name__ == "__main__": sys.exit(main())