get_statistics
Retrieve aggregated statistics (mean, min, max) for a Home Assistant entity over a specified time range, using data from the recorder.
Instructions
Get long-term aggregated statistics for an entity over the last N hours.
Uses HA's recorder statistics (over WebSocket) — aggregated buckets
(mean / min / max per period) that survive the short-term retention
window. Use this instead of get_history when:
- You want data older than the recorder's default 10-day window.
- You want aggregated values rather than every individual change.
- The entity is a high-frequency sensor (temperature, power) and
raw history would be too many tokens.
Args:
entity_id: The entity (must have a state_class HA records as
statistics — measurement, total, total_increasing).
hours: How far back from now. Defaults to 24.
period: Bucket size — 5minute, hour, day, week, month.
Defaults to hour.
Returns:
entity_id, period, start_time, end_time, statistics
(list of {start, end, mean, min, max, ...} points).
Examples: get_statistics("sensor.power_usage", hours=168, period="day") get_statistics("sensor.temperature", hours=24)
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| entity_id | Yes | ||
| hours | No | ||
| period | No | hour |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- app/server.py:1350-1389 (handler)The MCP tool handler for get_statistics. Decorated with @mcp.tool() and @async_handler('get_statistics'). Takes entity_id, hours (default 24), period (default 'hour') and delegates to get_entity_statistics in app/hass.py.
@mcp.tool() @async_handler("get_statistics") async def get_statistics( entity_id: str, hours: int = 24, period: str = "hour", ) -> Dict[str, Any]: """ Get long-term aggregated statistics for an entity over the last N hours. Uses HA's recorder statistics (over WebSocket) — aggregated buckets (mean / min / max per period) that survive the short-term retention window. Use this instead of `get_history` when: - You want data older than the recorder's default 10-day window. - You want aggregated values rather than every individual change. - The entity is a high-frequency sensor (temperature, power) and raw history would be too many tokens. Args: entity_id: The entity (must have a `state_class` HA records as statistics — `measurement`, `total`, `total_increasing`). hours: How far back from now. Defaults to 24. period: Bucket size — `5minute`, `hour`, `day`, `week`, `month`. Defaults to `hour`. Returns: `entity_id`, `period`, `start_time`, `end_time`, `statistics` (list of `{start, end, mean, min, max, ...}` points). Examples: get_statistics("sensor.power_usage", hours=168, period="day") get_statistics("sensor.temperature", hours=24) """ logger.info( f"Getting statistics for {entity_id}: last {hours}h, period={period}" ) try: return await get_entity_statistics(entity_id, hours=hours, period=period) except ValueError as e: return {"entity_id": entity_id, "error": str(e), "statistics": []} - app/hass.py:774-791 (handler)Core business logic for get_statistics. Computes start/end datetime range and delegates to get_entity_statistics_range.
async def get_entity_statistics( entity_id: str, hours: int = 24, period: str = "hour", ) -> Dict[str, Any]: """Convenience wrapper: last N hours of statistics for an entity. Args: entity_id: The entity to fetch statistics for. hours: How far back from now (UTC) to query. Defaults to 24. period: Bucket size — `5minute`, `hour`, `day`, `week`, `month`. Returns: Same shape as `get_entity_statistics_range`. """ end_dt = datetime.now(timezone.utc) start_dt = end_dt - timedelta(hours=hours) return await get_entity_statistics_range(entity_id, start_dt, end_dt, period) - app/hass.py:717-771 (handler)Core business logic that calls HA WebSocket API. Validates period and date range, then calls call_ws with 'recorder/statistics_during_period' to fetch statistics over WebSocket. Returns structured response with entity_id, period, times and statistics list.
@handle_api_errors async def get_entity_statistics_range( entity_id: str, start_time: Union[str, datetime], end_time: Optional[Union[str, datetime]] = None, period: str = "hour", ) -> Dict[str, Any]: """Get long-term statistics for an entity over a date/time range. Hits HA's `recorder/statistics_during_period` over the WebSocket API. Statistics survive the recorder's short-term retention window (default 10 days), so this is the right call for anything older than that or anything you want aggregated (mean / min / max per period). Args: entity_id: The entity (must have a `state_class` that HA records as statistics — e.g. `measurement`, `total_increasing`). start_time: ISO-8601 string or datetime, UTC if naive. end_time: ISO-8601 string or datetime, defaults to now. period: Aggregation bucket — one of `5minute`, `hour`, `day`, `week`, `month`. Defaults to `hour`. Returns: ``{"entity_id", "period", "start_time", "end_time", "statistics"}``. ``statistics`` is the list HA returned (each entry has `start`, `end`, `mean`, `min`, `max`, optionally `sum`/`state`). """ if period not in _STATISTICS_PERIODS: raise ValueError( f"period must be one of {sorted(_STATISTICS_PERIODS)}, got {period!r}" ) start_dt = _parse_iso_dt(start_time) end_dt = _parse_iso_dt(end_time) if end_time is not None else datetime.now(timezone.utc) if start_dt >= end_dt: raise ValueError("start_time must be before end_time") from app.ws import call_ws start_iso = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ") end_iso = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ") result = await call_ws( "recorder/statistics_during_period", start_time=start_iso, end_time=end_iso, statistic_ids=[entity_id], period=period, ) return { "entity_id": entity_id, "period": period, "start_time": start_iso, "end_time": end_iso, # HA returns `{entity_id: [points...]}`; flatten to the list when # we only asked for one entity. "statistics": (result or {}).get(entity_id, []) if isinstance(result, dict) else result, } - app/ws.py:38-79 (helper)WebSocket client helper that opens a fresh connection, authenticates, sends the request type 'recorder/statistics_during_period' with payload, and returns the result. Used by get_entity_statistics_range.
async def call_ws(message_type: str, **payload: Any) -> Any: """Send a single request over the HA WebSocket API and return its result. Args: message_type: HA WS message type, e.g. ``"recorder/statistics_during_period"``. **payload: Additional fields merged into the request body. Returns: The ``result`` field of HA's success response — shape depends on the message type (dict, list, etc.). Raises: HassWebSocketError: HA replied with ``success=False`` or auth failed. """ url = _ws_url() ssl_ctx = _build_ssl_context() if url.startswith("wss://") else None async with websockets.connect(url, ssl=ssl_ctx) as ws: # 1. Server sends auth_required first. auth_required = json.loads(await ws.recv()) if auth_required.get("type") != "auth_required": raise HassWebSocketError( f"Unexpected initial WS message: {auth_required}" ) # 2. Authenticate. await ws.send(json.dumps({"type": "auth", "access_token": HA_TOKEN})) auth_result = json.loads(await ws.recv()) if auth_result.get("type") != "auth_ok": raise HassWebSocketError(f"WS authentication failed: {auth_result}") # 3. Send the actual request. HA requires monotonically increasing # `id` per connection; since we open a fresh connection per call, # `1` is always valid. await ws.send(json.dumps({"id": 1, "type": message_type, **payload})) response = json.loads(await ws.recv()) if not response.get("success", False): raise HassWebSocketError( f"WS request {message_type!r} failed: {response.get('error', response)}" ) return response.get("result") - app/server.py:1350-1389 (registration)Registration of get_statistics as an MCP tool via the @mcp.tool() decorator on the handler function in app/server.py.
@mcp.tool() @async_handler("get_statistics") async def get_statistics( entity_id: str, hours: int = 24, period: str = "hour", ) -> Dict[str, Any]: """ Get long-term aggregated statistics for an entity over the last N hours. Uses HA's recorder statistics (over WebSocket) — aggregated buckets (mean / min / max per period) that survive the short-term retention window. Use this instead of `get_history` when: - You want data older than the recorder's default 10-day window. - You want aggregated values rather than every individual change. - The entity is a high-frequency sensor (temperature, power) and raw history would be too many tokens. Args: entity_id: The entity (must have a `state_class` HA records as statistics — `measurement`, `total`, `total_increasing`). hours: How far back from now. Defaults to 24. period: Bucket size — `5minute`, `hour`, `day`, `week`, `month`. Defaults to `hour`. Returns: `entity_id`, `period`, `start_time`, `end_time`, `statistics` (list of `{start, end, mean, min, max, ...}` points). Examples: get_statistics("sensor.power_usage", hours=168, period="day") get_statistics("sensor.temperature", hours=24) """ logger.info( f"Getting statistics for {entity_id}: last {hours}h, period={period}" ) try: return await get_entity_statistics(entity_id, hours=hours, period=period) except ValueError as e: return {"entity_id": entity_id, "error": str(e), "statistics": []}