Skip to main content
Glama
contract_analyzer.py8.4 kB
""" 合约分析器 - 核心业务逻辑 """ import json from typing import Dict, List, Optional, Any from datetime import datetime from .etherscan_client import EtherscanClient from .web3_client import Web3Client from .utils import ( is_valid_ethereum_address, to_checksum_address, extract_view_functions, format_function_result ) class ContractAnalyzer: """合约分析器""" def __init__(self): self.etherscan_client = EtherscanClient() self.web3_client = Web3Client() self._initialized = False async def initialize(self) -> bool: """ 初始化分析器 Returns: bool: 初始化是否成功 """ if self._initialized: return True # 验证 Web3 连接 connection_ok = await self.web3_client.verify_connection() if not connection_ok: print("❌ Web3 连接失败,无法继续") return False self._initialized = True print("✅ 合约分析器初始化成功") return True async def analyze_contract(self, contract_address: str) -> Dict[str, Any]: """ 分析合约信息 - 主要业务逻辑 Args: contract_address: 合约地址 Returns: Dict: 分析结果 """ # 初始化检查 if not await self.initialize(): return { "status": "error", "error": "分析器初始化失败", "timestamp": datetime.now().isoformat() } # 步骤1: 验证地址格式 if not is_valid_ethereum_address(contract_address): return { "status": "error", "error": f"无效的以太坊地址格式: {contract_address}", "timestamp": datetime.now().isoformat() } # 转换为校验和格式 checksum_address = to_checksum_address(contract_address) # 步骤2: 检查是否为合约地址 is_contract = await self.web3_client.is_contract_address(checksum_address) if not is_contract: return { "status": "error", "error": f"地址 {checksum_address} 不是合约地址", "timestamp": datetime.now().isoformat() } # 步骤3: 从 Etherscan 获取合约 ABI print(f"📡 正在获取合约 {checksum_address} 的 ABI...") abi = await self.etherscan_client.get_contract_abi(checksum_address) if not abi: return { "status": "error", "error": f"无法获取合约 ABI,可能合约未验证", "contract_address": checksum_address, "timestamp": datetime.now().isoformat() } # 步骤4: 筛选无参数的 view 函数 view_functions = extract_view_functions(abi) if not view_functions: return { "status": "warning", "message": "合约中没有找到无参数的 view 函数", "contract_address": checksum_address, "total_functions": len([f for f in abi if f.get("type") == "function"]), "timestamp": datetime.now().isoformat() } print(f"🔍 找到 {len(view_functions)} 个无参数的 view 函数") # 步骤5: 创建合约实例 contract = await self.web3_client.get_contract_instance(checksum_address, abi) if not contract: return { "status": "error", "error": "创建合约实例失败", "contract_address": checksum_address, "timestamp": datetime.now().isoformat() } # 步骤6: 批量调用 view 函数 function_names = [func["name"] for func in view_functions] print(f"🚀 正在调用 {len(function_names)} 个函数...") call_results = await self.web3_client.batch_call_view_functions( contract, function_names ) # 步骤7: 处理和格式化结果 successful_calls = [] failed_calls = [] for i, result in enumerate(call_results): if result["status"] == "success": # 获取函数的输出类型信息 func_info = view_functions[i] output_type = self._get_output_type(func_info) formatted_result = { "function_name": result["function_name"], "result": result["result"], "type": output_type, "status": "success" } # 格式化特殊类型的值 if output_type.startswith("uint") and isinstance(result["result"], int): if result["result"] > 10**15: # 可能是 wei 值 formatted_result["formatted_value"] = self._format_large_number(result["result"]) elif output_type == "address": formatted_result["checksum_address"] = to_checksum_address(str(result["result"])) successful_calls.append(formatted_result) else: failed_calls.append({ "function_name": result["function_name"], "error": result.get("error", "未知错误"), "status": "failed" }) # 获取额外的合约信息 contract_name = await self.etherscan_client.get_contract_name(checksum_address) # 组装最终结果 final_result = { "status": "success", "contract_address": checksum_address, "contract_name": contract_name, "analysis_summary": { "total_view_functions": len(view_functions), "successful_calls": len(successful_calls), "failed_calls": len(failed_calls) }, "successful_functions": successful_calls, "failed_functions": failed_calls if failed_calls else None, "timestamp": datetime.now().isoformat() } print(f"✅ 分析完成: 成功调用 {len(successful_calls)} 个函数,失败 {len(failed_calls)} 个") return final_result def _get_output_type(self, func_info: Dict) -> str: """ 获取函数输出类型 Args: func_info: 函数信息 Returns: str: 输出类型 """ outputs = func_info.get("outputs", []) if not outputs: return "void" if len(outputs) == 1: return outputs[0].get("type", "unknown") else: return "tuple" def _format_large_number(self, value: int) -> str: """ 格式化大数值 Args: value: 数值 Returns: str: 格式化后的字符串 """ if value >= 10**18: return f"{value / 10**18:.6f} Ether" elif value >= 10**15: return f"{value / 10**15:.6f} milli-Ether" elif value >= 10**12: return f"{value / 10**12:.6f} micro-Ether" else: return str(value) async def get_contract_summary(self, contract_address: str) -> Dict[str, Any]: """ 获取合约基本信息摘要 Args: contract_address: 合约地址 Returns: Dict: 合约摘要信息 """ if not is_valid_ethereum_address(contract_address): return {"error": "无效的地址格式"} checksum_address = to_checksum_address(contract_address) # 基本信息 summary = { "address": checksum_address, "is_contract": await self.web3_client.is_contract_address(checksum_address), "is_verified": await self.etherscan_client.is_contract_verified(checksum_address), "contract_name": await self.etherscan_client.get_contract_name(checksum_address) } return summary

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ACaiSec/ContractInfoMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server