get_aggs
Retrieve aggregate bars for a specific ticker within a defined date range and custom time window using specified parameters. Ideal for analyzing financial data trends over time.
Instructions
List aggregate bars for a ticker over a given date range in custom time window sizes.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| adjusted | No | ||
| from_ | Yes | ||
| limit | No | ||
| multiplier | Yes | ||
| params | No | ||
| sort | No | ||
| ticker | Yes | ||
| timespan | Yes | ||
| to | Yes |
Implementation Reference
- src/mcp_massive/server.py:32-64 (handler)The get_aggs tool handler: decorated with @poly_mcp.tool for registration, defines input parameters (serving as schema), calls massive_client.get_aggs API, formats response to CSV via json_to_csv helper, handles errors.@poly_mcp.tool(annotations=ToolAnnotations(readOnlyHint=True)) async def get_aggs( ticker: str, multiplier: int, timespan: str, from_: Union[str, int, datetime, date], to: Union[str, int, datetime, date], adjusted: Optional[bool] = None, sort: Optional[str] = None, limit: Optional[int] = 10, params: Optional[Dict[str, Any]] = None, ) -> str: """ List aggregate bars for a ticker over a given date range in custom time window sizes. """ try: results = massive_client.get_aggs( ticker=ticker, multiplier=multiplier, timespan=timespan, from_=from_, to=to, adjusted=adjusted, sort=sort, limit=limit, params=params, raw=True, ) # Parse the binary data to string and then to JSON return json_to_csv(results.data.decode("utf-8")) except Exception as e: return f"Error: {e}"
- src/mcp_massive/formatters.py:7-105 (helper)json_to_csv helper function imported and used in get_aggs to convert API JSON response to flattened CSV string for tool output. Includes _flatten_dict sub-helper.def json_to_csv(json_input: str | dict) -> str: """ Convert JSON to flattened CSV format. Args: json_input: JSON string or dict. If the JSON has a 'results' key containing a list, it will be extracted. Otherwise, the entire structure will be wrapped in a list for processing. Returns: CSV string with headers and flattened rows """ # Parse JSON if it's a string if isinstance(json_input, str): try: data = json.loads(json_input) except json.JSONDecodeError: # If JSON parsing fails, return empty CSV return "" else: data = json_input if isinstance(data, dict) and "results" in data: results_value = data["results"] # Handle both list and single object responses if isinstance(results_value, list): records = results_value elif isinstance(results_value, dict): # Single object response (e.g., get_last_trade returns results as object) records = [results_value] else: records = [results_value] elif isinstance(data, dict) and "last" in data: # Handle responses with "last" key (e.g., get_last_trade, get_last_quote) records = [data["last"]] if isinstance(data["last"], dict) else [data] elif isinstance(data, list): records = data else: records = [data] # Only flatten dict records, skip non-dict items flattened_records = [] for record in records: if isinstance(record, dict): flattened_records.append(_flatten_dict(record)) else: # If it's not a dict, wrap it in a dict with a 'value' key flattened_records.append({"value": str(record)}) if not flattened_records: return "" # Get all unique keys across all records (for consistent column ordering) all_keys = [] seen = set() for record in flattened_records: if isinstance(record, dict): for key in record.keys(): if key not in seen: all_keys.append(key) seen.add(key) output = io.StringIO() writer = csv.DictWriter(output, fieldnames=all_keys, lineterminator="\n") writer.writeheader() writer.writerows(flattened_records) return output.getvalue() def _flatten_dict( d: dict[str, Any], parent_key: str = "", sep: str = "_" ) -> dict[str, Any]: """ Flatten a nested dictionary by joining keys with separator. Args: d: Dictionary to flatten parent_key: Key from parent level (for recursion) sep: Separator to use between nested keys Returns: Flattened dictionary with no nested structures """ items = [] for k, v in d.items(): new_key = f"{parent_key}{sep}{k}" if parent_key else k if isinstance(v, dict): # Recursively flatten nested dicts items.extend(_flatten_dict(v, new_key, sep=sep).items()) elif isinstance(v, list): # Convert lists to comma-separated strings items.append((new_key, str(v))) else: items.append((new_key, v)) return dict(items)
- src/mcp_massive/server.py:32-32 (registration)@poly_mcp.tool decorator registers get_aggs as an MCP tool with readOnlyHint annotation.@poly_mcp.tool(annotations=ToolAnnotations(readOnlyHint=True))
- src/mcp_massive/server.py:33-43 (schema)Function signature with type hints defines the input schema and docstring for the get_aggs tool, used by FastMCP for tool schema generation.async def get_aggs( ticker: str, multiplier: int, timespan: str, from_: Union[str, int, datetime, date], to: Union[str, int, datetime, date], adjusted: Optional[bool] = None, sort: Optional[str] = None, limit: Optional[int] = 10, params: Optional[Dict[str, Any]] = None, ) -> str: