main.py•1.91 kB
import asyncio
import httpx
from mcp.server.fastmcp import FastMCP
import json
from datetime import datetime
# Initialize the MCP server
mcp = FastMCP("Binance Announcements", dependencies=["httpx"])
# Binance API endpoint
BINANCE_API_URL = "https://www.binance.com/bapi/composite/v1/public/market/notice/get"
@mcp.tool()
async def fetch_latest_announcements(count: int = 20, page: int = 1) -> str:
"""
Tool to fetch the latest Binance announcements in Markdown format.
Args:
count: Number of announcements to fetch (max 20).
page: Page number to fetch (default 1).
Returns:
Markdown string with announcement title, URL, and time.
"""
if count > 20:
raise ValueError("Count cannot exceed 20")
if page < 1:
raise ValueError("Page must be at least 1")
async with httpx.AsyncClient() as client:
try:
response = await client.get(BINANCE_API_URL, params={"page": page, "rows": count})
response.raise_for_status()
data = response.json()
if data["code"] != "000000":
raise Exception(f"API error: {data.get('message', 'Unknown error')}")
announcements = data["data"]
markdown = "# Binance Announcements\n\n"
for ann in announcements:
title = ann.get("title", "No Title")
url = ann.get("url", "No URL")
time = datetime.fromtimestamp(ann.get("time", 0) / 1000).strftime('%m-%d %H:%M:%S')
markdown += f"- [{title}]({url}) _{time}_\n"
return markdown if announcements else "# No Announcements Found\n"
except httpx.HTTPStatusError as e:
raise Exception(f"HTTP error: {e}")
except Exception as e:
raise Exception(f"Failed to fetch announcements: {e}")
if __name__ == "__main__":
mcp.run()