# pyth_mcp_client.py • 3.22 kB
"""Example client for testing the Pyth Network MCP server."""
import asyncio
import os
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import time
def _format_tool_result(result):
    """Return the printable text carried by a tool-call result.

    The ``text`` of every content item that has one is joined with newlines.
    Items without a ``text`` attribute are skipped, and a placeholder is
    returned when nothing printable is present, so an empty or non-text
    response does not raise ``IndexError``/``AttributeError``. A result whose
    ``isError`` attribute is truthy is prefixed so failures are not mistaken
    for data.
    """
    items = getattr(result, "content", None) or []
    texts = [
        item.text for item in items if getattr(item, "text", None) is not None
    ]
    body = "\n".join(texts) if texts else "(no text content returned)"
    if getattr(result, "isError", False):
        return f"[tool error] {body}"
    return body


async def _run_example(session, heading, tool_name, arguments, note=None):
    """Print a heading, call one tool, then print an optional note and the result."""
    print(f"=== {heading} ===")
    result = await session.call_tool(tool_name, arguments=arguments)
    if note is not None:
        print(note)
    print(_format_tool_result(result))
    print()


async def main():
    """Run example interactions with the Pyth Network MCP server.

    Starts ``pyth_mcp_server.py`` as a child process, opens a client session
    over its stdio streams, and runs five example tool calls, printing the
    text of each response.
    """
    # BTC/USD price feed ID used by the price, historical and TWAP examples.
    btc_usd_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"

    # Server parameters for the stdio connection.
    # NOTE(review): the script path is relative, so it is presumably resolved
    # against the directory the client is launched from - confirm.
    server_params = StdioServerParameters(
        command="python3",
        args=["pyth_mcp_server.py"],
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # The session must be initialized before any tool call.
            await session.initialize()

            banner = "=" * 80
            print(banner)
            print("Pyth Network MCP Server - Example Client")
            print(banner)
            print()

            await _run_example(
                session,
                "Example 1: Search for Bitcoin price feeds",
                "get_price_feeds",
                {"query": "bitcoin", "asset_type": "crypto"},
            )

            await _run_example(
                session,
                "Example 2: Get latest BTC/USD price",
                "get_latest_price_updates",
                {"ids": [btc_usd_id], "parsed": True},
            )

            # Historical lookup: Unix timestamp (seconds) from 5 minutes ago.
            five_minutes_ago = int(time.time()) - 300
            await _run_example(
                session,
                "Example 3: Get historical price at timestamp",
                "get_price_updates_at_time",
                {
                    "publish_time": five_minutes_ago,
                    "ids": [btc_usd_id],
                    "parsed": True,
                },
                note=f"Price at timestamp {five_minutes_ago}:",
            )

            await _run_example(
                session,
                "Example 4: Get publisher stake caps",
                "get_publisher_stake_caps",
                {"parsed": True},
            )

            # 300-second window, i.e. the 5-minute TWAP named in the heading.
            await _run_example(
                session,
                "Example 5: Get 5-minute TWAP for BTC/USD",
                "get_twap_latest",
                {
                    "window_seconds": 300,
                    "ids": [btc_usd_id],
                    "parsed": True,
                },
            )

            print(banner)
            print("All examples completed!")
            print(banner)
# Script entry point: run the async demo only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    demo = main()
    asyncio.run(demo)