"""MCP Server implementation for Opinion.trade prediction markets."""
import logging
import json
from typing import Any, Dict, Optional
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
from pydantic import ValidationError
from .config import OpinionConfig
from .public_client import PublicClient, OpinionAPIError
from .trading_client import TradingClient, TradingAPIError
from .models import (
GetMarketsRequest,
GetMarketDetailsRequest,
GetTokenPriceRequest,
GetOrderbookRequest,
GetPriceHistoryRequest,
SearchMarketsRequest,
PlaceOrderRequest,
CancelOrderRequest,
CancelAllOrdersRequest,
GetOpenOrdersRequest,
GetPositionsRequest,
GetTradeHistoryRequest,
GetBalancesRequest,
APIError
)
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _required_string(description: str) -> Dict[str, Any]:
    """Return a JSON-schema property describing a non-empty string."""
    return {"type": "string", "description": description, "minLength": 1}


def _bounded_int(description: str, minimum: int, maximum: int, default: int) -> Dict[str, Any]:
    """Return a JSON-schema property for an integer with bounds and a default."""
    return {
        "type": "integer",
        "description": description,
        "minimum": minimum,
        "maximum": maximum,
        "default": default,
    }


def _market_filter() -> Dict[str, Any]:
    """Return the JSON-schema property used for the optional ``market_id`` filter."""
    return {"type": "string", "description": "Optional market filter"}


def _object_schema(properties: Dict[str, Any], required: list) -> Dict[str, Any]:
    """Wrap ``properties`` and ``required`` in a JSON-schema ``object`` envelope."""
    return {"type": "object", "properties": properties, "required": required}


def _public_tools() -> list[types.Tool]:
    """Build the market-data tool definitions (advertised in every mode).

    A fresh list is returned on each call so callers may extend it freely.
    """
    return [
        types.Tool(
            name="get_markets",
            description="List prediction markets with filtering options (limit, offset, status)",
            inputSchema=_object_schema(
                {
                    "limit": _bounded_int("Maximum markets to return (1-500)", 1, 500, 100),
                    "offset": {
                        "type": "integer",
                        "description": "Offset for pagination",
                        "minimum": 0,
                        "default": 0,
                    },
                    "status": {
                        "type": "string",
                        "description": "Filter by status (active/closed/all)",
                        "enum": ["active", "closed", "all"],
                    },
                },
                [],
            ),
        ),
        types.Tool(
            name="get_market_details",
            description="Get detailed information about a specific market by ID",
            inputSchema=_object_schema(
                {"market_id": _required_string("Market ID to retrieve")},
                ["market_id"],
            ),
        ),
        types.Tool(
            name="get_token_price",
            description="Get current price for a specific token/outcome",
            inputSchema=_object_schema(
                {"token_id": _required_string("Token ID for price lookup")},
                ["token_id"],
            ),
        ),
        types.Tool(
            name="get_orderbook",
            description="Get order book (bids/asks) for a token",
            inputSchema=_object_schema(
                {
                    "token_id": _required_string("Token ID for orderbook"),
                    "depth": _bounded_int("Number of price levels (1-100)", 1, 100, 20),
                },
                ["token_id"],
            ),
        ),
        types.Tool(
            name="get_price_history",
            description="Get historical price data (OHLCV) for a token",
            inputSchema=_object_schema(
                {
                    "token_id": _required_string("Token ID for price history"),
                    "timeframe": {
                        "type": "string",
                        "description": "Timeframe for OHLCV data",
                        "enum": ["1m", "5m", "15m", "1h", "4h", "1d", "7d", "30d"],
                        "default": "1h",
                    },
                    "limit": _bounded_int("Number of data points (1-1000)", 1, 1000, 100),
                },
                ["token_id"],
            ),
        ),
        types.Tool(
            name="search_markets",
            description="Search markets by keyword or phrase",
            inputSchema=_object_schema(
                {
                    "query": _required_string("Search query"),
                    "limit": _bounded_int("Maximum results (1-100)", 1, 100, 20),
                },
                ["query"],
            ),
        ),
    ]


def _trading_tools() -> list[types.Tool]:
    """Build the trading tool definitions (advertised only with a trading client)."""
    return [
        types.Tool(
            name="place_order",
            description="Place a limit or market order with EIP712 signing",
            inputSchema=_object_schema(
                {
                    "token_id": _required_string("Token ID to trade"),
                    "side": {
                        "type": "string",
                        "enum": ["BUY", "SELL"],
                        "description": "Order side",
                    },
                    "amount": {
                        "type": "number",
                        "description": "Order amount/size",
                        "exclusiveMinimum": 0,
                    },
                    "price": {
                        "type": "number",
                        "description": "Limit price (0-1, required for LIMIT orders)",
                        "minimum": 0,
                        "maximum": 1,
                    },
                    "order_type": {
                        "type": "string",
                        "enum": ["LIMIT", "MARKET"],
                        "description": "Order type",
                        "default": "LIMIT",
                    },
                },
                ["token_id", "side", "amount"],
            ),
        ),
        types.Tool(
            name="cancel_order",
            description="Cancel a specific order by ID",
            inputSchema=_object_schema(
                {"order_id": _required_string("Order ID to cancel")},
                ["order_id"],
            ),
        ),
        types.Tool(
            name="cancel_all_orders",
            description="Cancel all open orders (optionally filtered by market)",
            inputSchema=_object_schema({"market_id": _market_filter()}, []),
        ),
        types.Tool(
            name="get_open_orders",
            description="Get user's open orders (optionally filtered by market)",
            inputSchema=_object_schema({"market_id": _market_filter()}, []),
        ),
        types.Tool(
            name="get_positions",
            description="Get current positions with P&L (optionally filtered by market)",
            inputSchema=_object_schema({"market_id": _market_filter()}, []),
        ),
        types.Tool(
            name="get_trade_history",
            description="Get executed trades history with optional filters",
            inputSchema=_object_schema(
                {
                    "market_id": _market_filter(),
                    "limit": _bounded_int("Maximum trades to return (1-1000)", 1, 1000, 100),
                    "start_time": {
                        "type": "integer",
                        "description": "Start timestamp (ms)",
                    },
                    "end_time": {
                        "type": "integer",
                        "description": "End timestamp (ms)",
                    },
                },
                [],
            ),
        ),
        types.Tool(
            name="get_balances",
            description="Get account balances (available + locked)",
            inputSchema=_object_schema({}, []),
        ),
    ]


def create_opinion_server(config: OpinionConfig) -> Server:
    """Create and configure the Opinion.trade MCP server.

    A ``PublicClient`` is always created for market-data tools. A
    ``TradingClient`` is created only when ``config.enable_trading`` is set;
    if its construction raises ``ImportError`` or ``TradingAPIError`` the
    failure is logged and the server continues in read-only mode.

    Both clients are attached to the returned server as ``_public_client``
    and ``_trading_client`` (the latter may be ``None``) so the caller can
    release them on shutdown.

    Args:
        config: Opinion.trade configuration

    Returns:
        Configured MCP Server instance with dual-mode support
    """
    server = Server("opinion-trade")

    # Public client: always available for market data.
    public_client = PublicClient(
        api_key=config.api_key,
        base_url=config.api_host,
        timeout=config.api_timeout,
    )

    # Trading client: optional; stays None in read-only mode.
    trading_client = None
    if config.enable_trading:
        try:
            trading_client = TradingClient(config)
            logger.info("Trading client initialized - Trading mode active")
        except (ImportError, TradingAPIError) as e:
            logger.warning("Trading client initialization failed: %s", e)
            logger.warning("Server will run in read-only mode")

    def create_error_response(error: str, error_code: Optional[str] = None) -> str:
        """Create a structured error response (an ``APIError`` dumped as JSON)."""
        return APIError(error=error, error_code=error_code).model_dump_json()

    def text_result(payload: Any) -> list[types.TextContent]:
        """Wrap a client result as a single pretty-printed JSON text item."""
        return [types.TextContent(type="text", text=json.dumps(payload, indent=2))]

    def error_result(message: str, code: Optional[str]) -> list[types.TextContent]:
        """Wrap an error message and code as a single text item."""
        return [types.TextContent(type="text", text=create_error_response(message, code))]

    # ------------------------------------------------------------------
    # Per-tool handlers. Each validates the raw arguments through its
    # request model, calls the matching client method, and returns the
    # raw result; JSON wrapping and error mapping happen in call_tool.
    # ------------------------------------------------------------------
    async def get_markets(args: dict) -> Any:
        request = GetMarketsRequest(**args)
        return await public_client.get_markets(
            limit=request.limit,
            offset=request.offset,
            status=request.status,
        )

    async def get_market_details(args: dict) -> Any:
        request = GetMarketDetailsRequest(**args)
        return await public_client.get_market_details(request.market_id)

    async def get_token_price(args: dict) -> Any:
        request = GetTokenPriceRequest(**args)
        return await public_client.get_token_price(request.token_id)

    async def get_orderbook(args: dict) -> Any:
        request = GetOrderbookRequest(**args)
        return await public_client.get_orderbook(request.token_id, request.depth)

    async def get_price_history(args: dict) -> Any:
        request = GetPriceHistoryRequest(**args)
        return await public_client.get_price_history(
            request.token_id,
            request.timeframe,
            request.limit,
        )

    async def search_markets(args: dict) -> Any:
        request = SearchMarketsRequest(**args)
        return await public_client.search_markets(request.query, request.limit)

    async def place_order(args: dict) -> Any:
        request = PlaceOrderRequest(**args)
        return await trading_client.place_order(
            token_id=request.token_id,
            side=request.side,
            amount=request.amount,
            price=request.price,
            order_type=request.order_type,
        )

    async def cancel_order(args: dict) -> Any:
        request = CancelOrderRequest(**args)
        return await trading_client.cancel_order(request.order_id)

    async def cancel_all_orders(args: dict) -> Any:
        request = CancelAllOrdersRequest(**args)
        return await trading_client.cancel_all_orders(request.market_id)

    async def get_open_orders(args: dict) -> Any:
        request = GetOpenOrdersRequest(**args)
        return await trading_client.get_open_orders(request.market_id)

    async def get_positions(args: dict) -> Any:
        request = GetPositionsRequest(**args)
        return await trading_client.get_positions(request.market_id)

    async def get_trade_history(args: dict) -> Any:
        request = GetTradeHistoryRequest(**args)
        return await trading_client.get_trade_history(
            market_id=request.market_id,
            limit=request.limit,
            start_time=request.start_time,
            end_time=request.end_time,
        )

    async def get_balances(args: dict) -> Any:
        # The request model carries no fields that are forwarded; it is
        # instantiated only so invalid arguments raise ValidationError.
        GetBalancesRequest(**args)
        return await trading_client.get_balances()

    # Tool name -> handler. The trading table doubles as the set of names
    # that require a trading client, so there is no separate list to keep
    # in sync.
    public_handlers: Dict[str, Any] = {
        "get_markets": get_markets,
        "get_market_details": get_market_details,
        "get_token_price": get_token_price,
        "get_orderbook": get_orderbook,
        "get_price_history": get_price_history,
        "search_markets": search_markets,
    }
    trading_handlers: Dict[str, Any] = {
        "place_order": place_order,
        "cancel_order": cancel_order,
        "cancel_all_orders": cancel_all_orders,
        "get_open_orders": get_open_orders,
        "get_positions": get_positions,
        "get_trade_history": get_trade_history,
        "get_balances": get_balances,
    }

    @server.list_tools()
    async def list_tools() -> list[types.Tool]:
        """List all available tools based on operational mode."""
        tools = _public_tools()
        # Trading tools are advertised only when a trading client exists.
        if trading_client:
            tools.extend(_trading_tools())
        return tools

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
        """Handle tool calls with validation and error handling.

        Every outcome, success or failure, is returned as a single text
        item: the JSON-encoded client result on success, or a structured
        error with one of the codes ``TRADING_DISABLED``, ``UNKNOWN_TOOL``,
        ``VALIDATION_ERROR``, the API exception's own ``error_code``
        (``API_ERROR`` when it has none), or ``INTERNAL_ERROR``.
        """
        # Tolerate a missing arguments object so tools without required
        # parameters do not fail on ``**None``.
        arguments = arguments or {}
        try:
            if name in public_handlers:
                handler = public_handlers[name]
            elif name in trading_handlers:
                if not trading_client:
                    error_msg = (
                        "Trading tools not available. "
                        "Set OPINION_PRIVATE_KEY in .env to enable trading mode."
                    )
                    return error_result(error_msg, "TRADING_DISABLED")
                handler = trading_handlers[name]
            else:
                return error_result(f"Unknown tool: {name}", "UNKNOWN_TOOL")

            return text_result(await handler(arguments))

        except ValidationError as e:
            logger.error("Validation error in %s: %s", name, e)
            return error_result(str(e), "VALIDATION_ERROR")
        except (OpinionAPIError, TradingAPIError) as e:
            logger.error("API error in %s: %s", name, e)
            # Fall back to the generic code when the attribute is missing
            # or unset, so the response always carries a code.
            return error_result(str(e), getattr(e, "error_code", None) or "API_ERROR")
        except Exception as e:
            # Boundary handler: log with traceback and report, never raise.
            logger.exception("Unexpected error in %s: %s", name, e)
            return error_result(f"Unexpected error: {e}", "INTERNAL_ERROR")

    # Store clients for cleanup
    server._public_client = public_client
    server._trading_client = trading_client
    return server
async def run_server(config: OpinionConfig):
    """Run the Opinion.trade MCP server with stdio transport.

    Builds the server with ``create_opinion_server``, serves over the stdio
    streams until ``server.run`` returns or raises, then releases the
    clients that were attached to the server.

    Args:
        config: Opinion.trade configuration
    """
    import inspect

    server = create_opinion_server(config)
    logger.info("Starting Opinion.trade MCP server - %s", config.get_mode_description())
    try:
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream, server.create_initialization_options())
    finally:
        # Cleanup. The nested finally ensures a failure while closing the
        # public client does not skip the trading client.
        try:
            public_client = getattr(server, "_public_client", None)
            if public_client is not None:
                await public_client.close()
                logger.info("Public client closed")
        finally:
            # NOTE(review): TradingClient's interface is not visible from
            # this module, so it is closed only if it exposes a callable
            # ``close``; both sync and async variants are supported.
            trading_client = getattr(server, "_trading_client", None)
            close_trading = getattr(trading_client, "close", None)
            if callable(close_trading):
                pending = close_trading()
                if inspect.isawaitable(pending):
                    await pending
                logger.info("Trading client closed")
        logger.info("Server shutdown complete")