pyth_mcp_server.pyβ’1.85 kB
"""Pyth Network MCP Server - Official MCP Protocol Implementation."""
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent
import json
# Import MCP component modules
from src.tools import get_tool_definitions, handle_tool_call
from src.prompts import get_prompt_definitions, handle_get_prompt
from src.resources import get_resource_definitions, handle_read_resource
# Initialize MCP server
app = Server("pyth-network-mcp")
@app.list_tools()
async def list_tools():
"""List all available Pyth Network tools."""
return get_tool_definitions()
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls for Pyth Network operations."""
result = handle_tool_call(name, arguments)
return [TextContent(type="text", text=json.dumps(result, indent=2))]
@app.list_prompts()
async def list_prompts():
"""List available prompt templates for Pyth Network operations."""
return get_prompt_definitions()
@app.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None):
"""Generate prompt content based on the requested template."""
return handle_get_prompt(name, arguments)
@app.list_resources()
async def list_resources():
"""List available Pyth Network resources."""
return get_resource_definitions()
@app.read_resource()
async def read_resource(uri: str) -> str:
"""Read the content of a specific Pyth Network resource."""
return handle_read_resource(uri)
async def main():
"""Run the MCP server."""
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())