
fitbit_trends

Analyze trends in Fitbit data using cached averages and totals over weekly, monthly, or quarterly periods. Supports heart rate, activity, sleep, weight, SpO2, and HRV. Compare two periods to identify changes.

Instructions

Analyse trends in cached Fitbit data.

Computes averages and totals over time from the local cache, auto-syncing if stale.

Args:
- data_type: What to analyse. Options: "heart_rate", "activity", "exercises", "sleep", "weight", "spo2", "hrv". Default: "activity".
- period: Aggregation period. Options: "weekly", "monthly", "quarterly". Default: "monthly".
- start_date: Start date as "YYYY-MM-DD" or "365d". Default: last 12 months.
- end_date: End date as "YYYY-MM-DD". Default: today.
- compare: Compare two periods. Format: "last_30d vs previous_30d", "2026-03 vs 2026-02", "2026-Q1 vs 2025-Q4". When set, period/start_date/end_date are ignored.

Returns aggregated averages per period:
- activity: steps, distance, active minutes
- exercises: sessions, duration, calories
- sleep: duration, efficiency, stage breakdown
- heart_rate: resting HR min/avg/max
- weight: weight, fat%, BMI
- spo2: avg/min/max oxygen saturation
- hrv: daily and deep RMSSD

Not for raw data - use fitbit_get_* tools instead.
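For illustration, a compare call supplies only `data_type` and `compare`; the other date parameters are ignored when `compare` is set. The specific values below are hypothetical:

```python
import json

# Hypothetical arguments for a fitbit_trends compare call.
# When "compare" is set, period/start_date/end_date are ignored.
args = {
    "data_type": "sleep",
    "compare": "last_30d vs previous_30d",
}
print(json.dumps(args))
```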

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| data_type | No | | activity |
| period | No | | monthly |
| start_date | No | | |
| end_date | No | | |
| compare | No | | |

Output Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| result | Yes | | |

Implementation Reference

  • The main 'fitbit_trends' tool handler function, decorated with @mcp.tool() and @require_auth. Accepts data_type, period, start_date, end_date, and compare parameters. Uses an inner _analyse() function that auto-syncs if stale, then either compares two periods or computes trends per data type using _trend_* helper functions. Returns formatted JSON via format_response().
    @mcp.tool()
    @require_auth
    async def fitbit_trends(
        data_type: str = "activity",
        period: str = "monthly",
        start_date: str | None = None,
        end_date: str | None = None,
        compare: str | None = None,
    ) -> str:
        """Analyse trends in cached Fitbit data.
    
        Computes averages and totals over time from the local cache,
        auto-syncing if stale.
    
        Args:
            data_type: What to analyse. Options: "heart_rate", "activity",
                "exercises", "sleep", "weight", "spo2", "hrv". Default: "activity".
            period: Aggregation period. Options: "weekly", "monthly",
                "quarterly". Default: "monthly".
            start_date: Start date as "YYYY-MM-DD" or "365d". Default: last 12 months.
            end_date: End date as "YYYY-MM-DD". Default: today.
            compare: Compare two periods. Format: "last_30d vs previous_30d",
                "2026-03 vs 2026-02", "2026-Q1 vs 2025-Q4".
                When set, period/start_date/end_date are ignored.
    
        Returns aggregated averages per period. For activity: steps, distance,
        active minutes. For exercises: sessions, duration, calories.
        For sleep: duration, efficiency, stage breakdown.
        For heart_rate: resting HR min/avg/max. For weight: weight, fat%, BMI.
        For spo2: avg/min/max oxygen saturation. For hrv: daily and deep RMSSD.
        Not for raw data - use fitbit_get_* tools instead.
        """
        def _analyse():
            auto_sync_if_stale(data_type)
            conn = db.get_db()
    
            if compare:
                result = _compare_periods(conn, data_type, compare)
            else:
                start, end = parse_date(start_date, end_date, default_days=365)
                s, e = start.isoformat(), end.isoformat()
    
                trend_fns = {
                    "heart_rate": _trend_heart_rate,
                    "activity": _trend_activity,
                    "exercises": _trend_exercises,
                    "sleep": _trend_sleep,
                    "weight": _trend_weight,
                    "spo2": _trend_spo2,
                    "hrv": _trend_hrv,
                }
                fn = trend_fns.get(data_type)
                if fn:
                    result = fn(conn, s, e, period)
                else:
                    result = {"error": f"Unknown data_type '{data_type}'. Use: heart_rate, activity, exercises, sleep, weight, spo2, or hrv."}
    
            conn.close()
            return result
    
        result = await anyio.to_thread.run_sync(_analyse)
        return format_response(result)
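The handler above keeps the event loop responsive by running its blocking cache/DB work in a worker thread via anyio.to_thread.run_sync. A minimal sketch of the same pattern using only the standard library (asyncio.to_thread; the _analyse body here is a stand-in, not the real implementation):

```python
import asyncio


def _analyse() -> dict:
    # Stand-in for blocking cache/DB work.
    return {"periods": [], "data_type": "activity"}


async def handler() -> dict:
    # Offload the blocking call to a worker thread so the
    # event loop stays free, as the tool handler does with anyio.
    return await asyncio.to_thread(_analyse)


result = asyncio.run(handler())
```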
  • Tool registration via the @mcp.tool() decorator on the fitbit_trends function, using the shared FastMCP instance from mcp_instance.py.
    @mcp.tool()
  • The _compare_periods function handles the 'compare' parameter of fitbit_trends, parsing range specs like 'last_30d vs previous_30d', querying the DB for both periods, and returning aggregated summaries.
    def _compare_periods(conn, data_type: str, compare_str: str) -> dict:
        parts = re.split(r"\s+vs\s+", compare_str.strip(), maxsplit=1)
        if len(parts) != 2:
            return {"error": "Invalid compare format. Use: 'last_30d vs previous_30d' or '2026-03 vs 2026-02'"}
    
        ranges = []
        for part in parts:
            r = _parse_compare_range(part.strip())
            if r is None:
                return {"error": f"Cannot parse period '{part}'. Use: last_30d, previous_30d, 2026-03, or 2026-Q1"}
            ranges.append(r)
    
        query_fns = {
            "heart_rate": db.query_heart_rate,
            "activity": db.query_activity,
            "exercises": db.query_exercises,
            "sleep": db.query_sleep,
            "weight": db.query_weight,
            "spo2": db.query_spo2,
            "hrv": db.query_hrv,
        }
        query_fn = query_fns.get(data_type)
        if not query_fn:
            return {"error": f"Cannot compare data_type '{data_type}'. Use: heart_rate, activity, exercises, sleep, weight, spo2, or hrv."}
    
        def summarize(rows, dtype):
            if not rows:
                return {"count": 0}
            if dtype == "heart_rate":
                hrs = [r["resting_hr"] for r in rows if r.get("resting_hr")]
                return {"count": len(rows), "avg_resting_hr": _avg(hrs)}
            elif dtype == "activity":
                steps = [r["steps"] for r in rows if r.get("steps")]
                return {"count": len(rows), "avg_steps": _avg(steps)}
            elif dtype == "exercises":
                dur = [r["duration_min"] for r in rows if r.get("duration_min")]
                return {"count": len(rows), "avg_duration": format_duration(_avg(dur))}
            elif dtype == "sleep":
                mins = [r["total_minutes"] for r in rows if r.get("total_minutes")]
                return {"count": len(rows), "avg_total_sleep": format_duration(_avg(mins))}
            elif dtype == "weight":
                weights = [r["weight_kg"] for r in rows if r.get("weight_kg")]
                return {"count": len(rows), "avg_weight_kg": _avg(weights)}
            elif dtype == "spo2":
                avgs = [r["avg"] for r in rows if r.get("avg")]
                return {"count": len(rows), "avg_spo2": _avg(avgs)}
            elif dtype == "hrv":
                rmssd = [r["daily_rmssd"] for r in rows if r.get("daily_rmssd")]
                return {"count": len(rows), "avg_daily_rmssd": _avg(rmssd)}
            return {"count": len(rows)}
    
        period_a = query_fn(conn, ranges[0][0].isoformat(), ranges[0][1].isoformat())
        period_b = query_fn(conn, ranges[1][0].isoformat(), ranges[1][1].isoformat())
        result_a = summarize(period_a, data_type)
        result_b = summarize(period_b, data_type)
        result_a["period"] = f"{ranges[0][0]} to {ranges[0][1]}"
        result_b["period"] = f"{ranges[1][0]} to {ranges[1][1]}"
        return {"period_1": result_a, "period_2": result_b, "data_type": data_type}
  • A set of _trend_* helper functions (_trend_heart_rate, _trend_activity, _trend_sleep, _trend_weight, _trend_spo2, _trend_exercises, _trend_hrv) that compute period-based averages from cached data. Each queries the appropriate db table and aggregates data into period buckets (weekly/monthly/quarterly).
    def _trend_heart_rate(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_heart_rate(conn, start_date, end_date)
        if not rows:
            return {"message": "No heart rate data in cache. No data recorded for this period."}
    
        buckets = defaultdict(list)
        for r in rows:
            hr = r.get("resting_hr")
            if hr is not None:
                buckets[_get_period_key(r["date"], period)].append(hr)
    
        periods = []
        for key in sorted(buckets.keys()):
            vals = buckets[key]
            periods.append({
                "period": key,
                "days": len(vals),
                "avg_resting_hr": _avg(vals),
                "min_resting_hr": min(vals) if vals else None,
                "max_resting_hr": max(vals) if vals else None,
            })
        return {"periods": periods, "data_type": "heart_rate", "aggregation": period}
    
    
    def _trend_activity(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_activity(conn, start_date, end_date)
        if not rows:
            return {"message": "No activity data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["steps", "calories_out", "active_minutes", "distance_km"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            dist = b.get("distance_km", [])
            periods.append({
                "period": key,
                "days": len(b.get("steps", [])),
                "avg_steps": _avg(b.get("steps", [])),
                "avg_active_minutes": _avg(b.get("active_minutes", [])),
                "total_distance_km": round(sum(dist), 1) if dist else None,
                "avg_calories_out": _avg(b.get("calories_out", [])),
            })
        return {"periods": periods, "data_type": "activity", "aggregation": period}
    
    
    def _trend_sleep(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_sleep(conn, start_date, end_date)
        if not rows:
            return {"message": "No sleep data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["total_minutes", "efficiency", "deep_minutes", "rem_minutes"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            total = b.get("total_minutes", [])
            periods.append({
                "period": key,
                "nights": len(total),
                "avg_total_sleep": format_duration(_avg(total)),
                "avg_deep_sleep": format_duration(_avg(b.get("deep_minutes", []))),
                "avg_rem_sleep": format_duration(_avg(b.get("rem_minutes", []))),
                "avg_efficiency": _avg(b.get("efficiency", [])),
            })
        return {"periods": periods, "data_type": "sleep", "aggregation": period}
    
    
    def _trend_weight(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_weight(conn, start_date, end_date)
        if not rows:
            return {"message": "No weight data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["weight_kg", "fat_pct", "bmi"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            periods.append({
                "period": key,
                "count": len(b.get("weight_kg", [])),
                "avg_weight_kg": _avg(b.get("weight_kg", [])),
                "avg_fat_pct": _avg(b.get("fat_pct", [])),
                "avg_bmi": _avg(b.get("bmi", [])),
            })
        return {"periods": periods, "data_type": "weight", "aggregation": period}
    
    
    def _trend_spo2(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_spo2(conn, start_date, end_date)
        if not rows:
            return {"message": "No SpO2 data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["avg", "min", "max"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            periods.append({
                "period": key,
                "nights": len(b.get("avg", [])),
                "avg_spo2": _avg(b.get("avg", [])),
                "min_spo2": min(b.get("min", [0])) if b.get("min") else None,
                "max_spo2": max(b.get("max", [0])) if b.get("max") else None,
            })
        return {"periods": periods, "data_type": "spo2", "aggregation": period}
    
    
    def _trend_exercises(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_exercises(conn, start_date, end_date)
        if not rows:
            return {"message": "No exercise data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["duration_min", "calories"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
            buckets[key]["_count"].append(1)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            dur = b.get("duration_min", [])
            periods.append({
                "period": key,
                "sessions": len(b.get("_count", [])),
                "total_duration": format_duration(sum(dur)) if dur else None,
                "avg_duration": format_duration(_avg(dur)),
                "total_calories": sum(b.get("calories", [])) if b.get("calories") else None,
            })
        return {"periods": periods, "data_type": "exercises", "aggregation": period}
    
    
    def _trend_hrv(conn, start_date: str, end_date: str, period: str) -> dict:
        rows = db.query_hrv(conn, start_date, end_date)
        if not rows:
            return {"message": "No HRV data in cache. No data recorded for this period."}
    
        buckets = defaultdict(lambda: defaultdict(list))
        for r in rows:
            key = _get_period_key(r["date"], period)
            for f in ["daily_rmssd", "deep_rmssd"]:
                v = r.get(f)
                if v is not None:
                    buckets[key][f].append(v)
    
        periods = []
        for key in sorted(buckets.keys()):
            b = buckets[key]
            periods.append({
                "period": key,
                "nights": len(b.get("daily_rmssd", [])),
                "avg_daily_rmssd": _avg(b.get("daily_rmssd", [])),
                "avg_deep_rmssd": _avg(b.get("deep_rmssd", [])),
            })
        return {"periods": periods, "data_type": "hrv", "aggregation": period}
  • The _get_period_key helper maps a YYYY-MM-DD date string to a period bucket key (weekly, quarterly, or monthly) used to group trend data.
    def _get_period_key(ds: str, period: str) -> str:
        """Map a YYYY-MM-DD date string to a period bucket key."""
        year = ds[:4]
        month = int(ds[5:7])
        if period == "weekly":
            d = date.fromisoformat(ds)
            iso_year, iso_week, _ = d.isocalendar()
            return f"{iso_year}-W{iso_week:02d}"
        elif period == "quarterly":
            q = (month - 1) // 3 + 1
            return f"{year}-Q{q}"
        else:  # monthly
            return ds[:7]
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full disclosure burden. It discloses the auto-sync behavior when the cache is stale and details the output format for each data type. It does not mention authentication or rate limits, but given the tool's read-only, analytical nature, the disclosures are adequate.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well structured, with sections for purpose, arguments, and return values. It is comprehensive without being verbose; a few sentences could be tightened, but overall it is efficient and front-loaded.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

With no annotations and only a minimal output schema, the description covers the tool's behavior (auto-sync, aggregated output), parameter details, and the return format for each data type. It lacks nothing essential for understanding how and when to use the tool.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 0%, so the description provides all parameter meaning. It explains each parameter: data_type and period list all their options, start_date and end_date specify format and defaults, and compare documents its format and override behavior. This adds significant value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states that the tool analyses trends in cached Fitbit data, computing averages and totals, and explicitly distinguishes it from the raw-data retrieval tools by name (fitbit_get_*). The verb 'analyse' and the resource 'trends' are specific and unambiguous.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 4/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description explicitly states when to use the tool (for aggregated trends) and when not to ('Not for raw data - use fitbit_get_* tools instead'). It does not provide exhaustive comparative guidance among the many trend-related sibling tools, but it clearly delineates the key distinction.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
