We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/haiqiubullish/nix-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""NIX Query Client for communicating with Rodeos endpoint"""
import sys
from pathlib import Path
# Put the sibling "protobuf" directory at the front of sys.path (once) so
# imports that expect that directory on the path can be resolved.
protobuf_dir = Path(__file__).parent.joinpath("protobuf")
if str(protobuf_dir) not in sys.path:
    sys.path.insert(0, str(protobuf_dir))
import json
import logging
import subprocess
import os
from typing import Any, Dict, Optional
from google.protobuf.message import Message
from google.protobuf.json_format import MessageToDict
logger = logging.getLogger(__name__)
class NixClient:
"""Client for interacting with NIX via Rodeos endpoint"""
def __init__(self, endpoint: str = "http://127.0.0.1:8880", timeout: int = 30):
"""
Initialize NIX client
Args:
endpoint: Rodeos HTTP endpoint URL
timeout: Request timeout in seconds
"""
self.endpoint = endpoint.rstrip('/')
self.timeout = timeout
self.nix_query_contract = "nix.q"
self.nix_contract = "nix"
# Set endpoints from environment or use defaults
self.rodeos_api = os.getenv("RODEOS_API", "http://rodeos-wasm-ql-blockchain-atoms-b1fs-dev.service.c-sin-g-atoms-b1fs-dev.int.b1fs.net:8880")
self.nodeos_api = os.getenv("NODEOS_API", "http://localhost:8888")
async def push_action_ro(self, contract: str, action: str, data: str, data_is_json: bool = False) -> Dict[str, Any]:
    """
    Execute a read-only action on the blockchain using cleos

    The cleos subprocess is run in the event loop's default executor so
    that this coroutine does not block the loop while cleos is running.

    Args:
        contract: Contract account name
        action: Action name to execute
        data: Action data; passed through unchanged when data_is_json is
            True, otherwise encoded with json.dumps first
        data_is_json: Whether data is already in JSON format
    Returns:
        JSON response from the action (parsed cleos stdout)
    Raises:
        subprocess.TimeoutExpired: cleos did not finish within self.timeout
        json.JSONDecodeError: cleos stdout was not valid JSON
        Exception: cleos exited with a non-zero return code
    """
    # Local imports keep this block self-contained within the file.
    import asyncio
    import functools

    # Prepare the action data
    action_data = data if data_is_json else json.dumps(data)

    # Build cleos command (argument list, never a shell string)
    cmd = [
        "cleos", "-u", self.rodeos_api,
        "push", "action",
        "--use-old-send-rpc",
        "--return-failure-trace",
        "0",  # account (0 for read-only)
        contract,  # use contract directly, not contract.q
        action,
        action_data,
        "-sj",  # silent with JSON output
    ]
    run_cleos = functools.partial(
        subprocess.run,
        cmd,
        capture_output=True,
        text=True,
        timeout=self.timeout,
        check=False,
    )

    try:
        # subprocess.run blocks; hand it to a worker thread and await it.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, run_cleos)
    except subprocess.TimeoutExpired:
        logger.error(f"cleos timeout calling {action}")
        raise
    except Exception as e:
        logger.error(f"Error calling {action}: {e}")
        raise

    # Raised outside the try above so the failure is logged exactly once.
    if result.returncode != 0:
        logger.error(f"cleos error: {result.stderr}")
        raise Exception(f"cleos failed: {result.stderr}")

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse cleos output: {e}")
        raise
def protobuf_to_hex(self, message: Message) -> str:
    """
    Serialize a protobuf message and hex-encode the resulting bytes

    Args:
        message: Protobuf message instance
    Returns:
        Hex string of the serialized message
    """
    serialized = message.SerializeToString()
    return serialized.hex()
def hex_to_protobuf(self, hex_data: str, message_class: type) -> Message:
    """
    Build a protobuf message from its hex-encoded serialized form

    Args:
        hex_data: Hex-encoded protobuf data
        message_class: Protobuf message class to instantiate
    Returns:
        A new message_class instance parsed from the decoded bytes
    """
    decoded = message_class()
    raw_bytes = bytes.fromhex(hex_data)
    decoded.ParseFromString(raw_bytes)
    return decoded
def extract_return_value(self, trace: Dict[str, Any]) -> str:
    """
    Pull the return value out of an action trace

    Looks first at the second element of the first entry of
    processed.action_traces (when that entry is a list with more than one
    element), then falls back to a top-level "return_value" key.

    Args:
        trace: Transaction trace response
    Returns:
        The return value found ("" when the nested trace object has no
        "return_value" key)
    Raises:
        ValueError: No return value could be located in the trace
    """
    try:
        traces = trace.get("processed", {}).get("action_traces", [])

        # Nested form: the first entry is a list whose second item holds the trace
        first_entry = traces[0] if traces else None
        if isinstance(first_entry, list) and len(first_entry) > 1:
            return first_entry[1].get("return_value", "")

        # Flat form: return value sits directly on the response
        if "return_value" in trace:
            return trace["return_value"]

        raise ValueError("No return value found in trace")
    except (KeyError, IndexError) as e:
        logger.error(f"Failed to extract return value: {e}")
        raise ValueError(f"Invalid trace format: {e}")
def is_success_trace(self, trace: Dict[str, Any]) -> bool:
    """
    Report whether an action trace describes a successful execution

    Args:
        trace: Transaction trace response
    Returns:
        True only when the trace has no "error" key and
        processed.receipt.status equals "executed"; False otherwise,
        including when the status lookup raises
    """
    if "error" in trace:
        return False

    try:
        receipt = trace.get("processed", {}).get("receipt", {})
        executed = receipt.get("status") == "executed"
    except Exception:
        # Malformed trace shapes are treated as "not successful"
        executed = False
    return executed
async def query_global_configs(self, blockchain: Optional[str] = None, network: Optional[str] = None) -> Dict[str, Any]:
    """
    Query global configurations

    When both blockchain and network are given, the "globalconfn" action is
    called for that network; in every other case "globalconfs" is called
    to fetch all configs.

    Args:
        blockchain: Optional blockchain name filter
        network: Optional network name filter
    Returns:
        Global configurations as dict (original proto field names kept)
    Raises:
        Exception: The returned trace does not indicate success
    """
    payload: Dict[str, Any] = {"payload_version": {"major": 2}}
    if blockchain and network:
        action_name = "globalconfn"
        # NOTE(review): key is spelled "network_indentifier" - confirm it
        # matches the field name the contract expects.
        payload["network_indentifier"] = {
            "blockchain": blockchain,
            "network": network,
        }
    else:
        action_name = "globalconfs"

    trace = await self.push_action_ro(
        self.nix_query_contract,
        action_name,
        json.dumps(payload),
        data_is_json=True,
    )

    if not self.is_success_trace(trace):
        error = trace.get("error", "Unknown error")
        raise Exception(f"Failed to query global configs: {error}")

    return_hex = self.extract_return_value(trace)

    # Decode the protobuf response.
    # NOTE(review): this import path (.protobuf.native_indexer) differs from
    # the `.protobuf` path used by the other query methods - verify which
    # one matches the package layout.
    from .protobuf.native_indexer import native_indexer_pb2
    configs = self.hex_to_protobuf(return_hex, native_indexer_pb2.GlobalConfigs)
    return MessageToDict(configs, preserving_proto_field_name=True)
async def query_network_status(self, blockchain: str, network: str) -> Dict[str, Any]:
    """
    Query network status

    Sends a hex-encoded NetworkIdentifier to the "nwstatus" action and
    decodes the returned value as a NetworkStatus message.

    Args:
        blockchain: Blockchain name
        network: Network name
    Returns:
        Network status as dict (original proto field names kept)
    Raises:
        Exception: The returned trace does not indicate success
    """
    from .protobuf import native_indexer_pb2

    identifier = native_indexer_pb2.NetworkIdentifier()
    identifier.blockchain = blockchain
    identifier.network = network
    request_hex = self.protobuf_to_hex(identifier)

    trace = await self.push_action_ro(self.nix_query_contract, "nwstatus", request_hex)

    if not self.is_success_trace(trace):
        error = trace.get("error", "Unknown error")
        raise Exception(f"Failed to query network status: {error}")

    return_hex = self.extract_return_value(trace)
    status = self.hex_to_protobuf(return_hex, native_indexer_pb2.NetworkStatus)
    return MessageToDict(status, preserving_proto_field_name=True)
async def query_blocks(self, blockchain: str, network: str,
                       block_height: Optional[int] = None,
                       from_index: Optional[int] = None,
                       to_index: Optional[int] = None,
                       max_count: int = 10) -> Dict[str, Any]:
    """
    Query blocks

    When block_height is given it is used as both ends of the index range
    and from_index/to_index are ignored; otherwise missing (or zero)
    from_index/to_index are sent as 0.

    Args:
        blockchain: Blockchain name
        network: Network name
        block_height: Specific block height to query
        from_index: Start block index
        to_index: End block index
        max_count: Maximum blocks to return
    Returns:
        Block metadata as dict (original proto field names kept)
    Raises:
        Exception: The returned trace does not indicate success
    """
    from .protobuf import native_indexer_pb2

    # Work out the index range before filling in the request message
    if block_height is not None:
        first_index = last_index = block_height
    else:
        first_index = from_index or 0
        last_index = to_index or 0

    request = native_indexer_pb2.BlockMetasQuery()
    request.network.blockchain = blockchain
    request.network.network = network
    request.max_block_count = max_count
    request.from_index = first_index
    request.to_index = last_index
    request_hex = self.protobuf_to_hex(request)

    trace = await self.push_action_ro(self.nix_query_contract, "blocks", request_hex)

    if not self.is_success_trace(trace):
        error = trace.get("error", "Unknown error")
        raise Exception(f"Failed to query blocks: {error}")

    return_hex = self.extract_return_value(trace)
    blocks = self.hex_to_protobuf(return_hex, native_indexer_pb2.BlockMetas)
    return MessageToDict(blocks, preserving_proto_field_name=True)