"""Public API client for Opinion.trade market data."""
import logging
from typing import Any, Dict, List, Optional

import httpx
# Module-level logger named after this module; used by the client for request/response tracing.
logger = logging.getLogger(__name__)
class OpinionAPIError(Exception):
    """Error type raised when an Opinion.trade API call fails.

    Attributes are plain instance fields:
        message: Human-readable description (also used as the exception text).
        error_code: Optional error code string supplied by the raiser.
        status_code: Optional HTTP status code supplied by the raiser.
    """

    def __init__(
        self,
        message: str,
        error_code: Optional[str] = None,
        status_code: Optional[int] = None,
    ):
        # Hand the text to Exception first so str(exc) and exc.args carry it.
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code
        self.message = message
class PublicClient:
    """Client for Opinion.trade public API (market data).

    Handles market data retrieval including:
    - Markets listing and details
    - Token prices and orderbooks
    - Price history
    - Market search

    The client owns an ``httpx.AsyncClient``; call :meth:`close` when done,
    or use the instance as an async context manager (``async with``).
    """

    def __init__(self, api_key: str, base_url: str, timeout: int = 30):
        """Initialize the public API client.

        Args:
            api_key: Opinion.trade API key
            base_url: API base URL (e.g., https://proxy.opinion.trade:8443)
            timeout: Request timeout in seconds
        """
        self.api_key = api_key
        # Trailing slashes are dropped so URL joining in _request never
        # produces a double slash.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)

    async def __aenter__(self) -> "PublicClient":
        """Enter an ``async with`` block, returning this client."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving an ``async with`` block."""
        await self.close()

    @staticmethod
    def _extract_result(data: Dict[str, Any]) -> Any:
        """Unwrap the payload from a successful response envelope.

        Args:
            data: Decoded response object (``errno`` already checked).

        Returns:
            ``result["list"]`` or ``result["data"]`` when ``result`` is a dict
            that carries one of those keys with a non-None value (checked in
            that order); otherwise ``result`` itself. A missing ``result``
            yields ``{}``.
        """
        result = data.get("result", {})
        if not isinstance(result, dict):
            return result
        # Test for presence rather than truthiness: an empty list is a valid
        # payload and must not fall through to the wrapper dict.
        for key in ("list", "data"):
            if result.get(key) is not None:
                return result[key]
        return result

    async def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Make an HTTP request to the Opinion.trade API.

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint path
            params: Query parameters
            json_data: JSON request body

        Returns:
            Parsed response data from result field

        Raises:
            OpinionAPIError: If the request fails, the HTTP status is an
                error, the body is not a JSON object, or the envelope's
                ``errno`` is non-zero.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        headers = {
            "apikey": self.api_key,
            "Content-Type": "application/json"
        }
        logger.debug("Request: %s %s", method, url)

        try:
            response = await self.client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json_data
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error("HTTP error: %s", e)
            raise OpinionAPIError(
                message=f"HTTP {e.response.status_code}: {e.response.text}",
                status_code=e.response.status_code
            ) from e
        except httpx.RequestError as e:
            logger.error("Request error: %s", e)
            raise OpinionAPIError(
                message=f"Request failed: {str(e)}"
            ) from e

        # A non-JSON body (e.g. an HTML error page served with status 200)
        # must surface as OpinionAPIError, not as a bare ValueError.
        try:
            data = response.json()
        except ValueError as e:
            logger.error("Invalid JSON response: %s", e)
            raise OpinionAPIError(
                message=f"Invalid JSON response: {e}",
                status_code=response.status_code
            ) from e
        logger.debug("Response: %s", data)

        if not isinstance(data, dict):
            raise OpinionAPIError(
                message="Unexpected response format: expected a JSON object",
                status_code=response.status_code
            )

        # Opinion.trade response format: {"errno": 0, "errmsg": "", "result": {...}}
        # A missing errno is treated as a failure (-1).
        errno = data.get("errno", -1)
        if errno != 0:
            raise OpinionAPIError(
                message=data.get("errmsg", "Unknown error"),
                error_code=str(errno),
                status_code=response.status_code
            )

        return self._extract_result(data)

    async def get_markets(
        self,
        limit: int = 100,
        offset: int = 0,
        status: Optional[str] = None
    ) -> Dict[str, Any]:
        """Get list of prediction markets.

        Args:
            limit: Maximum markets to return (1-500)
            offset: Offset for pagination; converted to a 1-based page
                number as ``offset // limit + 1``, so offsets that are not a
                multiple of ``limit`` are rounded down to the page start.
            status: Filter by status (active/closed/all)

        Returns:
            The unwrapped ``result`` payload of the markets endpoint.

        Raises:
            ValueError: If ``limit`` is less than 1 or ``offset`` is negative.
            OpinionAPIError: If the request fails.
        """
        if limit < 1:
            raise ValueError("limit must be at least 1")
        if offset < 0:
            raise ValueError("offset must not be negative")

        params: Dict[str, Any] = {
            "page": (offset // limit) + 1,
            "limit": limit
        }
        if status and status != "all":
            # Map to Opinion.trade status values; unknown values pass through.
            status_map = {
                "active": "activated",
                "closed": "resolved"
            }
            params["status"] = status_map.get(status, status)
        return await self._request("GET", "/openapi/market", params=params)

    async def get_market_details(self, market_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific market.

        Args:
            market_id: Market ID to retrieve

        Returns:
            Market details
        """
        return await self._request("GET", f"/openapi/market/{market_id}")

    async def get_token_price(self, token_id: str) -> Dict[str, Any]:
        """Get current price for a token/outcome.

        Args:
            token_id: Token ID for price lookup

        Returns:
            Token price information
        """
        params = {"tokenId": token_id}
        return await self._request("GET", "/openapi/token/latest-price", params=params)

    async def get_orderbook(self, token_id: str, depth: int = 20) -> Dict[str, Any]:
        """Get order book (bids/asks) for a token.

        Args:
            token_id: Token ID for orderbook
            depth: Number of price levels (1-100)

        Returns:
            Orderbook with bids and asks
        """
        # NOTE(review): this endpoint is sent `token_id` while the other token
        # endpoints in this client send `tokenId` — confirm against the API
        # reference before changing either.
        params = {
            "token_id": token_id,
            "depth": depth
        }
        return await self._request("GET", "/openapi/token/orderbook", params=params)

    async def get_price_history(
        self,
        token_id: str,
        timeframe: str = "1h",
        limit: int = 100
    ) -> Dict[str, Any]:
        """Get historical price data (OHLCV).

        Args:
            token_id: Token ID for price history
            timeframe: Timeframe (1m, 5m, 15m, 1h, 4h, 1d, 7d, 30d)
            limit: Number of data points (1-1000)

        Returns:
            Historical price data
        """
        params = {
            "tokenId": token_id,
            "interval": timeframe,
            "limit": limit
        }
        return await self._request("GET", "/openapi/token/price-history", params=params)

    async def search_markets(self, query: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Search markets by keyword.

        The filtering is done client-side: one page of ``limit * 2`` markets
        is fetched and matched case-insensitively against ``marketTitle``, so
        markets beyond that first page are never considered.

        Args:
            query: Search query
            limit: Maximum results (1-100)

        Returns:
            Matching markets (at most ``limit``); empty when ``limit`` < 1.
        """
        if limit < 1:
            return []

        # Get all markets and filter client-side
        # (Opinion.trade API may not have direct search endpoint)
        markets_data = await self.get_markets(limit=limit * 2)  # Get more to filter
        if isinstance(markets_data, list):
            markets = markets_data
        elif isinstance(markets_data, dict):
            markets = markets_data.get("list") or []
        else:
            markets = []

        needle = query.lower()
        matches = []
        for market in markets:
            # Skip entries without a usable title instead of crashing on them.
            title = market.get("marketTitle") if isinstance(market, dict) else None
            if isinstance(title, str) and needle in title.lower():
                matches.append(market)
        return matches[:limit]

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
        logger.info("Public client closed")