Glama

search_items

Search for geospatial data items across STAC collections using spatial, temporal, and attribute filters to access satellite imagery and weather datasets.

Instructions

Search for STAC items across collections

Input Schema

| Name | Required | Description | Default |
|------|----------|-------------|---------|
| bbox | No | Bounding box [west, south, east, north] in WGS84 | |
| catalog_url | No | STAC catalog URL (optional, defaults to Microsoft Planetary Computer) | |
| collections | No | List of collection IDs to search within | |
| datetime | No | Date/time filter (ISO 8601 format, e.g., '2023-01-01/2023-12-31') | |
| limit | No | Maximum number of items to return | |
| query | No | Additional query parameters for filtering items | |
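
As a rough illustration of how the parameters above fit together, the sketch below assembles an arguments dictionary for a search_items call. `build_search_arguments` is a hypothetical helper and is not part of the server; the collection ID, bounding box, and cloud-cover filter are illustrative values only, and the `query` shape assumes the catalog accepts STAC query-extension style filters.

```python
from typing import Any, Optional


def build_search_arguments(
    collections: Optional[list[str]] = None,
    bbox: Optional[list[float]] = None,
    datetime: Optional[str] = None,
    limit: Optional[int] = None,
    query: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Hypothetical helper: collect search_items arguments, omitting unset ones."""
    if bbox is not None:
        if len(bbox) != 4:
            raise ValueError("bbox must be [west, south, east, north]")
        _west, south, _east, north = bbox
        # west may exceed east for boxes crossing the antimeridian,
        # so only the latitude ordering is checked here.
        if south > north:
            raise ValueError("bbox south must not exceed north")
    candidates = {
        "collections": collections,
        "bbox": bbox,
        "datetime": datetime,
        "limit": limit,
        "query": query,
    }
    # Every parameter is optional, so unset values are simply left out.
    return {key: value for key, value in candidates.items() if value is not None}


# Illustrative values (assumptions, not taken from the tool's documentation).
arguments = build_search_arguments(
    collections=["sentinel-2-l2a"],
    bbox=[-122.6, 37.6, -122.3, 37.9],
    datetime="2023-01-01/2023-12-31",
    limit=5,
    query={"eo:cloud_cover": {"lt": 20}},
)
```

Leaving `catalog_url` out of the dictionary relies on the documented fallback to Microsoft Planetary Computer.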

Implementation Reference

  • The core handler function that executes the search_items tool logic. It takes a STACClient and arguments, performs the search, and returns formatted text or JSON output.
    def handle_search_items(
        client: STACClient,
        arguments: dict[str, Any],
    ) -> list[TextContent] | dict[str, Any]:
        collections = arguments.get("collections")
        bbox = arguments.get("bbox")
        dt = arguments.get("datetime")
        query = arguments.get("query")
        limit = arguments.get("limit", 10)
        items = client.search_items(
            collections=collections,
            bbox=bbox,
            datetime=dt,
            query=query,
            limit=limit,
        )
        if arguments.get("output_format") == "json":
            return {"type": "item_list", "count": len(items), "items": items}
        result_text = f"Found {len(items)} items:\n\n"
        asset_keys = set()
        for item in items:
            item_id = item.get("id", "unknown")
            collection_id = item.get("collection", "unknown")
            result_text += f"**{item_id}** (Collection: `{collection_id}`)\n"
            dt_value = item.get("datetime")
            if dt_value:
                result_text += f" Date: {dt_value}\n"
            bbox = item.get("bbox")
            if isinstance(bbox, list | tuple) and len(bbox) >= BBOX_MIN_COORDS:
                result_text += (
                    " BBox: "
                    f"[{bbox[0]:.2f}, {bbox[1]:.2f}, {bbox[2]:.2f}, {bbox[3]:.2f}]\n"
                )
            assets = item.get("assets") or {}
            asset_keys.update(assets.keys())
            asset_count = len(assets) if hasattr(assets, "__len__") else 0
            result_text += f" Assets: {asset_count}\n\n"
        result_text += "\n"
        if asset_keys:
            result_text += "Assets found across items:\n"
            for key in sorted(asset_keys):
                result_text += f" - {key}\n"
        return [TextContent(type="text", text=result_text)]
  • Registers the 'search_items' tool on the FastMCP server instance. Defines the tool schema via type hints and calls the execution dispatcher.
    @app.tool
    async def search_items(
        collections: list[str] | str,
        bbox: list[float] | str | None = None,
        datetime: str | None = None,
        limit: int | None = 10,
        query: dict[str, Any] | str | None = None,
        output_format: str | None = "text",
        catalog_url: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search for STAC items."""
        arguments = preprocess_parameters(
            {
                "collections": collections,
                "bbox": bbox,
                "datetime": datetime,
                "limit": limit,
                "query": query,
                "output_format": output_format,
            }
        )
        return await execution.execute_tool(
            "search_items",
            arguments=arguments,
            catalog_url=catalog_url,
            headers=None,
        )
  • Internal registration of tool handlers in a dispatch dictionary, mapping 'search_items' to its handler function.
    _TOOL_HANDLERS: dict[str, Handler] = {
        "search_collections": handle_search_collections,
        "get_collection": handle_get_collection,
        "search_items": handle_search_items,
        "get_item": handle_get_item,
        "estimate_data_size": handle_estimate_data_size,
        "get_root": handle_get_root,
        "get_conformance": handle_get_conformance,
        "get_queryables": handle_get_queryables,
        "get_aggregations": handle_get_aggregations,
        "sensor_registry_info": handle_sensor_registry_info,
    }
  • The execute_tool function that dispatches to the appropriate handler based on tool_name, manages STACClient caching, instrumentation, and normalizes output to TextContent.
    async def execute_tool(
        tool_name: str,
        arguments: dict[str, Any] | None = None,
        catalog_url: str | None = None,
        headers: dict[str, str] | None = None,
        handler: Handler | None = None,
        client: STACClient | None = None,
    ):
        """Execute a tool handler with optional overrides for tests.

        Parameters mirror the comprehensive execution tests: when *handler* or
        *client* are provided they are used directly, otherwise the registered
        handler and shared client are used. The return value is always normalized
        to a ``list[TextContent]`` for compatibility with existing tooling.
        """
        arguments = dict(arguments or {})
        if handler is None:
            handler = _TOOL_HANDLERS.get(tool_name)
        if handler is None:
            _raise_unknown_tool(tool_name)
        # Offload handler execution to a thread to avoid blocking the async event loop
        # (handlers may perform network I/O or heavy CPU work like odc.stac.load).
        if client is None:
            # Reuse a cached STACClient when possible so multiple tool calls
            # within the same session/context share connection and session
            # state (HTTP sessions, timeout wrappers, etc.). This keeps
            # tools lightweight and consistent across invocations.
            client = _get_cached_client(catalog_url, headers)
        # Run the handler under the instrumented wrapper in a thread
        instrumented = await asyncio.to_thread(
            instrument_tool_execution,
            tool_name,
            catalog_url,
            handler,
            client,
            arguments,
        )
        raw_result = instrumented.value
        output_format = arguments.get("output_format", "text")
        if output_format == "json":
            if isinstance(raw_result, list):
                normalized = _as_text_content_list(raw_result)
                payload = {
                    "mode": "text_fallback",
                    "content": [item.text for item in normalized],
                }
            else:
                payload = {"mode": "json", "data": raw_result}
            payload_text = json.dumps(payload, separators=(",", ":"))
            record_tool_result_size(tool_name, len(payload_text.encode("utf-8")))
            return [TextContent(type="text", text=payload_text)]
        normalized = _as_text_content_list(raw_result)
        total_bytes = sum(len(item.text.encode("utf-8")) for item in normalized)
        record_tool_result_size(tool_name, total_bytes)
        return normalized
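
The dispatch flow described in the list above (look up a handler by name, run it in a worker thread, then wrap JSON-mode results in a small envelope) can be sketched in isolation. This is a simplified stand-in under stated assumptions, not the server's code: `handle_echo`, `HANDLERS`, `execute`, and `decode_payload` are hypothetical names, the handler takes only an arguments dictionary (no STACClient), plain strings replace TextContent, and client caching and instrumentation are left out.

```python
import asyncio
import json
from typing import Any, Callable, Optional

Handler = Callable[[dict[str, Any]], Any]


def handle_echo(arguments: dict[str, Any]) -> Any:
    """Stand-in handler: a dict in JSON mode, otherwise a list of text lines."""
    if arguments.get("output_format") == "json":
        return {"type": "item_list", "count": 0, "items": []}
    return ["Found 0 items:"]


# Hypothetical dispatch table mapping tool names to handlers.
HANDLERS: dict[str, Handler] = {"search_items": handle_echo}


async def execute(tool_name: str, arguments: Optional[dict[str, Any]] = None) -> str:
    arguments = dict(arguments or {})
    handler = HANDLERS.get(tool_name)
    if handler is None:
        raise ValueError(f"Unknown tool: {tool_name}")
    # Run the synchronous handler in a worker thread so the event loop stays free.
    raw = await asyncio.to_thread(handler, arguments)
    if arguments.get("output_format", "text") == "json":
        if isinstance(raw, list):
            # The handler produced text even though JSON was requested.
            payload = {"mode": "text_fallback", "content": raw}
        else:
            payload = {"mode": "json", "data": raw}
        return json.dumps(payload, separators=(",", ":"))
    return "\n".join(raw)


def decode_payload(text: str) -> Any:
    """Unwrap the envelope: 'data' in json mode, 'content' in text_fallback mode."""
    payload = json.loads(text)
    if payload["mode"] == "json":
        return payload["data"]
    return payload["content"]


json_text = asyncio.run(execute("search_items", {"output_format": "json"}))
plain_text = asyncio.run(execute("search_items"))
```

A caller that requests `output_format="json"` would check the `mode` field before reading the result, since a handler that only returns text ends up under `content` rather than `data`.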

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BnJam/stac-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.