get_statistics

Retrieve aggregated statistics (mean, min, max) for a Home Assistant entity over a specified time range, using data from the recorder.

Instructions

Get long-term aggregated statistics for an entity over the last N hours.

Uses HA's recorder statistics (over WebSocket): aggregated buckets (mean / min / max per period) that survive the short-term retention window. Use this instead of get_history when:
- You want data older than the recorder's default 10-day window.
- You want aggregated values rather than every individual change.
- The entity is a high-frequency sensor (temperature, power) and raw history would be too many tokens.

Args:
- entity_id: The entity (must have a state_class that HA records as statistics: measurement, total, or total_increasing).
- hours: How far back from now. Defaults to 24.
- period: Bucket size (5minute, hour, day, week, or month). Defaults to hour.
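To judge whether a query stays within a token budget, it can help to estimate how many points a given hours/period combination produces. The following is a minimal sketch and not part of the server: `estimate_buckets` and `PERIOD_MINUTES` are hypothetical names, and week and month are approximated as 7 and 30 days.

```python
# Approximate bucket lengths in minutes. Treating a week as 7 days and a
# month as 30 days is an assumption made only for this estimate.
PERIOD_MINUTES = {
    "5minute": 5,
    "hour": 60,
    "day": 24 * 60,
    "week": 7 * 24 * 60,
    "month": 30 * 24 * 60,
}


def estimate_buckets(hours: int, period: str = "hour") -> int:
    """Roughly estimate how many statistics points a query would return."""
    if period not in PERIOD_MINUTES:
        raise ValueError(
            f"period must be one of {sorted(PERIOD_MINUTES)}, got {period!r}"
        )
    if hours <= 0:
        raise ValueError("hours must be positive")
    minutes = PERIOD_MINUTES[period]
    # Integer ceiling division: a partially covered bucket still counts.
    return (hours * 60 + minutes - 1) // minutes


print(estimate_buckets(24, "5minute"))  # 288
print(estimate_buckets(168, "day"))     # 7
```

The real count may differ slightly, because buckets are aligned to period boundaries rather than to the query window.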

Returns: entity_id, period, start_time, end_time, statistics (list of {start, end, mean, min, max, ...} points).

Examples:
- get_statistics("sensor.power_usage", hours=168, period="day")
- get_statistics("sensor.temperature", hours=24)
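A caller will often want to collapse the returned buckets into a single summary. The sketch below assumes the response shape described under Returns; the `summarize` helper and the sample values are hypothetical and only illustrate how the `statistics` list might be consumed.

```python
from typing import Any, Dict, List, Optional


def summarize(response: Dict[str, Any]) -> Optional[Dict[str, float]]:
    """Collapse per-bucket mean/min/max values into one overall summary."""
    points: List[Dict[str, Any]] = response.get("statistics") or []
    # Skip buckets where a value is missing or None.
    mins = [p["min"] for p in points if p.get("min") is not None]
    maxs = [p["max"] for p in points if p.get("max") is not None]
    means = [p["mean"] for p in points if p.get("mean") is not None]
    if not (mins and maxs and means):
        return None  # Empty result, or the error shape with "statistics": []
    return {
        "min": min(mins),
        "max": max(maxs),
        # Unweighted average of bucket means; assumes equal-length buckets.
        "mean": sum(means) / len(means),
    }


# Hypothetical response with two hourly buckets (values are made up).
sample = {
    "entity_id": "sensor.temperature",
    "period": "hour",
    "statistics": [
        {"mean": 20.0, "min": 19.5, "max": 20.5},
        {"mean": 22.0, "min": 21.0, "max": 23.0},
    ],
}

print(summarize(sample))  # {'min': 19.5, 'max': 23.0, 'mean': 21.0}
```

When the handler returns its error shape (an `error` key with an empty `statistics` list), this helper returns `None`, so the caller can check `response.get("error")` for the reason.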

Input Schema

| Name | Required | Description | Default |
|---|---|---|---|
| entity_id | Yes | | |
| hours | No | | |
| period | No | | hour |

Output Schema

| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |

Implementation Reference

  • The MCP tool handler for get_statistics. Decorated with @mcp.tool() and @async_handler('get_statistics'). Takes entity_id, hours (default 24), period (default 'hour') and delegates to get_entity_statistics in app/hass.py.
    @mcp.tool()
    @async_handler("get_statistics")
    async def get_statistics(
        entity_id: str,
        hours: int = 24,
        period: str = "hour",
    ) -> Dict[str, Any]:
        """
        Get long-term aggregated statistics for an entity over the last N hours.
    
        Uses HA's recorder statistics (over WebSocket) — aggregated buckets
        (mean / min / max per period) that survive the short-term retention
        window. Use this instead of `get_history` when:
            - You want data older than the recorder's default 10-day window.
            - You want aggregated values rather than every individual change.
            - The entity is a high-frequency sensor (temperature, power) and
              raw history would be too many tokens.
    
        Args:
            entity_id: The entity (must have a `state_class` HA records as
                       statistics — `measurement`, `total`, `total_increasing`).
            hours: How far back from now. Defaults to 24.
            period: Bucket size — `5minute`, `hour`, `day`, `week`, `month`.
                    Defaults to `hour`.
    
        Returns:
            `entity_id`, `period`, `start_time`, `end_time`, `statistics`
            (list of `{start, end, mean, min, max, ...}` points).
    
        Examples:
            get_statistics("sensor.power_usage", hours=168, period="day")
            get_statistics("sensor.temperature", hours=24)
        """
        logger.info(
            f"Getting statistics for {entity_id}: last {hours}h, period={period}"
        )
        try:
            return await get_entity_statistics(entity_id, hours=hours, period=period)
        except ValueError as e:
            return {"entity_id": entity_id, "error": str(e), "statistics": []}
  • Convenience wrapper called by the get_statistics handler. Computes the start/end datetime range (now minus `hours`, in UTC) and delegates to get_entity_statistics_range.
    async def get_entity_statistics(
        entity_id: str,
        hours: int = 24,
        period: str = "hour",
    ) -> Dict[str, Any]:
        """Convenience wrapper: last N hours of statistics for an entity.
    
        Args:
            entity_id: The entity to fetch statistics for.
            hours: How far back from now (UTC) to query. Defaults to 24.
            period: Bucket size — `5minute`, `hour`, `day`, `week`, `month`.
    
        Returns:
            Same shape as `get_entity_statistics_range`.
        """
        end_dt = datetime.now(timezone.utc)
        start_dt = end_dt - timedelta(hours=hours)
        return await get_entity_statistics_range(entity_id, start_dt, end_dt, period)
  • Core business logic that calls the HA WebSocket API. Validates the period and date range, then calls call_ws with 'recorder/statistics_during_period' to fetch statistics over WebSocket. Returns a structured response with entity_id, period, start and end times, and the statistics list.
    @handle_api_errors
    async def get_entity_statistics_range(
        entity_id: str,
        start_time: Union[str, datetime],
        end_time: Optional[Union[str, datetime]] = None,
        period: str = "hour",
    ) -> Dict[str, Any]:
        """Get long-term statistics for an entity over a date/time range.
    
        Hits HA's `recorder/statistics_during_period` over the WebSocket API.
        Statistics survive the recorder's short-term retention window
        (default 10 days), so this is the right call for anything older than
        that or anything you want aggregated (mean / min / max per period).
    
        Args:
            entity_id: The entity (must have a `state_class` that HA records
                       as statistics — e.g. `measurement`, `total_increasing`).
            start_time: ISO-8601 string or datetime, UTC if naive.
            end_time: ISO-8601 string or datetime, defaults to now.
            period: Aggregation bucket — one of `5minute`, `hour`, `day`,
                    `week`, `month`. Defaults to `hour`.
    
        Returns:
            ``{"entity_id", "period", "start_time", "end_time", "statistics"}``.
            ``statistics`` is the list HA returned (each entry has `start`,
            `end`, `mean`, `min`, `max`, optionally `sum`/`state`).
        """
        if period not in _STATISTICS_PERIODS:
            raise ValueError(
                f"period must be one of {sorted(_STATISTICS_PERIODS)}, got {period!r}"
            )
        start_dt = _parse_iso_dt(start_time)
        end_dt = _parse_iso_dt(end_time) if end_time is not None else datetime.now(timezone.utc)
        if start_dt >= end_dt:
            raise ValueError("start_time must be before end_time")
    
        from app.ws import call_ws
        start_iso = start_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_iso = end_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        result = await call_ws(
            "recorder/statistics_during_period",
            start_time=start_iso,
            end_time=end_iso,
            statistic_ids=[entity_id],
            period=period,
        )
        return {
            "entity_id": entity_id,
            "period": period,
            "start_time": start_iso,
            "end_time": end_iso,
            # HA returns `{entity_id: [points...]}`; flatten to the list when
            # we only asked for one entity. A missing result becomes an empty
            # list so `statistics` is never None.
            "statistics": result.get(entity_id, []) if isinstance(result, dict) else (result or []),
        }
  • WebSocket client helper that opens a fresh connection, authenticates, sends the given request type (here 'recorder/statistics_during_period') with its payload, and returns the result. Used by get_entity_statistics_range.
    async def call_ws(message_type: str, **payload: Any) -> Any:
        """Send a single request over the HA WebSocket API and return its result.
    
        Args:
            message_type: HA WS message type, e.g. ``"recorder/statistics_during_period"``.
            **payload: Additional fields merged into the request body.
    
        Returns:
            The ``result`` field of HA's success response — shape depends on
            the message type (dict, list, etc.).
    
        Raises:
            HassWebSocketError: HA replied with ``success=False`` or auth failed.
        """
        url = _ws_url()
        ssl_ctx = _build_ssl_context() if url.startswith("wss://") else None
    
        async with websockets.connect(url, ssl=ssl_ctx) as ws:
            # 1. Server sends auth_required first.
            auth_required = json.loads(await ws.recv())
            if auth_required.get("type") != "auth_required":
                raise HassWebSocketError(
                    f"Unexpected initial WS message: {auth_required}"
                )
    
            # 2. Authenticate.
            await ws.send(json.dumps({"type": "auth", "access_token": HA_TOKEN}))
            auth_result = json.loads(await ws.recv())
            if auth_result.get("type") != "auth_ok":
                raise HassWebSocketError(f"WS authentication failed: {auth_result}")
    
            # 3. Send the actual request. HA requires monotonically increasing
            #    `id` per connection; since we open a fresh connection per call,
            #    `1` is always valid.
            await ws.send(json.dumps({"id": 1, "type": message_type, **payload}))
            response = json.loads(await ws.recv())
    
            if not response.get("success", False):
                raise HassWebSocketError(
                    f"WS request {message_type!r} failed: {response.get('error', response)}"
                )
            return response.get("result")
  • Registration of get_statistics as an MCP tool via the @mcp.tool() decorator on the handler function in app/server.py. The listing is identical to the handler shown in the first item of this list.
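The request flow described in this list (authenticate, send one request with `id` 1, then unwrap the per-entity result) can be sketched without a live Home Assistant instance. This is a minimal illustration under stated assumptions, not the project's code: `build_frames` and `extract_points` are hypothetical helpers, and the token and response values are placeholders.

```python
import json
from typing import Any, Dict, List


def build_frames(token: str, message_type: str, **payload: Any) -> List[str]:
    """Return the JSON frames a client sends, in order: auth, then the request."""
    return [
        json.dumps({"type": "auth", "access_token": token}),
        # A fresh connection per call means id 1 is always valid.
        json.dumps({"id": 1, "type": message_type, **payload}),
    ]


def extract_points(response: Dict[str, Any], entity_id: str) -> List[Dict[str, Any]]:
    """Unwrap a result message into the list of points for one entity."""
    if not response.get("success", False):
        raise RuntimeError(f"request failed: {response.get('error', response)}")
    result = response.get("result") or {}
    return result.get(entity_id, [])


frames = build_frames(
    "PLACEHOLDER_TOKEN",  # hypothetical token, for illustration only
    "recorder/statistics_during_period",
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-02T00:00:00Z",
    statistic_ids=["sensor.temperature"],
    period="hour",
)

# Hypothetical success response carrying one made-up bucket.
ok_response = {
    "id": 1,
    "type": "result",
    "success": True,
    "result": {"sensor.temperature": [{"mean": 21.0, "min": 20.0, "max": 22.0}]},
}

print(json.loads(frames[1])["type"])
print(extract_points(ok_response, "sensor.temperature"))
```

Keeping frame construction and result unwrapping as pure functions makes both easy to test separately from the network code.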
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

Explains that the tool uses HA's recorder statistics and returns aggregated buckets (mean/min/max), and notes that these statistics survive the short-term retention window. It does not explicitly state that the tool is read-only, though its purpose implies no modification. There are no annotations to contradict.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Concise yet informative. Front-loaded with the main purpose, followed by bullet points for usage and then Args, Returns, and Examples. Every sentence adds value. No unnecessary repetition.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Although an output schema exists, the description still summarizes the return structure (entity_id, period, start_time, end_time, statistics). It covers all needed context: what the tool does, when to use it, its parameters, and what it returns. No gaps.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The schema has 0% description coverage, but the description compensates by explaining each parameter: entity_id requires a state_class, hours defaults to 24, and period lists the allowed bucket sizes. It provides examples and adds meaning beyond the schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

Clearly states it gets long-term aggregated statistics for an entity over the last N hours. Distinguishes from sibling get_history by specifying use cases for older data or aggregated values. Includes examples and mentions HA recorder statistics.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

Explicitly provides three conditions for using it instead of get_history: data older than the 10-day window, aggregated values, and high-frequency sensors. Also implies when not to use it (short-term or raw history).

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
