# search_items
Search for geospatial data items across STAC collections using spatial, temporal, and attribute filters to access satellite imagery and weather datasets.
## Instructions
Search for STAC items across collections
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| bbox | No | Bounding box [west, south, east, north] in WGS84 | |
| catalog_url | No | STAC catalog URL (optional, defaults to Microsoft Planetary Computer) | |
| collections | No | List of collection IDs to search within | |
| datetime | No | Date/time filter (ISO 8601 format, e.g., '2023-01-01/2023-12-31') | |
| limit | No | Maximum number of items to return | 10 |
| query | No | Additional query parameters for filtering items | |
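As an illustration, a typical call passes a bounding box and an ISO 8601 datetime interval. This is a hypothetical argument payload; the collection ID `sentinel-2-l2a` and the coordinates (roughly Seattle) are example values, not defaults:

```python
# Hypothetical argument payload for the search_items tool. The collection ID
# "sentinel-2-l2a" and the bbox are illustrative values only.
arguments = {
    "collections": ["sentinel-2-l2a"],
    "bbox": [-122.5, 47.4, -122.2, 47.7],  # [west, south, east, north], WGS84
    "datetime": "2023-06-01/2023-06-30",   # ISO 8601 closed interval
    "limit": 5,
}

# A well-formed bbox keeps west < east and south < north.
west, south, east, north = arguments["bbox"]
assert west < east and south < north
```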
## Implementation Reference
- `stac_mcp/tools/search_items.py:12-54` (handler): The core handler function that executes the `search_items` tool logic. It takes a `STACClient` and arguments, performs the search, and returns formatted text or JSON output.

  ```python
  def handle_search_items(
      client: STACClient,
      arguments: dict[str, Any],
  ) -> list[TextContent] | dict[str, Any]:
      collections = arguments.get("collections")
      bbox = arguments.get("bbox")
      dt = arguments.get("datetime")
      query = arguments.get("query")
      limit = arguments.get("limit", 10)
      items = client.search_items(
          collections=collections,
          bbox=bbox,
          datetime=dt,
          query=query,
          limit=limit,
      )
      if arguments.get("output_format") == "json":
          return {"type": "item_list", "count": len(items), "items": items}
      result_text = f"Found {len(items)} items:\n\n"
      asset_keys = set()
      for item in items:
          item_id = item.get("id", "unknown")
          collection_id = item.get("collection", "unknown")
          result_text += f"**{item_id}** (Collection: `{collection_id}`)\n"
          dt_value = item.get("datetime")
          if dt_value:
              result_text += f"  Date: {dt_value}\n"
          bbox = item.get("bbox")
          if isinstance(bbox, list | tuple) and len(bbox) >= BBOX_MIN_COORDS:
              result_text += (
                  "  BBox: "
                  f"[{bbox[0]:.2f}, {bbox[1]:.2f}, {bbox[2]:.2f}, {bbox[3]:.2f}]\n"
              )
          assets = item.get("assets") or {}
          asset_keys.update(assets.keys())
          asset_count = len(assets) if hasattr(assets, "__len__") else 0
          result_text += f"  Assets: {asset_count}\n\n"
      result_text += "\n"
      if asset_keys:
          result_text += "Assets found across items:\n"
          for key in sorted(asset_keys):
              result_text += f"  - {key}\n"
      return [TextContent(type="text", text=result_text)]
  ```
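The JSON branch of the handler can be exercised without a live catalog by duck-typing the client. This is a minimal sketch; `StubClient` and `handle_search_items_json` are hypothetical stand-ins (the real `STACClient.search_items` performs a network search):

```python
from typing import Any


class StubClient:
    """Hypothetical stand-in for STACClient; returns canned items."""

    def search_items(self, **kwargs: Any) -> list[dict[str, Any]]:
        return [{"id": "item-1", "collection": "demo"}]


def handle_search_items_json(
    client: StubClient, arguments: dict[str, Any]
) -> dict[str, Any]:
    # Mirrors the handler's JSON branch: run the search, then wrap the
    # results in an item_list envelope with a count.
    items = client.search_items(
        collections=arguments.get("collections"),
        bbox=arguments.get("bbox"),
        datetime=arguments.get("datetime"),
        query=arguments.get("query"),
        limit=arguments.get("limit", 10),
    )
    return {"type": "item_list", "count": len(items), "items": items}


result = handle_search_items_json(StubClient(), {"output_format": "json", "limit": 1})
```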
- `stac_mcp/server.py:83-109` (registration): Registers the `search_items` tool on the FastMCP server instance. Defines the tool schema via type hints and calls the execution dispatcher.

  ```python
  @app.tool
  async def search_items(
      collections: list[str] | str,
      bbox: list[float] | str | None = None,
      datetime: str | None = None,
      limit: int | None = 10,
      query: dict[str, Any] | str | None = None,
      output_format: str | None = "text",
      catalog_url: str | None = None,
  ) -> list[dict[str, Any]]:
      """Search for STAC items."""
      arguments = preprocess_parameters(
          {
              "collections": collections,
              "bbox": bbox,
              "datetime": datetime,
              "limit": limit,
              "query": query,
              "output_format": output_format,
          }
      )
      return await execution.execute_tool(
          "search_items",
          arguments=arguments,
          catalog_url=catalog_url,
          headers=None,
      )
  ```
- `stac_mcp/tools/execution.py:56-67` (registration): Internal registration of tool handlers in a dispatch dictionary, mapping `search_items` to its handler function.

  ```python
  _TOOL_HANDLERS: dict[str, Handler] = {
      "search_collections": handle_search_collections,
      "get_collection": handle_get_collection,
      "search_items": handle_search_items,
      "get_item": handle_get_item,
      "estimate_data_size": handle_estimate_data_size,
      "get_root": handle_get_root,
      "get_conformance": handle_get_conformance,
      "get_queryables": handle_get_queryables,
      "get_aggregations": handle_get_aggregations,
      "sensor_registry_info": handle_sensor_registry_info,
  }
  ```
- `stac_mcp/tools/execution.py:131-189` (helper): The `execute_tool` function that dispatches to the appropriate handler based on `tool_name`, manages `STACClient` caching and instrumentation, and normalizes output to `TextContent`.

  ```python
  async def execute_tool(
      tool_name: str,
      arguments: dict[str, Any] | None = None,
      catalog_url: str | None = None,
      headers: dict[str, str] | None = None,
      handler: Handler | None = None,
      client: STACClient | None = None,
  ):
      """Execute a tool handler with optional overrides for tests.

      Parameters mirror the comprehensive execution tests: when *handler*
      or *client* are provided they are used directly, otherwise the
      registered handler and shared client are used. The return value is
      always normalized to a ``list[TextContent]`` for compatibility with
      existing tooling.
      """
      arguments = dict(arguments or {})
      if handler is None:
          handler = _TOOL_HANDLERS.get(tool_name)
          if handler is None:
              _raise_unknown_tool(tool_name)
      # Offload handler execution to a thread to avoid blocking the async
      # event loop (handlers may perform network I/O or heavy CPU work
      # like odc.stac.load).
      if client is None:
          # Reuse a cached STACClient when possible so multiple tool calls
          # within the same session/context share connection and session
          # state (HTTP sessions, timeout wrappers, etc.). This keeps
          # tools lightweight and consistent across invocations.
          client = _get_cached_client(catalog_url, headers)
      # Run the handler under the instrumented wrapper in a thread
      instrumented = await asyncio.to_thread(
          instrument_tool_execution,
          tool_name,
          catalog_url,
          handler,
          client,
          arguments,
      )
      raw_result = instrumented.value
      output_format = arguments.get("output_format", "text")
      if output_format == "json":
          if isinstance(raw_result, list):
              normalized = _as_text_content_list(raw_result)
              payload = {
                  "mode": "text_fallback",
                  "content": [item.text for item in normalized],
              }
          else:
              payload = {"mode": "json", "data": raw_result}
          payload_text = json.dumps(payload, separators=(",", ":"))
          record_tool_result_size(tool_name, len(payload_text.encode("utf-8")))
          return [TextContent(type="text", text=payload_text)]
      normalized = _as_text_content_list(raw_result)
      total_bytes = sum(len(item.text.encode("utf-8")) for item in normalized)
      record_tool_result_size(tool_name, total_bytes)
      return normalized
  ```
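When `output_format` is `"json"`, the caller receives a single text item whose body is a compact JSON envelope. A minimal sketch of that wrapping step, using an illustrative `raw_result` in place of a real handler return value:

```python
import json

# Illustrative handler result; in execute_tool this comes from the handler.
raw_result = {"type": "item_list", "count": 1, "items": [{"id": "item-1"}]}

# Dict results are wrapped in a {"mode": "json", "data": ...} envelope and
# serialized compactly (no spaces after separators), as in execute_tool.
payload = {"mode": "json", "data": raw_result}
payload_text = json.dumps(payload, separators=(",", ":"))
size_bytes = len(payload_text.encode("utf-8"))  # recorded for result-size metrics
```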