"""Main MCP server implementation for Deribit integration."""
import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession
from .alerts import AlertManager, AlertStatus
from .config import settings
from .deribit_rest import DeribitRestClient
from .deribit_ws import DeribitWebSocketClient
from .notifications import NotificationManager
# Configure logging
import sys
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr, # MCP servers must log to stderr
)
logger = logging.getLogger(__name__)
@dataclass
class AppContext:
"""Application context with shared resources."""
ws_client: DeribitWebSocketClient
rest_client: DeribitRestClient
alert_manager: AlertManager
notification_manager: NotificationManager
price_cache: Dict[str, float]
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""Manage application lifecycle with persistent connections."""
logger.info("Starting Deribit MCP Server...")
# Initialize components
notification_manager = NotificationManager()
logger.info(f"Available notification channels: {notification_manager.list_channels()}")
# Test Telegram on startup
if "telegram" in notification_manager.list_channels():
try:
logger.info("Testing Telegram notification on startup...")
test_result = await notification_manager.send_notification(
"telegram",
"π Deribit MCP Server Started!\n\nThe server is now running and ready to send price alerts.",
None
)
if test_result:
logger.info("β
Telegram test notification sent successfully!")
else:
logger.error("β Telegram test notification failed!")
except Exception as e:
logger.error(f"β Telegram test failed with exception: {e}", exc_info=True)
ws_client = DeribitWebSocketClient()
rest_client = DeribitRestClient()
# Create alert manager with notification callback
async def notification_callback(channel: str, message: str, alert: Any):
try:
result = await notification_manager.send_notification(channel, message, alert)
if result:
logger.info(f"Notification sent via {channel} for alert {alert.id}")
else:
logger.error(f"Failed to send notification via {channel}")
return result
except Exception as e:
logger.error(f"Exception in notification callback: {e}")
return False
alert_manager = AlertManager(notification_callback)
# Price cache for storing latest prices
price_cache: Dict[str, float] = {}
# Define price update handler
async def on_price_update(instrument: str, tick_data: Dict[str, Any]):
"""Handle incoming price updates."""
try:
last_price = tick_data.get("last_price")
if last_price:
price_cache[instrument] = last_price
await alert_manager.process_price_update(instrument, last_price)
except Exception as e:
logger.error(f"Error processing price update for {instrument}: {e}")
try:
# Connect to Deribit
await ws_client.connect()
await rest_client.connect()
logger.info("Deribit MCP Server started successfully")
# Store the price update callback for subscriptions
ws_client._price_update_callback = on_price_update
# Yield context to server
yield AppContext(
ws_client=ws_client,
rest_client=rest_client,
alert_manager=alert_manager,
notification_manager=notification_manager,
price_cache=price_cache,
)
finally:
# Cleanup on shutdown
logger.info("Shutting down Deribit MCP Server...")
await ws_client.disconnect()
await rest_client.disconnect()
logger.info("Shutdown complete")
# Create FastMCP server with lifespan
mcp = FastMCP(
"Deribit MCP Server",
lifespan=app_lifespan,
)
# ============================================================================
# TOOLS - Actionable functions that can be called by the LLM
# ============================================================================
@mcp.tool()
async def set_price_alert(
instrument: str,
condition: str,
threshold: float,
notification_channel: str = "telegram",
message: Optional[str] = None,
repeat: bool = False,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Set a price alert for a cryptocurrency instrument.
Args:
instrument: Trading instrument (e.g., "BTC-PERPETUAL", "ETH-PERPETUAL")
condition: Alert condition - "above", "below", "crosses_above", "crosses_below", "percentage_change"
threshold: Price threshold or percentage for the alert
notification_channel: Channel to send notifications (default: "telegram")
message: Custom alert message (optional)
repeat: Whether alert should re-trigger after cooldown (default: False)
Returns:
Confirmation message with alert ID
"""
try:
app_ctx = ctx.request_context.lifespan_context
alert_manager = app_ctx.alert_manager
ws_client = app_ctx.ws_client
# Create the alert
alert = await alert_manager.add_alert(
instrument=instrument,
condition=condition,
threshold=threshold,
notification_channel=notification_channel,
message=message,
repeat=repeat,
)
# Subscribe to price updates for this instrument
await ws_client.subscribe_ticker(
instrument, ws_client._price_update_callback
)
# Immediately check current price to trigger alert if conditions are met
try:
ticker = await ws_client.get_ticker(instrument)
current_price = ticker.get("last_price")
if current_price:
logger.info(f"Current price for {instrument}: ${current_price}")
# Update cache
app_ctx.price_cache[instrument] = current_price
# Check alert immediately
await alert_manager.process_price_update(instrument, current_price)
else:
logger.warning(f"No price data in ticker response")
except Exception as e:
logger.error(f"Could not perform immediate price check: {e}")
return (
f"β
Alert created successfully!\n\n"
f"Alert ID: {alert.id}\n"
f"Instrument: {alert.instrument}\n"
f"Condition: {alert.condition.value}\n"
f"Threshold: {threshold}\n"
f"Channel: {notification_channel}\n"
f"Repeat: {repeat}"
)
except Exception as e:
logger.error(f"Error creating alert: {e}")
return f"β Error creating alert: {str(e)}"
@mcp.tool()
async def remove_alert(
alert_id: str,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Remove a price alert by its ID.
Args:
alert_id: The unique identifier of the alert to remove
Returns:
Confirmation message
"""
try:
app_ctx = ctx.request_context.lifespan_context
alert_manager = app_ctx.alert_manager
success = await alert_manager.remove_alert(alert_id)
if success:
return f"β
Alert {alert_id} removed successfully"
else:
return f"β Alert {alert_id} not found"
except Exception as e:
logger.error(f"Error removing alert: {e}")
return f"β Error removing alert: {str(e)}"
@mcp.tool()
async def list_alerts(
instrument: Optional[str] = None,
status: Optional[str] = None,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
List all configured price alerts.
Args:
instrument: Filter by instrument (optional)
status: Filter by status - "active", "triggered", "cancelled" (optional)
Returns:
Formatted list of alerts
"""
try:
app_ctx = ctx.request_context.lifespan_context
alert_manager = app_ctx.alert_manager
status_enum = AlertStatus(status) if status else None
alerts = await alert_manager.list_alerts(instrument, status_enum)
if not alerts:
return "No alerts found"
result = f"π Active Alerts ({len(alerts)})\n\n"
for alert in alerts:
result += (
f"ID: {alert.id}\n"
f"Instrument: {alert.instrument}\n"
f"Condition: {alert.condition.value}\n"
f"Threshold: {alert.threshold}\n"
f"Status: {alert.status.value}\n"
f"Created: {alert.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
)
if alert.triggered_at:
result += f"Triggered: {alert.triggered_at.strftime('%Y-%m-%d %H:%M:%S')}\n"
result += "\n" + "-" * 50 + "\n\n"
return result
except Exception as e:
logger.error(f"Error listing alerts: {e}")
return f"β Error listing alerts: {str(e)}"
@mcp.tool()
async def get_current_price(
instrument: str,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Get the current price for an instrument.
Args:
instrument: Trading instrument (e.g., "BTC-PERPETUAL")
Returns:
Current price information
"""
try:
app_ctx = ctx.request_context.lifespan_context
ws_client = app_ctx.ws_client
price_cache = app_ctx.price_cache
# Try to get from cache first
if instrument in price_cache:
cached_price = price_cache[instrument]
return (
f"π° Current Price\n\n"
f"Instrument: {instrument}\n"
f"Price: ${cached_price:,.2f}\n"
f"Source: WebSocket (cached)"
)
# If not in cache, fetch from WebSocket
ticker = await ws_client.get_ticker(instrument)
last_price = ticker.get("last_price", 0)
mark_price = ticker.get("mark_price", 0)
best_bid = ticker.get("best_bid_price", 0)
best_ask = ticker.get("best_ask_price", 0)
# Update cache
if last_price:
price_cache[instrument] = last_price
return (
f"π° Current Price\n\n"
f"Instrument: {instrument}\n"
f"Last Price: ${last_price:,.2f}\n"
f"Mark Price: ${mark_price:,.2f}\n"
f"Best Bid: ${best_bid:,.2f}\n"
f"Best Ask: ${best_ask:,.2f}\n"
f"Spread: ${(best_ask - best_bid):,.2f}"
)
except Exception as e:
logger.error(f"Error getting price for {instrument}: {e}")
return f"β Error getting price: {str(e)}"
@mcp.tool()
async def get_instruments(
currency: str = "BTC",
kind: str = "future",
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Get available trading instruments.
Args:
currency: Currency code (default: "BTC")
kind: Instrument kind - "future", "option", "spot" (default: "future")
Returns:
List of available instruments
"""
try:
app_ctx = ctx.request_context.lifespan_context
rest_client = app_ctx.rest_client
instruments = await rest_client.get_instruments(currency, kind)
if not instruments:
return f"No {kind} instruments found for {currency}"
result = f"π Available {kind.upper()} Instruments ({len(instruments)})\n\n"
for inst in instruments[:20]: # Limit to 20 for readability
name = inst.get("instrument_name", "N/A")
tick_size = inst.get("tick_size", 0)
min_trade = inst.get("min_trade_amount", 0)
result += (
f"β’ {name}\n"
f" Tick Size: {tick_size}\n"
f" Min Trade: {min_trade}\n\n"
)
if len(instruments) > 20:
result += f"\n... and {len(instruments) - 20} more instruments"
return result
except Exception as e:
logger.error(f"Error getting instruments: {e}")
return f"β Error getting instruments: {str(e)}"
@mcp.tool()
async def get_account_summary(
currency: str = "BTC",
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Get account summary and balance information.
Args:
currency: Currency code (default: "BTC")
Returns:
Account summary information
"""
try:
app_ctx = ctx.request_context.lifespan_context
rest_client = app_ctx.rest_client
summary = await rest_client.get_account_summary(currency)
equity = summary.get("equity", 0)
balance = summary.get("balance", 0)
margin_balance = summary.get("margin_balance", 0)
available_funds = summary.get("available_funds", 0)
maintenance_margin = summary.get("maintenance_margin", 0)
return (
f"πΌ Account Summary ({currency})\n\n"
f"Balance: {balance:.8f} {currency}\n"
f"Equity: {equity:.8f} {currency}\n"
f"Margin Balance: {margin_balance:.8f} {currency}\n"
f"Available Funds: {available_funds:.8f} {currency}\n"
f"Maintenance Margin: {maintenance_margin:.8f} {currency}"
)
except Exception as e:
logger.error(f"Error getting account summary: {e}")
return f"β Error getting account summary: {str(e)}"
@mcp.tool()
async def get_positions(
currency: str = "BTC",
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""
Get current open positions.
Args:
currency: Currency code (default: "BTC")
Returns:
List of open positions
"""
try:
app_ctx = ctx.request_context.lifespan_context
rest_client = app_ctx.rest_client
positions = await rest_client.get_positions(currency)
if not positions:
return f"No open positions for {currency}"
result = f"π Open Positions ({len(positions)})\n\n"
for pos in positions:
instrument = pos.get("instrument_name", "N/A")
size = pos.get("size", 0)
direction = pos.get("direction", "N/A")
avg_price = pos.get("average_price", 0)
mark_price = pos.get("mark_price", 0)
pnl = pos.get("total_profit_loss", 0)
result += (
f"Instrument: {instrument}\n"
f"Direction: {direction}\n"
f"Size: {size}\n"
f"Avg Entry: ${avg_price:,.2f}\n"
f"Mark Price: ${mark_price:,.2f}\n"
f"P&L: {pnl:,.4f} {currency}\n\n"
f"{'-' * 50}\n\n"
)
return result
except Exception as e:
logger.error(f"Error getting positions: {e}")
return f"β Error getting positions: {str(e)}"
# ============================================================================
# RESOURCES - Read-only data endpoints
# ============================================================================
@mcp.resource("deribit://price/{instrument}")
async def get_price_resource(
instrument: str,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""Resource endpoint for current price data."""
try:
app_ctx = ctx.request_context.lifespan_context
price_cache = app_ctx.price_cache
if instrument in price_cache:
price = price_cache[instrument]
return f'{{"instrument": "{instrument}", "price": {price}}}'
return f'{{"instrument": "{instrument}", "price": null, "error": "No price data available"}}'
except Exception as e:
return f'{{"error": "{str(e)}"}}'
@mcp.resource("deribit://alerts")
async def get_alerts_resource(
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""Resource endpoint for all active alerts."""
try:
app_ctx = ctx.request_context.lifespan_context
alert_manager = app_ctx.alert_manager
alerts = await alert_manager.list_alerts()
alerts_data = [alert.to_dict() for alert in alerts]
import json
return json.dumps({"alerts": alerts_data, "count": len(alerts)}, indent=2)
except Exception as e:
return f'{{"error": "{str(e)}"}}'
@mcp.resource("deribit://account/{currency}")
async def get_account_resource(
currency: str,
ctx: Context[ServerSession, AppContext] = None,
) -> str:
"""Resource endpoint for account information."""
try:
app_ctx = ctx.request_context.lifespan_context
rest_client = app_ctx.rest_client
summary = await rest_client.get_account_summary(currency)
import json
return json.dumps(summary, indent=2)
except Exception as e:
return f'{{"error": "{str(e)}"}}'
# ============================================================================
# Main entry point
# ============================================================================
if __name__ == "__main__":
logger.info("Initializing Deribit MCP Server...")
mcp.run(transport="stdio")