top_cost_drivers
Identify the top-spending agents and models in a configurable time window to reveal where your money is going.
Instructions
Highest-spend agents + models in the window — flat list, no per-provider or anomaly noise. Useful for 'where's our money going' digest format.
Input Schema
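| Name | Required | Description | Default |
|---|---|---|---|
| window_hours | No | Lookback window in hours (integer); the dispatcher clamps it to 1-720. | 168 |
| top_n | No | Maximum number of agents and of models to return (integer); the dispatcher clamps it to 1-50. | 5 |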
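
The dispatch code shown under Implementation Reference clamps both inputs before use. A minimal sketch of that normalization follows; the function name `normalize_arguments` is hypothetical and is not part of the server, while the defaults and bounds are copied from the dispatch snippet.

```python
def normalize_arguments(arguments: dict) -> tuple[int, int]:
    """Hypothetical helper mirroring the clamping done in `call_tool`."""
    # window_hours: default 168 (one week), clamped to the range 1..720
    window_hours = max(1, min(int(arguments.get("window_hours", 168)), 720))
    # top_n: default 5, clamped to the range 1..50
    top_n = max(1, min(int(arguments.get("top_n", 5)), 50))
    return window_hours, top_n


print(normalize_arguments({}))                                 # (168, 5)
print(normalize_arguments({"window_hours": 10_000}))           # (720, 5)
print(normalize_arguments({"window_hours": 0, "top_n": "3"}))  # (1, 3)
```

Out-of-range values are silently pulled back into range rather than rejected, so a request for a 10,000-hour window reports on 720 hours.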
Implementation Reference
- Main handler function `build_top_drivers` that composes the `TopCostDriversReport` by computing total cost, top agents (via `_by_agent`), and top models (via `_by_model`).

```python
def build_top_drivers(
    entries: list[CostEntry],
    window_hours: int,
    top_n: int = 5,
) -> TopCostDriversReport:
    """Compose the `top_cost_drivers` response."""
    return TopCostDriversReport(
        window_hours=window_hours,
        total_cost_usd=sum(e.cost_usd for e in entries),
        top_agents=_by_agent(entries, top_n=top_n),
        top_models=_by_model(entries, top_n=top_n),
    )
```

- Helper `_by_agent` aggregates cost entries by `agent_id`, sorts by total cost, and returns the top_n `AgentCost` records, each with its share of total spend.

```python
def _by_agent(entries: list[CostEntry], top_n: int | None = None) -> list[AgentCost]:
    if not entries:
        return []
    total = sum(e.cost_usd for e in entries) or 1.0
    grouped: dict[str, list[CostEntry]] = defaultdict(list)
    for e in entries:
        agent = e.agent_id or "<unattributed>"
        grouped[agent].append(e)
    out: list[AgentCost] = []
    for agent, group in grouped.items():
        cost = sum(e.cost_usd for e in group)
        # Most-common provider/model in the group
        provider_counts = Counter(e.provider for e in group)
        model_counts = Counter((e.provider, e.model) for e in group)
        primary_provider = provider_counts.most_common(1)[0][0] if provider_counts else None
        primary_model = model_counts.most_common(1)[0][0][1] if model_counts else None
        out.append(
            AgentCost(
                agent_id=agent,
                total_cost_usd=cost,
                request_count=len(group),
                total_tokens=sum(e.total_tokens for e in group),
                avg_cost_per_request_usd=cost / len(group),
                primary_provider=primary_provider,
                primary_model=primary_model,
                share_of_total_pct=100.0 * cost / total,
            )
        )
    out.sort(key=lambda a: a.total_cost_usd, reverse=True)
    if top_n is not None:
        out = out[:top_n]
    return out
```

- Helper `_by_model` aggregates cost entries by (provider, model) pair, sorts by total cost, and returns the top_n `ModelCost` records.

```python
def _by_model(entries: list[CostEntry], top_n: int | None = None) -> list[ModelCost]:
    if not entries:
        return []
    grouped: dict[tuple[Provider, str], list[CostEntry]] = defaultdict(list)
    for e in entries:
        grouped[(e.provider, e.model)].append(e)
    out: list[ModelCost] = []
    for (provider, model), group in grouped.items():
        cost = sum(e.cost_usd for e in group)
        out.append(
            ModelCost(
                provider=provider,
                model=model,
                total_cost_usd=cost,
                request_count=len(group),
                avg_cost_per_request_usd=cost / len(group),
                avg_tokens_per_request=sum(e.total_tokens for e in group) / len(group),
            )
        )
    out.sort(key=lambda m: m.total_cost_usd, reverse=True)
    if top_n is not None:
        out = out[:top_n]
    return out
```

- Pydantic model `TopCostDriversReport` with `window_hours`, `total_cost_usd`, `top_agents` (list of `AgentCost`), and `top_models` (list of `ModelCost`).

```python
class TopCostDriversReport(BaseModel):
    """Response for `top_cost_drivers`."""

    window_hours: int
    total_cost_usd: float
    top_agents: list[AgentCost]
    top_models: list[ModelCost]
```

- src/openclaw_cost_tracker_mcp/server.py:127-141 (registration) Tool registration with name 'top_cost_drivers', description, and inputSchema (window_hours, top_n).

```python
Tool(
    name="top_cost_drivers",
    description=(
        "Highest-spend agents + models in the window — flat list, no per-provider "
        "or anomaly noise. Useful for 'where's our money going' digest format."
    ),
    inputSchema={
        "type": "object",
        "properties": {
            "window_hours": {"type": "integer", "default": 168},
            "top_n": {"type": "integer", "default": 5},
        },
        "required": [],
    },
),
```

- src/openclaw_cost_tracker_mcp/server.py:226-230 (registration) Dispatch logic in `call_tool` that reads and clamps the arguments, calls `_entries_in_window`, and delegates to `build_top_drivers`.

```python
if name == "top_cost_drivers":
    window_hours = max(1, min(int(arguments.get("window_hours", 168)), 720))
    top_n = max(1, min(int(arguments.get("top_n", 5)), 50))
    entries = await _entries_in_window(window_hours)
    return _serialize(build_top_drivers(entries, window_hours, top_n=top_n))
```
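
To make the share-of-total arithmetic in `_by_agent` concrete, here is a reduced, self-contained sketch. The `Entry` dataclass and the `agent_shares` function are hypothetical stand-ins that keep only the fields the calculation needs; the real `CostEntry` and `AgentCost` types carry more, and the sample data is invented for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class Entry:
    """Hypothetical stand-in for CostEntry with only the fields used here."""
    agent_id: Optional[str]
    cost_usd: float


def agent_shares(entries, top_n=None):
    """Return (agent, cost, share_pct) rows, highest cost first."""
    if not entries:
        return []
    # Same guard as `_by_agent`: a zero total falls back to 1.0 so the
    # share calculation never divides by zero.
    total = sum(e.cost_usd for e in entries) or 1.0
    costs = defaultdict(float)
    for e in entries:
        # Entries without an agent are pooled under one placeholder key.
        costs[e.agent_id or "<unattributed>"] += e.cost_usd
    rows = [(agent, cost, 100.0 * cost / total) for agent, cost in costs.items()]
    rows.sort(key=lambda r: r[1], reverse=True)
    return rows if top_n is None else rows[:top_n]


sample = [
    Entry("planner", 4.0),
    Entry("coder", 3.0),
    Entry(None, 1.0),
    Entry("planner", 2.0),
]
for agent, cost, share in agent_shares(sample, top_n=2):
    print(f"{agent}: ${cost:.2f} ({share:.0f}%)")
# planner: $6.00 (60%)
# coder: $3.00 (30%)
```

The total is computed over every entry in the window before truncation, so the shares of the returned top_n rows need not sum to 100%. Here the remaining 10% belongs to the unattributed entry that was cut off.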