get_latest_ethereum_block
Retrieve details of the most recent Ethereum block, including block number, using the Cryo MCP Server. Query blockchain data efficiently for analysis or integration.
Instructions
Get information about the latest Ethereum block
Returns:
Information about the latest block including block number
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- cryo_mcp/server.py:573-656 (handler)The handler function for 'get_latest_ethereum_block' tool, decorated with @mcp.tool() for MCP registration. It retrieves the latest Ethereum block number via RPC, downloads block data using the 'cryo blocks' CLI command, and returns file paths to the generated JSON data.@mcp.tool() def get_latest_ethereum_block() -> Dict[str, Any]: """ Get information about the latest Ethereum block Returns: Information about the latest block including block number """ latest_block = get_latest_block_number() if latest_block is None: return {"error": "Failed to get the latest block number from the RPC endpoint"} # Get block data using cryo rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) block_range = f"{latest_block}:{latest_block+1}" # +1 to make it inclusive data_dir = Path(os.environ.get("CRYO_DATA_DIR", DEFAULT_DATA_DIR)) latest_dir = data_dir / "latest" latest_dir.mkdir(parents=True, exist_ok=True) # Always clean up the latest directory for latest block print("Cleaning latest directory for current block") existing_files = list(latest_dir.glob("*blocks*.*")) for file in existing_files: try: file.unlink() print(f"Removed existing file: {file}") except Exception as e: print(f"Warning: Could not remove file {file}: {e}") cmd = [ "cryo", "blocks", "-b", block_range, "-r", rpc_url, "--json", "-o", str(latest_dir) ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return { "block_number": latest_block, "error": "Failed to get detailed block data", "stderr": result.stderr } # Try to find the report file which contains info about generated files report_dir = latest_dir / ".cryo" / "reports" if report_dir.exists(): # Get the most recent report file report_files = sorted(report_dir.glob("*.json"), key=lambda x: x.stat().st_mtime, reverse=True) if report_files: with open(report_files[0], 'r') as f: report_data = json.load(f) # Get the list of completed files from the report if "results" in report_data and "completed_paths" in report_data["results"]: completed_files = report_data["results"]["completed_paths"] print(f"Found {len(completed_files)} files in Cryo report: {completed_files}") return { "block_number": latest_block, "files": completed_files, "count": len(completed_files) } # Fallback to glob search if report file not found output_files = list(latest_dir.glob("*blocks*.json")) if not output_files: return { "block_number": latest_block, "error": "No output files generated" } # Convert Path objects to strings for JSON serialization file_paths = [str(file_path) for file_path in output_files] return { "block_number": latest_block, "files": file_paths, "count": len(file_paths) }
- cryo_mcp/server.py:27-52 (helper)Supporting helper function that queries the Ethereum RPC endpoint using eth_blockNumber to retrieve the latest block number, used by the main tool handler.def get_latest_block_number() -> Optional[int]: """Get the latest block number from the Ethereum node""" rpc_url = os.environ.get("ETH_RPC_URL", DEFAULT_RPC_URL) payload = { "jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1 } try: response = requests.post(rpc_url, json=payload) response_data = response.json() if 'result' in response_data: # Convert hex to int latest_block = int(response_data['result'], 16) print(f"Latest block number: {latest_block}") return latest_block else: print(f"Error fetching latest block: {response_data.get('error', 'Unknown error')}") return None except Exception as e: print(f"Exception when fetching latest block: {e}") return None
- cryo_mcp/server.py:573-573 (registration)The @mcp.tool() decorator registers the get_latest_ethereum_block function as an MCP tool.@mcp.tool()