get_trades
Retrieve historical trades for a stock by ticker symbol. Filter by date range and set row limit for paginated results.
Instructions
Historical trades for a stock.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| ticker | Yes | Stock symbol. | |
| timestamp_gte | No | Inclusive lower bound ("YYYY-MM-DD" or ns/ms unix). | |
| timestamp_lt | No | Exclusive upper bound. | |
| limit | No | Max rows. Default 50. | 50 |
| cursor | No | Pagination cursor. | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- src/massive_mcp/tools/quotes.py:56-81 (handler) The get_trades handler function. Fetches historical trades for a stock via GET /v3/trades/{ticker} with optional timestamp filtering, limit, and cursor pagination.
@mcp.tool() async def get_trades( ticker: str, timestamp_gte: str | None = None, timestamp_lt: str | None = None, limit: int = 50, cursor: str | None = None, ) -> dict[str, Any]: """Historical trades for a stock. Args: ticker: Stock symbol. timestamp_gte: Inclusive lower bound ("YYYY-MM-DD" or ns/ms unix). timestamp_lt: Exclusive upper bound. limit: Max rows. Default 50. cursor: Pagination cursor. """ return await client.get( f"/v3/trades/{ticker}", { "timestamp.gte": timestamp_gte, "timestamp.lt": timestamp_lt, "limit": limit, "cursor": cursor, }, ) - src/massive_mcp/server.py:37-48 (registration)Registration: quotes.register(mcp, client) is called for each tool module in build_server().
for module in ( aggregates, quotes, snapshots, tickers, news, reference, indicators, corporate, financials, ): module.register(mcp, client) - src/massive_mcp/tools/quotes.py:10-11 (registration)The register() function that registers all tools in quotes.py (including get_trades) via the @mcp.tool() decorator.
def register(mcp: FastMCP, client: MassiveClient) -> None: @mcp.tool() - src/massive_mcp/client.py:32-116 (helper)MassiveClient.get() – the HTTP client helper used by get_trades to make the actual API call.
class MassiveClient: def __init__(self, settings: Settings) -> None: self._settings = settings headers = {"User-Agent": "massive-mcp/0.1"} if settings.auth_mode == "bearer": headers["Authorization"] = f"Bearer {settings.api_key}" self._http = httpx.AsyncClient( base_url=settings.base_url, headers=headers, timeout=DEFAULT_TIMEOUT, ) async def aclose(self) -> None: await self._http.aclose() async def get( self, path: str, params: dict[str, Any] | None = None, *, trim: bool = True ) -> dict[str, Any]: merged: dict[str, Any] = {k: v for k, v in (params or {}).items() if v is not None} if self._settings.auth_mode == "query": merged["apiKey"] = self._settings.api_key last_exc: Exception | None = None for attempt in range(MAX_RETRIES): try: resp = await self._http.get(path, params=merged) except httpx.HTTPError as exc: last_exc = exc await asyncio.sleep(2**attempt) continue if resp.status_code == 429: retry_after = float(resp.headers.get("Retry-After", 2**attempt)) await asyncio.sleep(min(retry_after, 30)) continue if 500 <= resp.status_code < 600 and attempt < MAX_RETRIES - 1: await asyncio.sleep(2**attempt) continue if resp.status_code == 401: hint = ( "auth rejected — verify MASSIVE_API_KEY; " "if you used MASSIVE_AUTH_MODE=bearer, try 'query' (or vice versa)" ) raise MassiveAPIError(401, hint, _strip_secrets(str(resp.request.url))) try: data = resp.json() except ValueError: data = {"raw": resp.text} if not resp.is_success: msg = data.get("error") or data.get("message") or resp.reason_phrase or "request failed" raise MassiveAPIError(resp.status_code, str(msg), _strip_secrets(str(resp.request.url))) return _trim(data) if trim else data raise MassiveAPIError(0, f"network error after {MAX_RETRIES} retries: {last_exc}", path) def _trim(data: dict[str, Any]) -> dict[str, Any]: """If `results` is a huge array, truncate and surface a hint to paginate.""" results = data.get("results") if isinstance(results, list) and len(results) > TRIM_THRESHOLD: kept = 
results[:TRIM_THRESHOLD] data = dict(data) data["results"] = kept data["_truncated_note"] = ( f"response had {len(results)} items; truncated to {TRIM_THRESHOLD}. " "Re-call with a tighter `limit` or use `cursor`/`next_url` to page." ) if "next_url" in data and data.get("next_url"): cursor = _extract_cursor(data["next_url"]) if cursor: data["next_cursor"] = cursor return data def _extract_cursor(next_url: str) -> str | None: parts = urlsplit(next_url) for kv in parts.query.split("&"): if kv.startswith("cursor="): return kv.split("=", 1)[1] return None - Input schema for get_trades: ticker (str), timestamp_gte (optional str), timestamp_lt (optional str), limit (int, default 50), cursor (optional str). Returns dict[str, Any].
async def get_trades( ticker: str, timestamp_gte: str | None = None, timestamp_lt: str | None = None, limit: int = 50, cursor: str | None = None, ) -> dict[str, Any]: