get_aggregations
Calculate statistical aggregations over STAC items in geospatial datasets, filtered by collection, bounding box, time range, and custom query.
Instructions
Get aggregations for STAC items.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| collections | Yes | ||
| bbox | No | ||
| datetime | No | ||
| query | No | ||
| catalog_url | No | ||
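As an illustration of the schema above, a call might pass arguments shaped like the following. This is a sketch only: the collection ID, bounding box, date range, and query values are placeholders, and `check_arguments` is a hypothetical helper that is not part of the tool. The non-empty check on `collections` is an assumption; the 4-or-6-number bbox rule follows the general STAC convention.

```python
import json

# Hypothetical example arguments for get_aggregations; every value is a placeholder.
arguments = {
    "collections": ["sentinel-2-l2a"],        # required: list of collection IDs
    "bbox": [-122.5, 37.7, -122.3, 37.9],     # optional: [west, south, east, north]
    "datetime": "2023-01-01/2023-12-31",      # optional: single datetime or start/end interval
    "query": {"eo:cloud_cover": {"lt": 20}},  # optional: property filters
    "catalog_url": None,                      # optional: left unset here
}


def check_arguments(args: dict) -> None:
    """Illustrative sanity checks (assumed, not taken from the tool itself)."""
    collections = args.get("collections")
    if not collections or not all(isinstance(c, str) for c in collections):
        raise ValueError("collections must be a non-empty list of strings")
    bbox = args.get("bbox")
    if bbox is not None and len(bbox) not in (4, 6):
        raise ValueError("bbox must have 4 (2D) or 6 (3D) numbers")


check_arguments(arguments)
payload = json.dumps(arguments, indent=2)
print(payload)
```

Only `collections` is required; the other keys can be omitted entirely.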
Implementation Reference
- stac_mcp/tools/get_aggregations.py:10-46 (handler) The handler function that implements the core logic of the 'get_aggregations' tool. It invokes STACClient.get_aggregations with parameters from arguments and formats the result as text or JSON.

  ```python
  def handle_get_aggregations(
      client: STACClient,
      arguments: dict[str, Any],
  ) -> list[TextContent] | dict[str, Any]:
      data = client.get_aggregations(
          collections=arguments.get("collections"),
          ids=arguments.get("ids"),
          bbox=arguments.get("bbox"),
          intersects=arguments.get("intersects"),
          datetime=arguments.get("datetime"),
          query=arguments.get("query"),
          filter_lang=arguments.get("filter_lang"),
          filter_expr=arguments.get("filter"),
          fields=arguments.get("fields"),
          sortby=arguments.get("sortby"),
          limit=arguments.get("limit", 0),
      )
      if arguments.get("output_format") == "json":
          return {"type": "aggregations", **data}
      result_text = "**Aggregations**\n\n"
      result_text += f"Supported: {'Yes' if data.get('supported') else 'No'}\n"
      if data.get("aggregations"):
          result_text += "Aggregations:\n"
          for agg in data["aggregations"]:
              name = agg.get("name", "unnamed")
              value = agg.get("value", {})
              result_text += f" - {name}:\n"
              if isinstance(value, dict):
                  for k, v in value.items():
                      result_text += f" - {k}: {v}\n"
              else:
                  result_text += f" - value: {value}\n"
      if data.get("meta"):
          result_text += f"\nMeta:\n - Matched: {data['meta'].get('matched')}\n"
      result_text += f"\n{data.get('message', '')}\n"
      return [TextContent(type="text", text=result_text)]
  ```
- stac_mcp/server.py:159-178 (registration) The primary registration of the 'get_aggregations' tool using the @app.tool decorator in FastMCP. The function signature defines the input schema and dispatches to the internal execution machinery.

  ```python
  @app.tool
  async def get_aggregations(
      collections: list[str],
      bbox: list[float] | None = None,
      datetime: str | None = None,
      query: dict[str, Any] | None = None,
      catalog_url: str | None = None,
  ) -> list[dict[str, Any]]:
      """Get aggregations for STAC items."""
      return await execution.execute_tool(
          "get_aggregations",
          arguments={
              "collections": collections,
              "bbox": bbox,
              "datetime": datetime,
              "query": query,
          },
          catalog_url=catalog_url,
          headers=None,
      )
  ```
- stac_mcp/tools/execution.py:56-67 (registration) Internal registry in execution.py that maps the tool name 'get_aggregations' to its handler function handle_get_aggregations, used by execute_tool.

  ```python
  _TOOL_HANDLERS: dict[str, Handler] = {
      "search_collections": handle_search_collections,
      "get_collection": handle_get_collection,
      "search_items": handle_search_items,
      "get_item": handle_get_item,
      "estimate_data_size": handle_estimate_data_size,
      "get_root": handle_get_root,
      "get_conformance": handle_get_conformance,
      "get_queryables": handle_get_queryables,
      "get_aggregations": handle_get_aggregations,
      "sensor_registry_info": handle_sensor_registry_info,
  }
  ```
- stac_mcp/tools/execution.py:21-21 (handler) Import of the get_aggregations handler into the execution module.

  ```python
  from stac_mcp.tools.get_aggregations import handle_get_aggregations
  ```
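The references above describe a flow in which a tool name is looked up in a registry, the handler calls the client, and the result is returned as text or JSON. A minimal, dependency-free sketch of that flow follows. Everything in it is a stand-in: `TextContent` here is a local dataclass, `FakeClient` is a hypothetical stub for `STACClient` returning canned data, `handle` is a condensed version of the handler with simplified text formatting, and `execute` is an assumed dispatcher, since the body of `execute_tool` is not shown in the source.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class TextContent:
    """Local stand-in for the MCP TextContent type so the sketch runs on its own."""
    type: str
    text: str


class FakeClient:
    """Hypothetical stub replacing STACClient; returns a canned aggregation result."""

    def get_aggregations(self, **kwargs: Any) -> dict[str, Any]:
        self.last_kwargs = kwargs  # record what the handler forwarded
        return {
            "supported": True,
            "aggregations": [{"name": "total_count", "value": 42}],
            "meta": {"matched": 42},
            "message": "ok",
        }


def handle(client: FakeClient, arguments: dict[str, Any]):
    """Condensed handler: same two output branches, simplified text layout."""
    data = client.get_aggregations(
        collections=arguments.get("collections"),
        bbox=arguments.get("bbox"),
        datetime=arguments.get("datetime"),
        query=arguments.get("query"),
        limit=arguments.get("limit", 0),
    )
    if arguments.get("output_format") == "json":
        return {"type": "aggregations", **data}
    lines = ["**Aggregations**", "", f"Supported: {'Yes' if data.get('supported') else 'No'}"]
    for agg in data.get("aggregations") or []:
        lines.append(f" - {agg.get('name', 'unnamed')}: {agg.get('value')}")
    return [TextContent(type="text", text="\n".join(lines))]


# Name-to-handler registry, in the spirit of _TOOL_HANDLERS.
HANDLERS: dict[str, Callable[..., Any]] = {"get_aggregations": handle}


def execute(name: str, arguments: dict[str, Any], client: FakeClient):
    """Assumed dispatcher: look up the handler by tool name and call it."""
    try:
        handler = HANDLERS[name]
    except KeyError:
        raise ValueError(f"Unknown tool: {name}") from None
    return handler(client, arguments)


client = FakeClient()
text_result = execute("get_aggregations", {"collections": ["demo"]}, client)
json_result = execute(
    "get_aggregations", {"collections": ["demo"], "output_format": "json"}, client
)
print(text_result[0].text)
print(json_result["type"])
```

Because the handler reads every key with `dict.get`, keys missing from `arguments` reach the client as `None`, and `limit` falls back to 0. The registered tool shown above forwards only `collections`, `bbox`, `datetime`, and `query`, so the remaining handler parameters take those defaults on that path.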