#!/usr/bin/env python3
"""
Token Info MCP Server
This MCP server provides comprehensive token smart contract information by integrating
with blockchain tools and APIs. It focuses on two primary capabilities:
1. Proxy contract analysis using Foundry's cast tool
2. Access control role discovery using Etherscan API
Built following the official MCP documentation:
https://modelcontextprotocol.io/quickstart/server#python
"""
import asyncio
import subprocess
import sys
from typing import Any, Dict, List, Optional
import httpx
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
import os
from dataclasses import dataclass
import json
import logging
# Load environment variables.
# load_dotenv() runs before every os.getenv() lookup below so that values it
# loads are visible to them.
load_dotenv()
# Initialize FastMCP server; the tool functions in this module register
# themselves on it through the @mcp.tool() decorator.
mcp = FastMCP("token-info")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
# Explorer endpoint used when the caller supplies a raw RPC URL as "network".
ETHERSCAN_API_BASE = "https://api.etherscan.io/api"
# Sent as the User-Agent header on Etherscan requests.
USER_AGENT = "token-info-mcp/1.0"
# Timeout in seconds applied to cast subprocess calls and Etherscan requests.
DEFAULT_TIMEOUT = 30.0
# Configuration
# The fallbacks below are placeholders, not working credentials; real values
# are expected from the environment.
ETHERSCAN_API_KEY = os.getenv("ETHERSCAN_API_KEY", "YourApiKeyToken")
DEFAULT_RPC_URL = os.getenv("DEFAULT_RPC_URL", "https://eth-mainnet.g.alchemy.com/v2/your-api-key")
# NOTE(review): no code visible in this file reads this value, so no rate
# limiting appears to be enforced - confirm. int() raises ValueError at import
# time if the variable is set to a non-integer.
RATE_LIMIT_REQUESTS_PER_SECOND = int(os.getenv("RATE_LIMIT_REQUESTS_PER_SECOND", "5"))
# Network configurations - using environment variables
def get_default_networks():
    """Build the table of built-in networks.

    Each entry maps a network name to a dict holding its ``rpc_url`` (taken
    from a per-network environment variable, with a placeholder fallback),
    the block-explorer API endpoint (``etherscan_url``) and the ``chain_id``.

    Returns:
        Dict keyed by network name: mainnet, goerli, sepolia, polygon, arbitrum.
    """
    # (name, RPC env var, fallback RPC URL, explorer API URL, chain id)
    presets = (
        (
            "mainnet",
            "MAINNET_RPC_URL",
            DEFAULT_RPC_URL,
            "https://api.etherscan.io/api",
            1,
        ),
        (
            "goerli",
            "GOERLI_RPC_URL",
            "https://eth-goerli.g.alchemy.com/v2/your-api-key",
            "https://api-goerli.etherscan.io/api",
            5,
        ),
        (
            "sepolia",
            "SEPOLIA_RPC_URL",
            "https://eth-sepolia.g.alchemy.com/v2/your-api-key",
            "https://api-sepolia.etherscan.io/api",
            11155111,
        ),
        (
            "polygon",
            "POLYGON_RPC_URL",
            "https://polygon-mainnet.g.alchemy.com/v2/your-api-key",
            "https://api.polygonscan.com/api",
            137,
        ),
        (
            "arbitrum",
            "ARBITRUM_RPC_URL",
            "https://arb-mainnet.g.alchemy.com/v2/your-api-key",
            "https://api.arbiscan.io/api",
            42161,
        ),
    )

    networks = {}
    for name, env_var, fallback_rpc, explorer_url, chain_id in presets:
        networks[name] = {
            "rpc_url": os.getenv(env_var, fallback_rpc),
            "etherscan_url": explorer_url,
            "chain_id": chain_id,
        }
    return networks
# Network table, built once at import time; later changes to the environment
# are not picked up.
NETWORKS = get_default_networks()
# AccessControl event signatures
# Event name -> topic0 value used to filter logs in get_contract_roles.
# NOTE(review): presumably the keccak-256 hashes of the OpenZeppelin
# AccessControl event signatures - verify against the contract ABI.
ACCESS_CONTROL_EVENTS = {
    "RoleGranted": "0x2f8788117e7eff1d82e926ec794901d17c78024a50270940304540a733656f0d",
    "RoleRevoked": "0xf6391f5c32d9c69d2a47ea670b442974b53935d1edc7fd64eb21e047a839171b",
    "RoleAdminChanged": "0xbd79b86ffe0ab8e8776151514217cd7cacd52c909f66475c3af44e129f0b00ff",
}
# Well-known roles
# Role hash -> readable name, consulted by get_role_name(). Keys are lowercase
# hex, so a lookup with different casing will miss.
# NOTE(review): presumably keccak-256 of the role name string (all-zero for
# the default admin role) - confirm before adding entries.
WELL_KNOWN_ROLES = {
    "0x0000000000000000000000000000000000000000000000000000000000000000": "DEFAULT_ADMIN_ROLE",
    "0x9f2df0fed2c77648de5860a4cc508cd0818c85b8b8a1ab4ceeef8d981c8956a6": "MINTER_ROLE",
    "0x65d7a28e3265b37a6474929f336521b332c1681b933f6cb9f3376673440d862a": "PAUSER_ROLE",
    "0x189ab7a9244df0848122154315af71fe140f3db0fe014031783b0946b8c9d2e3": "UPGRADER_ROLE",
    "0x3c11d16cbaffd01df69ce1c404f6340ee057498f5f00246190ea54220576a848": "BURNER_ROLE",
}
@dataclass
class ProxyAdminResponse:
    """Result payload of the ``get_proxy_admin`` tool.

    Only the first three fields are required; the rest stay ``None`` when the
    contract is not a proxy. The instance ``__dict__`` is serialised to JSON.
    """

    # Always populated.
    contract_address: str
    network: str
    is_proxy: bool

    # Populated only for proxies.
    proxy_admin: Optional[str] = None
    proxy_type: Optional[str] = None
    proxy_admin_owner: Optional[str] = None
    is_proxy_admin_contract: Optional[bool] = None
    # Addresses from the proxy outwards: proxy, admin, then admin owner if found.
    ownership_chain: Optional[List[str]] = None
@dataclass
class RoleInfo:
    """Summary of one AccessControl role seen in a contract's event logs."""

    # 32-byte role identifier as it appears in the log topic.
    role_hash: str
    # Readable name for the role (see get_role_name).
    role_name: str
    # Name of the role that administers this one.
    admin_role: str
    # Accounts currently considered holders of the role.
    members: List[str]
    # Number of RoleGranted / RoleRevoked events counted for the role.
    total_grants: int
    total_revokes: int
@dataclass
class ContractRolesResponse:
    """Result payload of the ``get_contract_roles`` tool.

    The instance ``__dict__`` is passed straight to ``json.dumps``, so every
    field must already be JSON-serialisable.
    """

    contract_address: str
    network: str
    # One plain dict per role (the ``__dict__`` of a RoleInfo), not RoleInfo
    # instances: dataclass objects would not survive json.dumps.
    roles: List[Dict[str, Any]]
    total_roles: int
    # {"from": ..., "to": ...} exactly as requested by the caller.
    block_range: Dict[str, str]
@dataclass
class AddressTypeResponse:
    """Result payload of the ``check_address_type`` tool."""

    address: str
    network: str
    # One of "eoa", "contract", "multisig".
    address_type: str
    is_contract: bool
    # Set for contracts: the detected multisig flavour, or "unknown".
    contract_type: Optional[str] = None
    # Owner/threshold details when the address is a multisig, else None.
    multisig_info: Optional[Dict[str, Any]] = None
# Helper functions
def validate_ethereum_address(address: str) -> bool:
    """Return True when *address* is ``0x`` followed by exactly 40 hex digits.

    The check is purely syntactic: no EIP-55 checksum verification is done and
    an uppercase ``0X`` prefix is rejected.

    Args:
        address: Candidate address string.

    Returns:
        True for a well-formed address, False otherwise (including non-string
        input).
    """
    if not isinstance(address, str):
        return False
    if len(address) != 42 or not address.startswith("0x"):
        return False
    # Check each character explicitly. int(address, 16) is too lenient here:
    # it tolerates underscores between digits and surrounding whitespace, so
    # malformed 42-character strings would pass.
    hex_digits = "0123456789abcdefABCDEF"
    return all(ch in hex_digits for ch in address[2:])
def get_network_config(network: str) -> Dict[str, Any]:
    """Resolve a network name or RPC URL to its configuration dict.

    Args:
        network: A key of ``NETWORKS`` (matched case-insensitively, ignoring
            surrounding whitespace) or a custom RPC URL starting with "http".

    Returns:
        Dict with ``rpc_url``, ``etherscan_url`` and ``chain_id``. A custom
        RPC URL is paired with the mainnet explorer endpoint and chain id 1.
        Anything unrecognised falls back to the mainnet entry, with a warning.
    """
    # Exact match first, so existing callers see no change.
    if network in NETWORKS:
        return NETWORKS[network]

    requested = network.strip() if isinstance(network, str) else ""

    # "Polygon" or " sepolia " should not silently resolve to mainnet.
    normalized = requested.lower()
    if normalized in NETWORKS:
        return NETWORKS[normalized]

    if requested.startswith("http"):
        # Custom RPC URL
        return {
            "rpc_url": requested,
            "etherscan_url": ETHERSCAN_API_BASE,
            "chain_id": 1,
        }

    # Default to mainnet, but make the substitution visible in the logs.
    logger.warning("Unknown network %r - falling back to mainnet", network)
    return NETWORKS["mainnet"]
def get_role_name(role_hash: str) -> str:
    """Translate a role hash into a readable label.

    Hashes listed in ``WELL_KNOWN_ROLES`` map to their name; any other hash
    becomes ``UNKNOWN_ROLE_`` plus characters 2-9 of the hash (the first
    eight hex digits after the ``0x`` prefix).
    """
    known_name = WELL_KNOWN_ROLES.get(role_hash)
    if known_name is not None:
        return known_name
    return f"UNKNOWN_ROLE_{role_hash[2:10]}"
async def execute_cast_command(args: List[str]) -> Optional[str]:
    """Run Foundry's ``cast`` with *args* and return its trimmed stdout.

    Args:
        args: Arguments passed to ``cast`` (no shell is involved).

    Returns:
        The stripped stdout on exit status 0, or ``None`` when cast reports
        that the target is not a proxy.

    Raises:
        Exception: If cast is not installed, exceeds ``DEFAULT_TIMEOUT``
            seconds, or fails for any other reason.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "cast",
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        logger.error("Cast command not found - ensure Foundry is installed")
        raise Exception("Cast command not found - ensure Foundry is installed") from exc

    try:
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=DEFAULT_TIMEOUT,
        )
    except asyncio.TimeoutError as exc:
        # wait_for only cancels communicate(); the child keeps running unless
        # it is killed and reaped here.
        try:
            process.kill()
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        await process.wait()
        logger.error("Cast command timed out")
        raise Exception("Cast command timed out") from exc

    if process.returncode == 0:
        return stdout.decode().strip()

    error_msg = stderr.decode().strip()
    logger.debug("Cast command failed: %s", error_msg)
    # Compare lowercase against lowercase: a needle containing capitals can
    # never be found in a lowercased message.
    lowered = error_msg.lower()
    if "not a proxy" in lowered or "not an eip-1967 proxy" in lowered:
        return None
    raise Exception(f"Cast command failed: {error_msg}")
async def check_if_proxy_admin_contract(address: str, rpc_url: str) -> bool:
    """Heuristically decide whether *address* is a ProxyAdmin-style contract.

    Calls ``owner()`` through ``cast call`` and treats a 66-character result
    ("0x" plus one 32-byte word) as a positive. Any contract exposing
    ``owner()`` therefore passes, not only ProxyAdmin.

    Args:
        address: Address to probe.
        rpc_url: JSON-RPC endpoint handed to cast.

    Returns:
        True when the call returned a single 32-byte word, False otherwise,
        including when the call fails.
    """
    try:
        result = await execute_cast_command(
            ["call", address, "owner()", "--rpc-url", rpc_url]
        )
    except Exception as exc:
        # `except Exception` rather than a bare `except:` so task cancellation
        # and KeyboardInterrupt still propagate.
        logger.debug("owner() probe failed for %s: %s", address, exc)
        return False
    return result is not None and len(result) == 66
async def get_proxy_admin_owner(proxy_admin_address: str, rpc_url: str) -> Optional[str]:
    """Look up ``owner()`` on a ProxyAdmin contract.

    Args:
        proxy_admin_address: Contract to query.
        rpc_url: JSON-RPC endpoint handed to cast.

    Returns:
        The owner as a lowercase 0x address, or ``None`` when the call fails
        or the return data is not a single 32-byte word.
    """
    try:
        raw_word = await execute_cast_command(
            ["call", proxy_admin_address, "owner()", "--rpc-url", rpc_url]
        )
        # Expect "0x" followed by one 32-byte word (64 hex characters).
        if not raw_word or len(raw_word) != 66:
            return None
        # The address occupies the low 20 bytes of the word.
        candidate = "0x" + raw_word[-40:].lower()
        if validate_ethereum_address(candidate):
            return candidate
        return None
    except Exception as e:
        logger.debug(f"Failed to get ProxyAdmin owner: {e}")
        return None
def _slot_has_value(value: Any) -> bool:
    """Return True when a storage-slot hex string holds a non-zero value."""
    if not isinstance(value, str):
        # A null or missing result says nothing about the slot.
        return False
    digits = value[2:] if value.startswith("0x") else value
    try:
        # Comparing numerically covers every spelling of zero
        # ("0x", "0x0", 64 zeros).
        return int(digits or "0", 16) != 0
    except ValueError:
        return False


async def verify_proxy_contract(contract_address: str, rpc_url: str) -> bool:
    """Check the EIP-1967 implementation and admin slots of a contract.

    Args:
        contract_address: Contract to inspect.
        rpc_url: JSON-RPC endpoint to query with ``eth_getStorageAt``.

    Returns:
        True as soon as one of the two slots holds a non-zero value; False if
        both are empty or if any request fails.
    """
    # EIP-1967 proxy storage slots
    implementation_slot = "0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"
    admin_slot = "0xb53127684a568b3173ae13b9f8a6016e243e63b6e8ee1178d6a717850b5d6103"
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            for request_id, slot in enumerate((implementation_slot, admin_slot), start=1):
                payload = {
                    "jsonrpc": "2.0",
                    "method": "eth_getStorageAt",
                    "params": [contract_address, slot, "latest"],
                    "id": request_id,
                }
                response = await client.post(rpc_url, json=payload)
                response.raise_for_status()
                # If either slot has non-zero data, it's likely a proxy
                if _slot_has_value(response.json().get("result")):
                    return True
        return False
    except Exception as e:
        logger.debug(f"Error verifying proxy contract: {e}")
        return False
async def analyze_ownership_chain(contract_address: str, rpc_url: str) -> Dict[str, Any]:
    """Trace who controls a proxy: proxy -> admin -> admin's owner.

    Args:
        contract_address: Contract suspected to be a proxy.
        rpc_url: JSON-RPC endpoint used for all lookups.

    Returns:
        Dict with ``is_proxy``, ``proxy_admin``, ``proxy_admin_owner``,
        ``is_proxy_admin_contract`` and ``ownership_chain``. When the contract
        is not a proxy the dict is returned with its initial "empty" values.

    Raises:
        Exception: Propagated from ``execute_cast_command`` when ``cast admin``
            fails for a reason other than "not a proxy".
    """
    zero_address = "0x0000000000000000000000000000000000000000"
    analysis = {
        "is_proxy": False,
        "proxy_admin": None,
        "proxy_admin_owner": None,
        "is_proxy_admin_contract": False,
        "ownership_chain": [],
    }

    # Step 1: ask cast for the proxy admin.
    admin = await execute_cast_command(["admin", contract_address, "--rpc-url", rpc_url])
    if admin is None:
        return analysis
    admin_lc = admin.lower()

    # A zero admin is ambiguous: either a proxy whose admin was renounced, or
    # a plain contract whose admin slot was never written. The storage slots
    # settle which one it is.
    if admin_lc == zero_address:
        logger.debug(f"Cast admin returned zero address for {contract_address} - verifying if actually a proxy")
        if not await verify_proxy_contract(contract_address, rpc_url):
            logger.debug(f"Contract {contract_address} is not a proxy - cast admin returned zero for non-proxy")
            return analysis
        logger.debug(f"Contract {contract_address} is a proxy with renounced admin")

    analysis["is_proxy"] = True
    analysis["proxy_admin"] = admin_lc
    analysis["ownership_chain"].extend([contract_address.lower(), admin_lc])

    # Step 2: does the admin itself look like a ProxyAdmin contract?
    admin_is_contract = await check_if_proxy_admin_contract(admin, rpc_url)
    analysis["is_proxy_admin_contract"] = admin_is_contract

    # Step 3: if so, follow the chain one level further to its owner.
    if admin_is_contract:
        owner = await get_proxy_admin_owner(admin, rpc_url)
        if owner:
            analysis["proxy_admin_owner"] = owner
            analysis["ownership_chain"].append(owner)

    return analysis
async def check_if_contract(address: str, rpc_url: str) -> bool:
    """Report whether *address* has deployed code, using ``eth_getCode``.

    Args:
        address: Address to inspect.
        rpc_url: JSON-RPC endpoint to query.

    Returns:
        True when the node returns non-empty bytecode. False when the code is
        empty, and also when the lookup fails. A failed lookup is logged at
        warning level so it can be told apart from a genuine EOA.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "eth_getCode",
        "params": [address, "latest"],
        "id": 1,
    }
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(rpc_url, json=payload)
            response.raise_for_status()
            body = response.json()
    except Exception as exc:
        # Log only the exception class: the message may contain the RPC URL,
        # which can carry an API key.
        logger.warning(
            "eth_getCode failed for %s (%s) - treating as non-contract",
            address,
            type(exc).__name__,
        )
        return False

    code = body.get("result") if isinstance(body, dict) else None
    if not isinstance(code, str):
        logger.warning(
            "eth_getCode returned no usable result for %s - treating as non-contract",
            address,
        )
        return False
    # "0x" (or an empty string) means no code, i.e. an EOA.
    return len(code) > 2
async def detect_multisig_type(address: str, rpc_url: str) -> Optional[Dict[str, Any]]:
    """Run the multisig probes in order and return the first hit.

    The Gnosis Safe probe runs first, then the generic multisig probe.

    Args:
        address: Contract address to probe.
        rpc_url: JSON-RPC endpoint passed to each probe.

    Returns:
        The info dict of the first probe that matches, or ``None`` when none
        matches or a probe raises.
    """
    try:
        for probe in (check_gnosis_safe, check_other_multisig):
            details = await probe(address, rpc_url)
            if details:
                return details
        return None
    except Exception:
        return None
async def check_gnosis_safe(address: str, rpc_url: str) -> Optional[Dict[str, Any]]:
    """Probe *address* for the Gnosis Safe ``getOwners`` / ``getThreshold`` API.

    Args:
        address: Contract address to probe.
        rpc_url: JSON-RPC endpoint to query with ``eth_call``.

    Returns:
        ``{"type": "gnosis_safe", "threshold", "owners", "owner_count"}`` when
        both calls return data that decodes to at least one owner and a
        threshold between 1 and the owner count; otherwise ``None`` (also on
        any request or parsing error).
    """
    # Function selectors for Gnosis Safe
    selectors = (
        ("owners", "0xa0e67e2b"),  # getOwners()
        ("threshold", "0xe75235b8"),  # getThreshold()
    )
    try:
        raw = {}
        async with httpx.AsyncClient(timeout=30.0) as client:
            for request_id, (label, selector) in enumerate(selectors, start=1):
                payload = {
                    "jsonrpc": "2.0",
                    "method": "eth_call",
                    "params": [
                        {"to": address, "data": selector},
                        "latest",
                    ],
                    "id": request_id,
                }
                response = await client.post(rpc_url, json=payload)
                response.raise_for_status()
                raw[label] = response.json().get("result")

        # Both calls must have produced return data.
        if any(not isinstance(value, str) or value == "0x" for value in raw.values()):
            return None

        owners = parse_owners_from_hex(raw["owners"])
        threshold = parse_threshold_from_hex(raw["threshold"])
        # A contract that merely returns some bytes for these selectors (for
        # example through a fallback function) is not a Safe: require a
        # plausible owner list and threshold.
        if not owners or not 1 <= threshold <= len(owners):
            return None

        return {
            "type": "gnosis_safe",
            "threshold": threshold,
            "owners": owners,
            "owner_count": len(owners),
        }
    except Exception:
        return None
async def check_other_multisig(address: str, rpc_url: str) -> Optional[Dict[str, Any]]:
    """Probe *address* for a ``getOwners()``-style multisig wallet.

    Args:
        address: Contract address to probe.
        rpc_url: JSON-RPC endpoint to query with ``eth_call``.

    Returns:
        ``{"type": "multisig_wallet", "owners", "owner_count"}`` when the call
        yields at least one owner; otherwise ``None`` (also on any error).
    """
    try:
        # getOwners() selector, as used by the older MultiSigWallet standard.
        call_request = {
            "jsonrpc": "2.0",
            "method": "eth_call",
            "params": [
                {"to": address, "data": "0xa0e67e2b"},
                "latest",
            ],
            "id": 1,
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            reply = await client.post(rpc_url, json=call_request)
            reply.raise_for_status()
            body = reply.json()

        # No return data means the contract does not answer getOwners().
        if "result" not in body or body["result"] == "0x":
            return None

        owner_list = parse_owners_from_hex(body["result"])
        if not owner_list:
            return None

        return {
            "type": "multisig_wallet",
            "owners": owner_list,
            "owner_count": len(owner_list),
        }
    except Exception:
        return None
def parse_owners_from_hex(hex_data: str) -> List[str]:
    """Decode an ABI-encoded ``address[]`` return value into 0x addresses.

    The layout assumed is: one 32-byte offset word, one 32-byte length word,
    then one 32-byte word per address (address in the low 20 bytes). The
    offset word is not interpreted.

    Args:
        hex_data: Hex string, with or without a leading ``0x``.

    Returns:
        The addresses that are fully present in the data, in order. An empty
        list when the data is too short, not a string, or has a length word
        that is not hexadecimal.
    """
    try:
        payload = hex_data[2:] if hex_data.startswith("0x") else hex_data
        # First 64 chars are offset, next 64 are length
        if len(payload) < 128:
            return []
        declared_count = int(payload[64:128], 16)
    except (AttributeError, TypeError, ValueError):
        return []

    # Never trust the declared length on its own: it comes from arbitrary
    # contract output and can be as large as 2**256, which would keep a naive
    # loop spinning. Cap it at the number of whole words actually present.
    words_available = (len(payload) - 128) // 64
    count = min(declared_count, words_available)

    owners = []
    for index in range(count):
        word_start = 128 + index * 64
        # Skip the 24 hex characters (12 bytes) of left padding.
        owners.append(f"0x{payload[word_start + 24:word_start + 64]}")
    return owners
def parse_threshold_from_hex(hex_data: str) -> int:
    """Decode a hex-encoded unsigned integer, such as a Safe threshold.

    Args:
        hex_data: Hex string, with or without a leading ``0x``.

    Returns:
        The integer value, or 0 when the input cannot be parsed.
    """
    try:
        digits = hex_data
        if digits.startswith("0x"):
            digits = digits[2:]
        return int(digits, 16)
    except Exception:
        # Any malformed input (empty, non-hex, non-string) counts as zero.
        return 0
async def make_etherscan_request(url: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Send a GET request to an Etherscan-style API and return the JSON body.

    Args:
        url: API endpoint.
        params: Query parameters. The API key is added to a copy, so the
            caller's dict is left untouched.

    Returns:
        The decoded JSON response, or ``None`` if the request or decoding
        fails (the failure is logged).
    """
    headers = {"User-Agent": USER_AGENT}
    # Work on a copy: writing the key into the caller's dict would leak it
    # into whatever the caller does with that dict afterwards.
    query = dict(params or {})
    query["apikey"] = ETHERSCAN_API_KEY
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, params=query, timeout=DEFAULT_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # The exception text can include the full request URL; keep the
            # API key out of the logs.
            message = str(e)
            if ETHERSCAN_API_KEY:
                message = message.replace(ETHERSCAN_API_KEY, "***")
            logger.error("Etherscan API request failed: %s", message)
            return None
@mcp.tool()
async def get_proxy_admin(contract_address: str, network: str = "mainnet", rpc_url: Optional[str] = None) -> str:
    """Get comprehensive proxy admin and ownership analysis for an upgradeable proxy contract.

    This tool provides a complete ownership analysis including:
    - Proxy admin address detection
    - ProxyAdmin contract identification
    - Ultimate owner discovery (if proxy admin is a contract)
    - Full ownership chain mapping

    Args:
        contract_address: The contract address to check
        network: Network name (mainnet, goerli, sepolia, polygon, arbitrum) or use 'custom' with rpc_url
        rpc_url: Custom RPC URL to use (optional, overrides network setting)

    Returns:
        JSON string containing comprehensive proxy admin and ownership information
    """
    # Validate input
    if not validate_ethereum_address(contract_address):
        return json.dumps({
            "error": f"Invalid Ethereum address: {contract_address}"
        })

    # Determine RPC URL to use: an explicit rpc_url wins over the network name.
    if rpc_url:
        final_rpc_url = rpc_url
        network_name = f"custom ({rpc_url})"
    else:
        final_rpc_url = get_network_config(network)["rpc_url"]
        network_name = network

    logger.info(f"Getting proxy admin for {contract_address} on {network_name}")
    try:
        # Perform comprehensive ownership analysis
        ownership_analysis = await analyze_ownership_chain(contract_address, final_rpc_url)
        if ownership_analysis["is_proxy"]:
            result = ProxyAdminResponse(
                contract_address=contract_address.lower(),
                network=network_name,
                is_proxy=True,
                proxy_admin=ownership_analysis["proxy_admin"],
                # Simplified detection: the proxy flavour is not inspected.
                proxy_type="TransparentUpgradeableProxy",
                proxy_admin_owner=ownership_analysis.get("proxy_admin_owner"),
                is_proxy_admin_contract=ownership_analysis["is_proxy_admin_contract"],
                ownership_chain=ownership_analysis["ownership_chain"],
            )
        else:
            # Not a proxy contract: optional fields keep their None defaults.
            result = ProxyAdminResponse(
                contract_address=contract_address.lower(),
                network=network_name,
                is_proxy=False,
            )
        return json.dumps(result.__dict__, indent=2)
    except Exception as e:
        logger.error(f"Error getting proxy admin: {e}")
        # network_name is always bound before the try block is entered.
        return json.dumps({
            "error": str(e),
            "contract_address": contract_address.lower(),
            "network": network_name,
        })
def _hex_quantity_to_int(value: Any) -> int:
    """Convert a hex quantity from an Etherscan log entry to int (0 if unusable)."""
    if not isinstance(value, str):
        return 0
    digits = value[2:] if value.startswith("0x") else value
    if not digits:
        # Zero may be encoded as a bare "0x".
        return 0
    try:
        return int(digits, 16)
    except ValueError:
        return 0


@mcp.tool()
async def get_contract_roles(
    contract_address: str,
    network: str = "mainnet",
    from_block: str = "earliest",
    to_block: str = "latest",
) -> str:
    """Discover AccessControl roles and their assignments for a contract.

    Role membership is derived by replaying RoleGranted / RoleRevoked events in
    chronological order; the admin of each role comes from the most recent
    RoleAdminChanged event (DEFAULT_ADMIN_ROLE when none was seen). Results
    only reflect events inside the requested block range that the explorer
    API returned.

    Args:
        contract_address: The contract address to analyze
        network: Network name (mainnet, goerli, sepolia, polygon, arbitrum)
        from_block: Starting block number (default: earliest)
        to_block: Ending block number (default: latest)

    Returns:
        JSON string containing contract roles information
    """
    # Validate input
    if not validate_ethereum_address(contract_address):
        return json.dumps({
            "error": f"Invalid Ethereum address: {contract_address}"
        })

    # Get network configuration
    network_config = get_network_config(network)
    etherscan_url = network_config["etherscan_url"]
    logger.info(f"Getting contract roles for {contract_address} on {network}")

    # NOTE(review): the explorer API documents fromBlock as an integer block
    # number, so the symbolic "earliest" is sent as block 0 - confirm that the
    # other supported explorers accept this too.
    api_from_block = "0" if from_block == "earliest" else from_block

    try:
        # Collect every AccessControl event as (block, log index, name, topics).
        collected = []
        for event_name, topic in ACCESS_CONTROL_EVENTS.items():
            params = {
                "module": "logs",
                "action": "getLogs",
                "fromBlock": api_from_block,
                "toBlock": to_block,
                "address": contract_address,
                "topic0": topic,
            }
            data = await make_etherscan_request(etherscan_url, params)
            if not data or data.get("status") != "1":
                if data:
                    # Covers "no records found" as well as real API errors.
                    logger.debug(
                        "No %s logs for %s: %s",
                        event_name,
                        contract_address,
                        data.get("message"),
                    )
                continue
            for log in data.get("result", []):
                topics = log.get("topics", [])
                if len(topics) < 2:
                    continue
                collected.append((
                    _hex_quantity_to_int(log.get("blockNumber")),
                    _hex_quantity_to_int(log.get("logIndex")),
                    event_name,
                    topics,
                ))

        # Each event type is fetched separately, so the events must be merged
        # into chain order before replaying; otherwise grant -> revoke -> grant
        # cannot be told apart from grant -> grant -> revoke.
        collected.sort(key=lambda entry: (entry[0], entry[1]))

        role_state = {}
        for _block, _index, event_name, topics in collected:
            role_hash = topics[1]
            state = role_state.setdefault(role_hash, {
                "members": set(),
                "grants": 0,
                "revokes": 0,
                "admin": None,
            })
            if event_name == "RoleAdminChanged":
                # topics: [signature, role, previousAdminRole, newAdminRole]
                if len(topics) > 3:
                    state["admin"] = topics[3]
                continue
            if len(topics) <= 2:
                continue
            # Convert from 32-byte hex to 20-byte address
            account_address = "0x" + topics[2][-40:].lower()
            if event_name == "RoleGranted":
                state["grants"] += 1
                state["members"].add(account_address)
            elif event_name == "RoleRevoked":
                state["revokes"] += 1
                state["members"].discard(account_address)

        # Process role state into role info
        default_admin_name = WELL_KNOWN_ROLES.get(
            "0x0000000000000000000000000000000000000000000000000000000000000000",
            "DEFAULT_ADMIN_ROLE",
        )
        roles = []
        for role_hash, state in role_state.items():
            admin_hash = state["admin"]
            roles.append(RoleInfo(
                role_hash=role_hash,
                role_name=get_role_name(role_hash),
                admin_role=get_role_name(admin_hash) if admin_hash else default_admin_name,
                # Sorted so repeated calls produce identical output.
                members=sorted(state["members"]),
                total_grants=state["grants"],
                total_revokes=state["revokes"],
            ))

        result = ContractRolesResponse(
            contract_address=contract_address.lower(),
            network=network,
            # Plain dicts so the response can go through json.dumps.
            roles=[role.__dict__ for role in roles],
            total_roles=len(roles),
            block_range={"from": from_block, "to": to_block},
        )
        return json.dumps(result.__dict__, indent=2)
    except Exception as e:
        logger.error(f"Error getting contract roles: {e}")
        return json.dumps({
            "error": str(e),
            "contract_address": contract_address.lower(),
            "network": network,
        })
@mcp.tool()
async def check_address_type(address: str, network: str = "mainnet", rpc_url: Optional[str] = None) -> str:
    """Check if an address is an EOA, contract, or multisig with detailed analysis.

    This tool analyzes an Ethereum address to determine its type:
    - EOA (Externally Owned Account): Regular wallet address
    - Contract: Smart contract address
    - Multisig: Multi-signature wallet with threshold and owners

    Args:
        address: The Ethereum address to analyze
        network: Network name (mainnet, goerli, sepolia, polygon, arbitrum) or use 'custom' with rpc_url
        rpc_url: Custom RPC URL to use (optional, overrides network setting)

    Returns:
        JSON string containing address type analysis
    """
    # Validate input
    if not validate_ethereum_address(address):
        return json.dumps({
            "error": f"Invalid Ethereum address: {address}"
        })

    # Determine RPC URL to use: an explicit rpc_url wins over the network name.
    if rpc_url:
        final_rpc_url = rpc_url
        network_name = f"custom ({rpc_url})"
    else:
        final_rpc_url = get_network_config(network)["rpc_url"]
        network_name = network

    logger.info(f"Checking address type for {address} on {network_name}")
    try:
        is_contract = await check_if_contract(address, final_rpc_url)
        # Only contracts can be multisigs, so EOAs skip the probe.
        multisig_info = (
            await detect_multisig_type(address, final_rpc_url) if is_contract else None
        )

        if not is_contract:
            address_type, contract_type = "eoa", None
        elif multisig_info:
            address_type, contract_type = "multisig", multisig_info["type"]
        else:
            address_type, contract_type = "contract", "unknown"

        result = AddressTypeResponse(
            address=address.lower(),
            network=network_name,
            address_type=address_type,
            is_contract=is_contract,
            contract_type=contract_type,
            multisig_info=multisig_info if multisig_info else None,
        )
        return json.dumps(result.__dict__, indent=2)
    except Exception as e:
        logger.error(f"Error checking address type: {e}")
        # network_name is always bound before the try block is entered.
        return json.dumps({
            "error": str(e),
            "address": address.lower(),
            "network": network_name,
        })
if __name__ == "__main__":
    # Startup diagnostics: log the runtime environment and whether Foundry's
    # cast binary can be run, then start the MCP server on stdio.
    logger.info(f"Python executable: {sys.executable}")
    logger.info(f"Current working directory: {os.getcwd()}")
    logger.info(f"PATH: {os.environ.get('PATH', 'NOT SET')}")

    # Check if cast is available. The timeout keeps a hanging binary from
    # blocking server start-up.
    try:
        result = subprocess.run(
            ["cast", "--version"], capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            logger.info(f"Cast version: {result.stdout.strip()}")
        else:
            # cast was found but did not run cleanly.
            logger.warning(
                "Cast command exited with a non-zero status - proxy admin detection may not work"
            )
    except subprocess.TimeoutExpired:
        logger.warning("Cast command timed out - proxy admin detection may not work")
    except FileNotFoundError:
        logger.warning("Cast command not found - proxy admin detection will not work")
        # Try to find cast in common locations. This is diagnostic only: the
        # tools still invoke plain "cast", so it must be on PATH to be used.
        common_paths = [
            "/usr/local/bin/cast",
            "/opt/homebrew/bin/cast",
            os.path.expanduser("~/.foundry/bin/cast"),
            os.path.expanduser("~/.cargo/bin/cast"),
        ]
        for path in common_paths:
            if not os.path.exists(path):
                continue
            logger.info(f"Found cast at: {path}")
            try:
                result = subprocess.run(
                    [path, "--version"], capture_output=True, text=True, timeout=10
                )
                if result.returncode == 0:
                    logger.info(f"Cast version (full path): {result.stdout.strip()}")
                    break
            except Exception as e:
                logger.warning(f"Failed to run cast at {path}: {e}")

    # Initialize and run the server
    logger.info("Starting Token Info MCP server...")
    mcp.run(transport='stdio')