get_daily_open_close_agg
Retrieve daily open, close, high, and low prices for a specific ticker and date using Polygon-io MCP Server. Ideal for analyzing stock data trends over time.
Instructions
Get daily open, close, high, and low for a specific ticker and date.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| adjusted | No | ||
| date | Yes | ||
| params | No | ||
| ticker | Yes |
Input Schema (JSON Schema)
{
"properties": {
"adjusted": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"title": "Adjusted"
},
"date": {
"title": "Date",
"type": "string"
},
"params": {
"anyOf": [
{
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"title": "Params"
},
"ticker": {
"title": "Ticker",
"type": "string"
}
},
"required": [
"ticker",
"date"
],
"title": "get_daily_open_close_aggArguments",
"type": "object"
}
Implementation Reference
- src/mcp_massive/server.py:129-147 (handler)The handler function implementing the get_daily_open_close_agg MCP tool. It fetches daily OHLC data via the massive_client API and returns it formatted as CSV.@poly_mcp.tool(annotations=ToolAnnotations(readOnlyHint=True)) async def get_daily_open_close_agg( ticker: str, date: str, adjusted: Optional[bool] = None, params: Optional[Dict[str, Any]] = None, ) -> str: """ Get daily open, close, high, and low for a specific ticker and date. """ try: results = massive_client.get_daily_open_close_agg( ticker=ticker, date=date, adjusted=adjusted, params=params, raw=True ) return json_to_csv(results.data.decode("utf-8")) except Exception as e: return f"Error: {e}"
- src/mcp_massive/formatters.py:7-105 (helper)Helper function used by the tool to convert JSON API responses to CSV format for output.def json_to_csv(json_input: str | dict) -> str: """ Convert JSON to flattened CSV format. Args: json_input: JSON string or dict. If the JSON has a 'results' key containing a list, it will be extracted. Otherwise, the entire structure will be wrapped in a list for processing. Returns: CSV string with headers and flattened rows """ # Parse JSON if it's a string if isinstance(json_input, str): try: data = json.loads(json_input) except json.JSONDecodeError: # If JSON parsing fails, return empty CSV return "" else: data = json_input if isinstance(data, dict) and "results" in data: results_value = data["results"] # Handle both list and single object responses if isinstance(results_value, list): records = results_value elif isinstance(results_value, dict): # Single object response (e.g., get_last_trade returns results as object) records = [results_value] else: records = [results_value] elif isinstance(data, dict) and "last" in data: # Handle responses with "last" key (e.g., get_last_trade, get_last_quote) records = [data["last"]] if isinstance(data["last"], dict) else [data] elif isinstance(data, list): records = data else: records = [data] # Only flatten dict records, skip non-dict items flattened_records = [] for record in records: if isinstance(record, dict): flattened_records.append(_flatten_dict(record)) else: # If it's not a dict, wrap it in a dict with a 'value' key flattened_records.append({"value": str(record)}) if not flattened_records: return "" # Get all unique keys across all records (for consistent column ordering) all_keys = [] seen = set() for record in flattened_records: if isinstance(record, dict): for key in record.keys(): if key not in seen: all_keys.append(key) seen.add(key) output = io.StringIO() writer = csv.DictWriter(output, fieldnames=all_keys, lineterminator="\n") writer.writeheader() writer.writerows(flattened_records) return output.getvalue() def _flatten_dict( d: dict[str, Any], parent_key: str = "", sep: str = "_" ) -> dict[str, Any]: """ Flatten a nested dictionary by joining keys with separator. Args: d: Dictionary to flatten parent_key: Key from parent level (for recursion) sep: Separator to use between nested keys Returns: Flattened dictionary with no nested structures """ items = [] for k, v in d.items(): new_key = f"{parent_key}{sep}{k}" if parent_key else k if isinstance(v, dict): # Recursively flatten nested dicts items.extend(_flatten_dict(v, new_key, sep=sep).items()) elif isinstance(v, list): # Convert lists to comma-separated strings items.append((new_key, str(v))) else: items.append((new_key, v)) return dict(items)