exchange_history
Retrieve historical exchange rate data for currency pairs over specified date ranges to analyze trends and track currency performance.
Instructions
Get an exchange-rate time series for a currency pair and date interval.
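As a rough illustration of how a caller might use the returned series, the sketch below computes the percentage change between the first and last point. It assumes the response is a dictionary with a `points` list of `date`/`rate` entries (the `HistoryOutput` shape in the implementation reference) and that failures carry an `error_code` key. `percent_change` is a hypothetical helper, not part of the server, and the sample rates are made up.

```python
def percent_change(response: dict) -> float:
    """Percentage change from the first to the last point of a history response."""
    # Assumption: error responses are flagged by an "error_code" key.
    if "error_code" in response:
        raise RuntimeError(f"{response['error_code']}: {response.get('message', '')}")

    points = response.get("points", [])
    if len(points) < 2:
        raise ValueError("need at least two points to compute a change")

    first, last = points[0]["rate"], points[-1]["rate"]
    return (last - first) / first * 100.0


# Made-up sample in the HistoryOutput shape (values are illustrative only).
sample = {
    "from_currency": "EUR",
    "to_currency": "USD",
    "start_date": "2024-01-02",
    "end_date": "2024-01-04",
    "points": [
        {"date": "2024-01-02", "rate": 1.10},
        {"date": "2024-01-03", "rate": 1.09},
        {"date": "2024-01-04", "rate": 1.21},
    ],
    "total": 3,
}

print(f"{percent_change(sample):.2f}%")
```

Checking for `error_code` first keeps a failed call from being mistaken for an empty series.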
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| start | Yes | | |
| end | Yes | | |
| from_currency | Yes | | |
| to_currency | Yes | | |
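The table lists no descriptions, so the following standard-library sketch shows the kind of values the four arguments are expected to hold, mirroring the checks in the `HistoryInput` schema shown in the implementation reference. `check_history_args` is a hypothetical name, and treating normalization as "trim and uppercase" is an assumption, since the server's `_normalize_currency` helper is not shown.

```python
from datetime import date


def check_history_args(start: str, end: str, from_currency: str, to_currency: str) -> dict:
    """Validate raw tool arguments; raise ValueError on bad input."""
    # date.fromisoformat raises ValueError for malformed dates such as "2024-13-01".
    start_date = date.fromisoformat(start)
    end_date = date.fromisoformat(end)
    if end_date < start_date:
        raise ValueError("end date must be on or after start date")

    codes = []
    for raw in (from_currency, to_currency):
        # Assumption: normalization means trimming whitespace and uppercasing.
        code = raw.strip().upper()
        if len(code) != 3 or not code.isalpha():
            raise ValueError("currency must be a 3-letter ISO 4217 code")
        codes.append(code)

    return {
        "start": start_date,
        "end": end_date,
        "from_currency": codes[0],
        "to_currency": codes[1],
    }


print(check_history_args("2024-01-02", "2024-01-05", " eur ", "usd"))
```

In this sketch a single-day range (`start` equal to `end`) is accepted, matching the "on or after" wording of the schema's error message.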
Implementation Reference
- The core execute function that implements the exchange_history tool logic. It fetches historical exchange rate data from the Frankfurter API, processes the response, and returns a structured HistoryOutput with date-rate pairs.
```python
async def execute(input_data: HistoryInput, client: FrankfurterClient) -> HistoryOutput:
    payload = await client.history(
        start_date=input_data.start_date.isoformat(),
        end_date=input_data.end_date.isoformat(),
        source_currency=input_data.from_currency,
        destination_currency=input_data.to_currency,
    )
    payload_rates = payload.get("rates", {})
    if not isinstance(payload_rates, dict):
        raise FrankfurterClientError("Frankfurter API returned malformed history payload")

    points: list[HistoryPoint] = []
    for point_date in sorted(payload_rates.keys()):
        point_rates = payload_rates[point_date]
        if not isinstance(point_rates, dict):
            raise FrankfurterClientError("Frankfurter API returned malformed history point")
        rate_value = point_rates.get(input_data.to_currency)
        if rate_value is None:
            continue
        points.append(HistoryPoint(date=point_date, rate=float(rate_value)))

    return HistoryOutput(
        from_currency=input_data.from_currency,
        to_currency=input_data.to_currency,
        start_date=input_data.start_date,
        end_date=input_data.end_date,
        points=points,
        total=len(points),
    )
```

- Schema definitions for the exchange_history tool: HistoryInput (input validation with date range and currency codes), HistoryPoint (individual date-rate data), and HistoryOutput (structured response with metadata and points list).
```python
class HistoryInput(BaseModel):
    model_config = ConfigDict(populate_by_name=True, str_strip_whitespace=True)

    start_date: date = Field(alias="start")
    end_date: date = Field(alias="end")
    from_currency: str = Field(alias="from", min_length=3, max_length=3)
    to_currency: str = Field(alias="to", min_length=3, max_length=3)

    @field_validator("from_currency", "to_currency")
    @classmethod
    def normalize_currency(cls, value: str) -> str:
        value = _normalize_currency(value)
        if len(value) != 3 or not value.isalpha():
            raise ValueError("currency must be a 3-letter ISO 4217 code")
        return value

    @model_validator(mode="after")
    def validate_range(self) -> HistoryInput:
        if self.end_date < self.start_date:
            raise ValueError("end date must be on or after start date")
        return self


class HistoryPoint(BaseModel):
    date: date
    rate: float


class HistoryOutput(BaseModel):
    from_currency: str
    to_currency: str
    start_date: date
    end_date: date
    points: list[HistoryPoint]
    total: int
```

- src/frankfurter_forex_mcp/server.py:72-94 (registration): MCP tool registration for exchange_history. The @mcp.tool decorator registers exchange_history_tool, which validates input, calls the execute handler, and handles validation, upstream, and internal errors with appropriate error responses.
```python
@mcp.tool(name="exchange_history")
async def exchange_history_tool(start: str, end: str, from_currency: str, to_currency: str) -> dict:
    """Get an exchange-rate time series for a currency pair and date interval."""
    try:
        input_data = HistoryInput(
            start=start,
            end=end,
            from_currency=from_currency,
            to_currency=to_currency,
        )
        async with FrankfurterClient.from_env() as client:
            output = await exchange_history.execute(input_data, client)
        return output.model_dump(mode="json")
    except ValidationError as exc:
        return _to_error(str(exc), tool="exchange_history", error_code="validation_error")
    except FrankfurterClientError as exc:
        return _to_error(str(exc), tool="exchange_history", error_code="upstream_error")
    except Exception:
        return _to_error(
            "Unexpected internal error",
            tool="exchange_history",
            error_code="internal_error",
        )
```

- Helper function _to_error that standardizes error responses across all tools by wrapping error messages in an ErrorResponse schema with tool name, error code, and trace ID.
```python
def _to_error(message: str, *, tool: str, error_code: str) -> dict:
    payload = ErrorResponse(message=message).model_dump(mode="json")
    payload["tool"] = tool
    payload["error_code"] = error_code
    payload["trace_id"] = str(uuid.uuid4())
    return payload
```
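To make the response handling in `execute` easier to follow, here is a dependency-free sketch of the same steps run against a made-up payload: check that `rates` is a dictionary, walk the dates in sorted order, and skip days with no quote for the target currency. The payload shape is inferred from how `execute` reads it; `extract_points`, the plain `ValueError`, and the sample values are illustrative assumptions rather than the server's actual code.

```python
def extract_points(payload: dict, to_currency: str) -> list[tuple[str, float]]:
    """Turn a {"rates": {date: {currency: rate}}} payload into sorted (date, rate) pairs."""
    rates = payload.get("rates", {})
    if not isinstance(rates, dict):
        raise ValueError("malformed history payload")

    points = []
    # ISO 8601 date strings sort chronologically when compared as text.
    for day in sorted(rates):
        day_rates = rates[day]
        if not isinstance(day_rates, dict):
            raise ValueError("malformed history point")
        value = day_rates.get(to_currency)
        if value is None:
            continue  # no quote for the target currency on this day
        points.append((day, float(value)))
    return points


# Made-up payload: dates deliberately out of order, one day without a USD quote.
sample_payload = {
    "base": "EUR",
    "rates": {
        "2024-01-03": {"USD": 1.09},
        "2024-01-02": {"USD": 1.10},
        "2024-01-04": {"GBP": 0.86},
    },
}

print(extract_points(sample_payload, "USD"))
```

Because days without a quote are skipped rather than treated as errors, the number of points can be smaller than the number of calendar days in the requested range, which is why the output reports `total` separately.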