import asyncio
import contextlib
from fastmcp import FastMCP
from binance_mcp.models import BinanceConnectivityResponse, BinanceServerTimeResponse, DateTimeResponse
from binance_mcp.settings import mcp_settings
from binance_mcp.tools import spot, timestamp
# Initialize the FastMCP server
mcp = FastMCP(name="binance-mcp")
@mcp.tool()
def get_current_datetime() -> DateTimeResponse:
"""
Get the current date and time in ISO8601 format.
Returns:
DateTimeResponse: Object containing:
- datetime_iso: Current date and time in ISO8601 format
- timestamp: Unix timestamp
- timezone: Timezone information (UTC)
"""
return timestamp.get_current_datetime()
@mcp.tool()
def spot_test_connectivity() -> BinanceConnectivityResponse:
"""
Test connectivity to the Binance Spot API.
Returns:
BinanceConnectivityResponse: Object containing:
- timestamp: ISO8601 timestamp of the connectivity check
- reachable: True if connectivity is successful, False otherwise
"""
return spot.spot_test_connectivity()
@mcp.tool()
def get_binance_server_time() -> BinanceServerTimeResponse:
"""
Get the current server time from Binance Spot API.
Returns:
BinanceServerTimeResponse: Object containing:
- server_time: Unix timestamp in milliseconds representing the Binance server time
"""
return spot.spot_server_time()
async def main() -> None:
await mcp.run_async(transport=mcp_settings.transport, port=mcp_settings.port)
def run():
"""
Run the FastMCP server.
This function is used to start the server when running the script directly.
"""
with contextlib.suppress(KeyboardInterrupt, SystemExit):
asyncio.run(main())
# Entry point
if __name__ == "__main__":
# Run the server
run()