get_aggregations
Calculate statistical aggregations over STAC items in geospatial datasets, filtered by collection, bounding box, time range, and custom query.
Instructions
Get aggregations for STAC items.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| collections | Yes | | |
| bbox | No | | |
| datetime | No | | |
| query | No | | |
| catalog_url | No | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
- stac_mcp/tools/get_aggregations.py:10-46 (handler): The handler function that implements the core logic of the 'get_aggregations' tool. It invokes STACClient.get_aggregations with parameters from arguments and formats the result as text or JSON.

      def handle_get_aggregations(
          client: STACClient,
          arguments: dict[str, Any],
      ) -> list[TextContent] | dict[str, Any]:
          data = client.get_aggregations(
              collections=arguments.get("collections"),
              ids=arguments.get("ids"),
              bbox=arguments.get("bbox"),
              intersects=arguments.get("intersects"),
              datetime=arguments.get("datetime"),
              query=arguments.get("query"),
              filter_lang=arguments.get("filter_lang"),
              filter_expr=arguments.get("filter"),
              fields=arguments.get("fields"),
              sortby=arguments.get("sortby"),
              limit=arguments.get("limit", 0),
          )
          if arguments.get("output_format") == "json":
              return {"type": "aggregations", **data}
          result_text = "**Aggregations**\n\n"
          result_text += f"Supported: {'Yes' if data.get('supported') else 'No'}\n"
          if data.get("aggregations"):
              result_text += "Aggregations:\n"
              for agg in data["aggregations"]:
                  name = agg.get("name", "unnamed")
                  value = agg.get("value", {})
                  result_text += f" - {name}:\n"
                  if isinstance(value, dict):
                      for k, v in value.items():
                          result_text += f" - {k}: {v}\n"
                  else:
                      result_text += f" - value: {value}\n"
          if data.get("meta"):
              result_text += f"\nMeta:\n - Matched: {data['meta'].get('matched')}\n"
          result_text += f"\n{data.get('message', '')}\n"
          return [TextContent(type="text", text=result_text)]

- stac_mcp/server.py:159-178 (registration): The primary registration of the 'get_aggregations' tool using the @app.tool decorator in FastMCP. The function signature defines the input schema and dispatches to the internal execution machinery.

      @app.tool
      async def get_aggregations(
          collections: list[str],
          bbox: list[float] | None = None,
          datetime: str | None = None,
          query: dict[str, Any] | None = None,
          catalog_url: str | None = None,
      ) -> list[dict[str, Any]]:
          """Get aggregations for STAC items."""
          return await execution.execute_tool(
              "get_aggregations",
              arguments={
                  "collections": collections,
                  "bbox": bbox,
                  "datetime": datetime,
                  "query": query,
              },
              catalog_url=catalog_url,
              headers=None,
          )

- stac_mcp/tools/execution.py:56-67 (registration): Internal registry in execution.py that maps the tool name 'get_aggregations' to its handler function handle_get_aggregations, used by execute_tool.

      _TOOL_HANDLERS: dict[str, Handler] = {
          "search_collections": handle_search_collections,
          "get_collection": handle_get_collection,
          "search_items": handle_search_items,
          "get_item": handle_get_item,
          "estimate_data_size": handle_estimate_data_size,
          "get_root": handle_get_root,
          "get_conformance": handle_get_conformance,
          "get_queryables": handle_get_queryables,
          "get_aggregations": handle_get_aggregations,
          "sensor_registry_info": handle_sensor_registry_info,
      }

- stac_mcp/tools/execution.py:21-21 (handler): Import of the get_aggregations handler into the execution module.

      from stac_mcp.tools.get_aggregations import handle_get_aggregations
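The name-to-handler dispatch described in the references above can be sketched in isolation. This is a simplified, synchronous illustration under stated assumptions, not the project's code: `StubClient` is a hypothetical stand-in for STACClient that returns canned data, the handler here covers only the JSON branch, and this `execute_tool` takes a client directly, whereas the body of the real `execute_tool` is not shown on this page.

```python
from typing import Any, Callable


class StubClient:
    """Hypothetical stand-in for STACClient that returns canned data."""

    def get_aggregations(self, **params: Any) -> dict[str, Any]:
        # A real client would query the catalog; this stub ignores the
        # filters and reports a fixed, invented count.
        return {
            "supported": True,
            "aggregations": [{"name": "total_count", "value": 42}],
            "meta": {"matched": 42},
        }


def handle_get_aggregations(
    client: StubClient, arguments: dict[str, Any]
) -> dict[str, Any]:
    # Simplified handler: forwards a subset of the filters and always
    # returns the JSON-style dict (the real handler also renders text).
    data = client.get_aggregations(
        collections=arguments.get("collections"),
        bbox=arguments.get("bbox"),
        datetime=arguments.get("datetime"),
        query=arguments.get("query"),
    )
    return {"type": "aggregations", **data}


Handler = Callable[[StubClient, dict[str, Any]], dict[str, Any]]

# Registry mapping tool names to handler callables, as in execution.py.
_TOOL_HANDLERS: dict[str, Handler] = {
    "get_aggregations": handle_get_aggregations,
}


def execute_tool(
    name: str, arguments: dict[str, Any], client: StubClient
) -> dict[str, Any]:
    # Look up the handler by tool name and fail loudly on unknown names.
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    return handler(client, arguments)


# The collection ID is illustrative only.
result = execute_tool(
    "get_aggregations", {"collections": ["sentinel-2-l2a"]}, StubClient()
)
print(result["type"], result["meta"]["matched"])
```

Keeping the registry as a plain dict means adding a tool only requires a new entry, and the lookup stays independent of how each handler formats its output.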