estimate_data_size

Calculate storage requirements for geospatial datasets by analyzing STAC items with spatial, temporal, and attribute filters to plan data downloads efficiently.

Instructions

Estimate data size for STAC items using lazy loading with odc.stac

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| aoi_geojson | No | Area of Interest as GeoJSON geometry for clipping (the smallest bounding box between this and the bbox parameter is used) | |
| bbox | No | Bounding box [west, south, east, north] in WGS84 | |
| catalog_url | No | STAC catalog URL (optional; defaults to the Microsoft Planetary Computer) | |
| collections | Yes | List of collection IDs to search within | |
| datetime | No | Date/time filter (ISO 8601 format, e.g. '2023-01-01/2023-12-31') | |
| limit | No | Maximum number of items to analyze for size estimation | 10 |
| query | No | Additional query parameters for filtering items | |
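
For orientation, here is a minimal, hypothetical arguments payload a caller might send to this tool; the collection ID, bbox, date range, and cloud-cover query are illustrative placeholders, not values taken from this reference.

```python
# Hypothetical example arguments for estimate_data_size; all values below are
# illustrative placeholders.
example_arguments = {
    "collections": ["sentinel-2-l2a"],
    "bbox": [-122.6, 37.6, -122.3, 37.9],      # [west, south, east, north] in WGS84
    "datetime": "2023-06-01/2023-06-30",       # ISO 8601 interval
    "query": {"eo:cloud_cover": {"lt": 20}},   # optional extra filtering
    "limit": 10,                               # items to analyze for the estimate
}
```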

Implementation Reference

  • The primary handler function for the 'estimate_data_size' tool. It validates arguments, invokes STACClient.estimate_data_size(), processes the result, and returns formatted text or JSON output.
```python
def handle_estimate_data_size(
    client: STACClient,
    arguments: dict[str, Any],
) -> list[TextContent] | dict[str, Any]:
    collections = _validate_collections_argument(arguments.get("collections"))
    bbox = _validate_bbox_argument(arguments.get("bbox"))
    dt = _validate_datetime_argument(arguments.get("datetime"))
    query = _validate_query_argument(arguments.get("query"))
    aoi_geojson = _validate_aoi_geojson_argument(arguments.get("aoi_geojson"))
    limit = arguments.get("limit", 10)
    force_metadata_only = arguments.get("force_metadata_only", False)
    size_estimate = client.estimate_data_size(
        collections=collections,
        bbox=bbox,
        datetime=dt,
        query=query,
        aoi_geojson=aoi_geojson,
        limit=limit,
        force_metadata_only=force_metadata_only,
    )
    # Note: we do not return JSON here immediately because we want to ensure
    # sensor-native and queried totals are computed and included in the JSON
    # output. The JSON branch is evaluated after computing MB/GB fallbacks.
    result_text = "**Data Size Estimation**\n\n"
    item_count = size_estimate.get("item_count", 0)
    result_text += f"Items analyzed: {item_count}\n"

    # Be defensive: some estimator implementations may omit the
    # pre-computed MB/GB fields. Prefer explicit fields but fall back to
    # reconstructing from bytes when necessary.
    estimated_bytes = size_estimate.get("estimated_size_bytes")
    if estimated_bytes is None:
        # Some older/test fixtures may use 'estimated_bytes' or 'estimated_size'
        estimated_bytes = size_estimate.get("estimated_bytes")
    estimated_mb = size_estimate.get("estimated_size_mb")
    if estimated_mb is None and estimated_bytes is not None:
        try:
            estimated_mb = float(estimated_bytes) / (1024 * 1024)
        except (TypeError, ValueError):
            estimated_mb = None
    estimated_gb = size_estimate.get("estimated_size_gb")
    if estimated_gb is None and estimated_mb is not None:
        try:
            estimated_gb = float(estimated_mb) / 1024.0
        except (TypeError, ValueError):
            estimated_gb = None
    est_mb_str = (
        f"{estimated_mb:.2f} MB" if isinstance(estimated_mb, (int, float)) else "n/a"
    )
    est_gb_str = (
        f"{estimated_gb:.4f} GB" if isinstance(estimated_gb, (int, float)) else "n/a"
    )
    result_text += f"Estimated size: {est_mb_str} ({est_gb_str})\n"

    # Always surface sensor-native totals to the agent and the user.
    # Some estimator implementations compute an instrument-native (sensor) total
    # for narrower dtype suggestions; expose those values explicitly here.
    sensor_bytes = size_estimate.get("sensor_native_estimated_size_bytes")
    if sensor_bytes is None:
        sensor_bytes = size_estimate.get("sensor_native_estimated_bytes")
    sensor_mb = size_estimate.get("sensor_native_estimated_size_mb")
    if sensor_mb is None and sensor_bytes is not None:
        try:
            sensor_mb = float(sensor_bytes) / (1024 * 1024)
        except (TypeError, ValueError):
            sensor_mb = None
    sensor_gb = size_estimate.get("sensor_native_estimated_size_gb")
    if sensor_gb is None and sensor_mb is not None:
        try:
            sensor_gb = float(sensor_mb) / 1024.0
        except (TypeError, ValueError):
            sensor_gb = None
    s_mb_str = f"{sensor_mb:.2f} MB" if isinstance(sensor_mb, (int, float)) else "n/a"
    s_gb_str = f"{sensor_gb:.4f} GB" if isinstance(sensor_gb, (int, float)) else "n/a"
    result_text += f"Sensor-native estimated size: {s_mb_str} ({s_gb_str})\n"
    raw_bytes_str = (
        f"{int(estimated_bytes):,}" if estimated_bytes is not None else "n/a"
    )
    result_text += f"Raw bytes: {raw_bytes_str}\n\n"

    result_text += "**Query Parameters:**\n"
    result_text += "Collections: "
    collections_list = (
        ", ".join(size_estimate["collections"])
        if size_estimate["collections"]
        else "All"
    )
    result_text += f"{collections_list}\n"
    if size_estimate["bbox_used"]:
        b = size_estimate["bbox_used"]
        result_text += (
            f"Bounding box: [{b[0]:.4f}, {b[1]:.4f}, {b[2]:.4f}, {b[3]:.4f}]\n"
        )
    if size_estimate["temporal_extent"]:
        result_text += f"Time range: {size_estimate['temporal_extent']}\n"
    if size_estimate["clipped_to_aoi"]:
        result_text += "Clipped to AOI: Yes (minimized to smallest area)\n"

    if "data_variables" in size_estimate:
        result_text += "\n**Data Variables:**\n"
        for var_info in size_estimate["data_variables"]:
            # Support multiple possible size keys produced by different
            # estimator implementations/tests: prefer explicit 'size_mb',
            # then 'estimated_size_mb', then compute from 'estimated_bytes'.
            size_mb = None
            if "size_mb" in var_info:
                size_mb = var_info["size_mb"]
            elif "estimated_size_mb" in var_info:
                size_mb = var_info["estimated_size_mb"]
            elif (
                "estimated_bytes" in var_info
                and var_info["estimated_bytes"] is not None
            ):
                try:
                    size_mb = var_info["estimated_bytes"] / (1024 * 1024)
                except (TypeError, ValueError):
                    size_mb = None
            size_str = f"{size_mb:.2f}" if isinstance(size_mb, (int, float)) else "n/a"
            result_text += (
                f" - {var_info.get('variable', 'unknown')}: {size_str} MB, "
                f"shape {var_info.get('shape')}, dtype {var_info.get('dtype')}\n"
            )

    if size_estimate.get("spatial_dims"):
        spatial = size_estimate["spatial_dims"]
        result_text += "\n**Spatial Dimensions:**\n"
        result_text += f" X (longitude): {spatial.get('x', 0)} pixels\n"
        result_text += f" Y (latitude): {spatial.get('y', 0)} pixels\n"

    if "assets_analyzed" in size_estimate:
        result_text += "\n**Assets Analyzed (fallback estimation):**\n"
        for asset_info in size_estimate["assets_analyzed"][:MAX_ASSET_LIST]:
            result_text += (
                f" - {asset_info['asset']}: {asset_info['estimated_size_mb']} MB "
                f"({asset_info['media_type']})\n"
            )
        remaining = len(size_estimate["assets_analyzed"]) - MAX_ASSET_LIST
        if remaining > 0:
            result_text += f" ... and {remaining} more assets\n"

    result_text += f"\n{size_estimate['message']}\n"

    # If JSON was requested, return a structured payload that includes both
    # the queried totals and the sensor-native totals so agents can rely on
    # a stable schema.
    if arguments.get("output_format") == "json":
        queried_totals = {
            "bytes": estimated_bytes,
            "mb": estimated_mb,
            "gb": estimated_gb,
        }
        sensor_native_totals = {
            "bytes": sensor_bytes,
            "mb": sensor_mb,
            "gb": sensor_gb,
        }
        return {
            "type": "data_size_estimate",
            "estimate": size_estimate,
            "queried_totals": queried_totals,
            "sensor_native_totals": sensor_native_totals,
        }

    # Append advisory guidance from the dtype prompt if available. This helps
    # agents and human users understand how to prefer compact dtypes and avoid
    # overestimation when NaN nodata forces float upcasts.
    if callable(dtype_size_preferences):
        try:
            advisory = dtype_size_preferences()
            if advisory:
                result_text += "\n**Estimator Advisory (dtype preferences)**\n"
                result_text += advisory + "\n"
        except (
            RuntimeError,
            TypeError,
            ValueError,
        ) as exc:  # pragma: no cover - best-effort
            _LOGGER.debug("estimate_data_size: advisory generation failed: %s", exc)

    return [TextContent(type="text", text=result_text)]
```
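When `output_format` is `"json"`, the handler returns the structured payload built above instead of formatted text. A sketch of its shape, with illustrative numbers only:

```python
# Shape of the JSON-mode return value assembled by handle_estimate_data_size;
# the numeric values are illustrative, not real measurements.
example_payload = {
    "type": "data_size_estimate",
    "estimate": {"item_count": 3, "estimated_size_mb": 919.86},  # full size_estimate dict (truncated here)
    "queried_totals": {"bytes": 964_483_200, "mb": 919.86, "gb": 0.8983},
    "sensor_native_totals": {"bytes": 241_120_800, "mb": 229.97, "gb": 0.2246},
}
```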
  • MCP server registration of the 'estimate_data_size' tool using FastMCP @app.tool decorator. Function parameters define the input schema; delegates to execution.execute_tool.
```python
@app.tool
async def estimate_data_size(
    collections: list[str] | str,
    bbox: list[float] | str | None = None,
    datetime: str | None = None,
    query: dict[str, Any] | str | None = None,
    aoi_geojson: dict[str, Any] | str | None = None,
    limit: int | None = 10,
    force_metadata_only: bool | None = False,
    output_format: str | None = "text",
    catalog_url: str | None = None,
) -> list[dict[str, Any]]:
    """Estimate the data size for a STAC query."""
    arguments = preprocess_parameters(
        {
            "collections": collections,
            "bbox": bbox,
            "datetime": datetime,
            "query": query,
            "aoi_geojson": aoi_geojson,
            "limit": limit,
            "force_metadata_only": force_metadata_only,
            "output_format": output_format,
        }
    )
    return await execution.execute_tool(
        "estimate_data_size",
        arguments=arguments,
        catalog_url=catalog_url,
        headers=None,
    )
```
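The `preprocess_parameters` helper is not shown in this reference. A minimal sketch of the job its name and the string-typed parameters suggest (decoding JSON-encoded string arguments before validation) is below; treat it as an assumption, not the real implementation.

```python
# Minimal sketch, assuming preprocess_parameters mainly decodes JSON-encoded
# string parameters (e.g. a bbox sent as the string "[-122.6, 37.6, -122.3, 37.9]").
import json
from typing import Any


def preprocess_parameters_sketch(raw: dict[str, Any]) -> dict[str, Any]:
    processed: dict[str, Any] = {}
    for key, value in raw.items():
        if isinstance(value, str) and value.strip().startswith(("[", "{")):
            try:
                processed[key] = json.loads(value)
                continue
            except json.JSONDecodeError:
                pass  # leave malformed strings untouched for later validation
        processed[key] = value
    return processed
```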
  • Internal registration mapping tool name 'estimate_data_size' to its handler function in the execution module's _TOOL_HANDLERS dictionary.
```python
_TOOL_HANDLERS: dict[str, Handler] = {
    "search_collections": handle_search_collections,
    "get_collection": handle_get_collection,
    "search_items": handle_search_items,
    "get_item": handle_get_item,
    "estimate_data_size": handle_estimate_data_size,
    "get_root": handle_get_root,
    "get_conformance": handle_get_conformance,
    "get_queryables": handle_get_queryables,
    "get_aggregations": handle_get_aggregations,
    "sensor_registry_info": handle_sensor_registry_info,
}
```
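A rough sketch of how `execution.execute_tool` could dispatch through this mapping; the real module also handles client construction, headers, and error reporting, and the `STACClient` constructor arguments below are assumptions rather than source facts.

```python
# Assumed dispatch flow through _TOOL_HANDLERS; treat the client construction
# and error handling as guesses, not as the actual execution module.
async def execute_tool_sketch(
    tool_name: str,
    arguments: dict[str, Any],
    catalog_url: str | None = None,
    headers: dict[str, str] | None = None,
) -> list[TextContent] | dict[str, Any]:
    handler = _TOOL_HANDLERS.get(tool_name)
    if handler is None:
        msg = f"Unknown tool: {tool_name}"
        raise ValueError(msg)
    client = STACClient(catalog_url=catalog_url, headers=headers)  # assumed signature
    return handler(client, arguments)
```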
  • Input validation helper functions used by the handler to validate and normalize tool arguments (collections, bbox, datetime, query, aoi_geojson).
```python
def _validate_collections_argument(
    collections: list[str] | None,
) -> list[str]:
    match collections:
        case None:
            msg = "Collections argument is required."
            raise ValueError(msg)
        case []:
            msg = "Collections argument cannot be empty."
            raise ValueError(msg)
        case _:
            return collections


def _validate_datetime_argument(dt: str | None) -> str | None:
    """Datetime may be omitted. If 'latest' is provided, return today's date string."""
    if dt is None or dt == "":
        return None
    if dt == "latest":
        return f"{get_today_date()}"
    return dt


def _validate_query_argument(query: dict[str, Any] | None) -> dict[str, Any] | None:
    """Query is optional for estimate; return as-is (may be None)."""
    return query


def _validate_bbox_argument(bbox: list[float] | None) -> list[float] | None:
    """Validate bbox argument.

    BBox is optional for many STAC queries; if omitted, return None. If
    provided, it must be a sequence of four floats [minx, miny, maxx, maxy].
    """
    if bbox is None:
        return None
    bbox_len = 4
    # Accept any sequence of length 4
    if isinstance(bbox, (list, tuple)) and len(bbox) == bbox_len:
        return list(bbox)
    msg = (
        "Invalid bbox argument; must be a list of four floats: [minx, miny, maxx, maxy]"
    )
    raise ValueError(msg)


def _validate_aoi_geojson_argument(
    aoi_geojson: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """AOI GeoJSON is optional; return as-is (may be None)."""
    return aoi_geojson
```
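A few quick checks that follow directly from the validators above:

```python
# Behaviour implied by the validators shown in this reference.
assert _validate_bbox_argument(None) is None
assert _validate_bbox_argument([-122.6, 37.6, -122.3, 37.9]) == [-122.6, 37.6, -122.3, 37.9]
assert _validate_datetime_argument("") is None
assert _validate_datetime_argument("latest") == f"{get_today_date()}"

try:
    _validate_collections_argument([])
except ValueError as exc:
    print(exc)  # Collections argument cannot be empty.

try:
    _validate_bbox_argument([0.0, 1.0])  # wrong length
except ValueError as exc:
    print(exc)  # Invalid bbox argument; must be a list of four floats: ...
```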
  • Core size estimation method in STACClient invoked by the handler. Supports advanced odc.stac/xarray loading or metadata/HEAD fallback, including sensor-native dtype corrections.
```python
def estimate_data_size(
    self,
    collections: list[str] | None = None,
    bbox: list[float] | None = None,
    datetime: str | None = None,
    query: dict[str, Any] | None = None,
    aoi_geojson: dict[str, Any] | None = None,
    limit: int = 10,
    force_metadata_only: bool = False,
) -> dict[str, Any]:
    """Simplified estimator: prefer odc.stac + xarray to compute eager size.

    This implementation is intentionally minimal: when odc.stac and xarray are
    available it loads the matching items into an xarray.Dataset and computes
    an eager size estimate by summing the number of elements across data
    variables. For simplicity we assume 1 byte per element (ignore
    dtype/itemsize). If optional libraries are missing we return a helpful
    message.
    """
    # Retrieve matching items (pystac.Item objects). Keep this small and
    # deterministic. The underlying search may return more items than the
    # requested `limit` due to provider behavior or cached results, so
    # enforce truncation here before any expensive work (odc.stac.load).
    items = self._cached_search(
        collections=collections,
        bbox=bbox,
        datetime=datetime,
        query=query,
        limit=limit,
    )
    # Respect the caller-provided limit strictly.
    if limit and limit > 0 and len(items) > limit:
        items = items[:limit]

    if not items:
        return {
            "item_count": 0,
            "estimated_size_bytes": 0,
            "estimated_size_mb": 0,
            "estimated_size_gb": 0,
            "bbox_used": bbox,
            "temporal_extent": datetime,
            "collections": collections or [],
            "clipped_to_aoi": bool(aoi_geojson),
            "message": "No items found for the given query parameters",
        }

    # If the optional odc.stac/xarray path is not available and the
    # caller did not request metadata-only behaviour, return a helpful
    # message explaining how to enable dataset-based estimates. If the
    # caller requested `force_metadata_only=True` we skip this early
    # return and fall back to metadata/HEAD aggregation below.
    if not ODC_STAC_AVAILABLE and not force_metadata_only:
        return {
            "item_count": len(items),
            "estimated_size_bytes": 0,
            "estimated_size_mb": 0,
            "estimated_size_gb": 0,
            "bbox_used": bbox,
            "temporal_extent": datetime,
            "collections": collections
            or [getattr(item, "collection_id", None) for item in items],
            "clipped_to_aoi": bool(aoi_geojson),
            "message": (
                "odc.stac/xarray not available; install 'odc-stac' "
                "and 'xarray' to enable dataset-based estimates"
            ),
        }

    # If the caller requested metadata-only behaviour, skip the odc/xarray
    # eager load path and jump straight to the metadata/HEAD fallback
    # implemented later in this function.
    if not force_metadata_only:
        # Try to perform a per-item odc.stac load and compute sizes.
        try:
            import xarray as xr  # type: ignore[import]  # noqa: PLC0415
            from odc.stac import (  # noqa: PLC0415
                load as _odc_load,  # type: ignore[import]
            )

            # Load items one-by-one so sensor registry overrides can be
            # applied per-item and to keep memory usage predictable.
            total_bytes = 0
            sensor_native_total_bytes = 0
            data_variables: list[dict[str, Any]] = []
            dtype_registry = SensorDtypeRegistry()
            for item in items:
                ds_item = _odc_load([item], chunks={})
                if not isinstance(ds_item, xr.Dataset):
                    continue
                collection_id = getattr(item, "collection_id", None)
                sensor_info = dtype_registry.get_info(collection_id)
                for name, da in ds_item.data_vars.items():
                    try:
                        shape = tuple(int(s) for s in getattr(da, "shape", ()))
                        elems = 1
                        for s in shape:
                            elems *= s
                        underlying = getattr(da, "data", None)
                        nbytes = getattr(underlying, "nbytes", None)
                        method = "computed"
                        override_applied = False
                        if nbytes is not None:
                            size_bytes = int(nbytes)
                            method = "nbytes"
                            # Softer heuristic: if the reported dtype is a
                            # floating type but the sensor registry suggests
                            # an integer native dtype for this asset, produce
                            # both reported and registry-corrected sizes so
                            # callers can see both views and choose.
                            try:
                                import numpy as np  # type: ignore[import]  # noqa: PLC0415 - guarded import

                                reported_dtype = getattr(da, "dtype", None)
                                override_dtype = None
                                if sensor_info is not None:
                                    try:
                                        override_dtype = (
                                            sensor_info.get_dtype_for_asset(name)
                                        )
                                    except (AttributeError, TypeError, ValueError):
                                        override_dtype = None
                                # Only consider registry correction when the
                                # reported dtype is a float and the registry
                                # suggests an integer dtype for this asset.
                                if (
                                    reported_dtype is not None
                                    and hasattr(reported_dtype, "kind")
                                    and reported_dtype.kind == "f"
                                    and override_dtype is not None
                                    and np.issubdtype(override_dtype, np.integer)
                                ):
                                    # Compute registry-corrected bytes (no
                                    # side-effects on total_bytes; we keep the
                                    # estimator's numeric total based on what
                                    # xarray reports unless a caller requests
                                    # otherwise).
                                    try:
                                        sensor_itemsize = int(
                                            np.dtype(override_dtype).itemsize
                                        )
                                    except (TypeError, ValueError):
                                        sensor_itemsize = 1
                                    sensor_native_bytes = int(
                                        elems * sensor_itemsize
                                    )
                                else:
                                    sensor_native_bytes = None
                            except (ImportError, ModuleNotFoundError):
                                # If numpy missing, skip registry check.
                                sensor_native_bytes = None
                        else:
                            override_dtype = None
                            if sensor_info is not None:
                                try:
                                    override_dtype = (
                                        sensor_info.get_dtype_for_asset(name)
                                    )
                                except (AttributeError, TypeError, ValueError):
                                    override_dtype = None
                            dtype = getattr(da, "dtype", None)
                            if override_dtype is not None:
                                dtype = override_dtype
                                override_applied = True
                            itemsize = getattr(dtype, "itemsize", None)
                            if itemsize is None:
                                try:
                                    import numpy as np  # type: ignore[import]  # noqa: PLC0415 - guarded import

                                    itemsize = (
                                        np.dtype(dtype).itemsize
                                        if dtype is not None
                                        else 1
                                    )
                                except (
                                    ImportError,
                                    ModuleNotFoundError,
                                    TypeError,
                                    ValueError,
                                ):
                                    itemsize = 1
                            size_bytes = int(elems * int(itemsize))
                        total_bytes += size_bytes
                        # sensor_native_total accumulates the sensor-native
                        # bytes when available; otherwise fall back to the
                        # reported/computed size_bytes so the sensor-native
                        # total is a complete estimate.
                        if (
                            "sensor_native_bytes" in locals()
                            and sensor_native_bytes is not None
                        ):
                            sensor_native_total_bytes += int(sensor_native_bytes)
                        else:
                            sensor_native_total_bytes += int(size_bytes)
                        var_entry: dict[str, Any] = {
                            "variable": name,
                            "shape": shape,
                            "elements": elems,
                            "estimated_bytes": int(size_bytes),
                            "dtype": str(getattr(da, "dtype", None)),
                            "method": method,
                            "override_applied": bool(override_applied),
                        }
                        # If we computed a registry-corrected bytes value
                        # for float->integer recommendations, include both
                        # values so callers can inspect and choose.
                        if (
                            "sensor_native_bytes" in locals()
                            and sensor_native_bytes is not None
                        ):
                            var_entry["reported_bytes"] = int(size_bytes)
                            var_entry["sensor_native_bytes"] = int(
                                sensor_native_bytes
                            )
                            var_entry["sensor_native_dtype"] = str(override_dtype)
                            # Recommend the sensor-native value for
                            # storage/instrument-native use-cases but do not
                            # change the estimator total by default.
                            var_entry["recommended"] = "sensor_native"
                            var_entry["sensor_native_recommended"] = True
                        data_variables.append(var_entry)
                    except Exception as exc:  # noqa: BLE001 - defensive skip
                        # Skip variables we cannot introspect but emit a
                        # debug-level message so failures are visible in
                        # debugging runs while avoiding noisy user logs.
                        logger.debug(
                            "Skipping variable %s due to error: %s", name, exc
                        )
                        continue

            estimated_mb = total_bytes / (1024 * 1024)
            estimated_gb = total_bytes / (1024 * 1024 * 1024)
            sensor_native_estimated_mb = sensor_native_total_bytes / (1024 * 1024)
            sensor_native_estimated_gb = sensor_native_total_bytes / (
                1024 * 1024 * 1024
            )
            # Summarize how many variables reported native .nbytes and how
            # many have a sensor-native alternative included.
            reported_nbytes_count = sum(
                1 for v in data_variables if v.get("method") == "nbytes"
            )
            sensor_native_corrections_count = sum(
                1 for v in data_variables if v.get("sensor_native_bytes") is not None
            )
            parts = [
                "Estimated sizes computed using odc.stac/xarray.",
                f"Numeric total uses .data.nbytes: {int(total_bytes)} bytes",
                f"(~{round(estimated_gb, 4)} GB);",
                "sensor-native total (instrument-native) is",
                f"{int(sensor_native_total_bytes)} bytes",
                f"(~{round(sensor_native_estimated_gb, 4)} GB).",
                f"Reported .data.nbytes count: {reported_nbytes_count};",
                f"Sensor-native corrections: {sensor_native_corrections_count}.",
            ]
            message = " ".join(parts)
            return {
                "item_count": len(items),
                "estimated_size_bytes": int(total_bytes),
                "estimated_size_mb": round(estimated_mb, 2),
                "estimated_size_gb": round(estimated_gb, 4),
                "sensor_native_estimated_size_bytes": int(
                    sensor_native_total_bytes
                ),
                "sensor_native_estimated_size_mb": round(
                    sensor_native_estimated_mb, 2
                ),
                "sensor_native_estimated_size_gb": round(
                    sensor_native_estimated_gb, 4
                ),
                "bbox_used": bbox,
                "temporal_extent": datetime,
                "collections": collections
                or [getattr(item, "collection_id", None) for item in items],
                "clipped_to_aoi": bool(aoi_geojson),
                "data_variables": data_variables,
                "message": message,
            }
        except Exception:  # pragma: no cover - best-effort
            # odc may fail when tests pass in lightweight objects; log and
            # fall back to metadata/HEAD-based aggregation below.
            logger.exception("odc.stac eager estimate failed")

    # Fallback estimator: aggregate sizes from asset metadata (file:size)
    # and, when missing, use HEAD requests to probe Content-Length. This
    # path is exercised by unit tests and serves as a robust fallback
    # when odc/xarray-based introspection is unavailable or fails.
    total_bytes = 0
    assets_analyzed: list[dict[str, Any]] = []
    hrefs_to_probe: list[str] = []
    for item in items:
        # Accept both dict-like and object items (tests use MagicMock)
        assets = getattr(item, "assets", None) or {}
        # assets may be a dict of asset objects or dicts
        for name, asset in assets.items() if isinstance(assets, dict) else []:
            try:
                a = self._asset_to_dict(asset)
                # First, try metadata-based size hints
                meta_size = self._size_from_metadata(a)
                if meta_size is not None:
                    assets_analyzed.append(
                        {
                            "asset": name,
                            "href": a.get("href"),
                            "method": "metadata",
                            "size": int(meta_size),
                        }
                    )
                    total_bytes += int(meta_size)
                    continue
                # If no metadata size, and we have an href, queue for HEAD
                href = a.get("href")
                if href:
                    hrefs_to_probe.append(href)
                    assets_analyzed.append(
                        {
                            "asset": name,
                            "href": href,
                            "method": "head",
                            "size": None,
                        }
                    )
                    continue
                # Otherwise we couldn't analyze this asset
                assets_analyzed.append(
                    {"asset": name, "href": None, "method": "failed", "size": None}
                )
            except (AttributeError, TypeError, ValueError) as exc:
                logger.debug("Failed to normalize asset %s: %s", name, exc)
                assets_analyzed.append(
                    {"asset": name, "href": None, "method": "failed", "size": None}
                )

    # Probe hrefs in parallel (HEAD requests). _parallel_head_content_lengths
    # returns a mapping href -> size | None.
    if hrefs_to_probe:
        try:
            head_results = self._parallel_head_content_lengths(hrefs_to_probe)
        except Exception as exc:  # pragma: no cover - defensive  # noqa: BLE001
            logger.debug("HEAD probing failed: %s", exc)
            head_results = dict.fromkeys(hrefs_to_probe)
        # Fill in sizes for analyzed assets
        for a in assets_analyzed:
            if a.get("method") == "head" and a.get("href"):
                size = head_results.get(a["href"])
                if size is None:
                    a["method"] = "failed"
                    a["size"] = None
                else:
                    a["size"] = int(size)
                    total_bytes += int(size)

    estimated_mb = total_bytes / (1024 * 1024)
    estimated_gb = total_bytes / (1024 * 1024 * 1024) if total_bytes else 0
    message = (
        "Estimated sizes computed using metadata/HEAD fallback. "
        f"Total (metadata+HEAD) is {int(total_bytes)} bytes "
        f"(~{round(estimated_gb, 4)} GB)."
    )
    return {
        "item_count": len(items),
        "estimated_size_bytes": int(total_bytes),
        "estimated_size_mb": round(estimated_mb, 2),
        "estimated_size_gb": round(estimated_gb, 4),
        "bbox_used": bbox,
        "temporal_extent": datetime,
        "collections": collections
        or [getattr(item, "collection_id", None) for item in items],
        "clipped_to_aoi": bool(aoi_geojson),
        "assets_analyzed": assets_analyzed,
        "message": message,
    }
```
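To see why the sensor-native total can differ substantially from the reported total, consider a hypothetical single 10980 × 10980 band that xarray reports as float64 (a common NaN-nodata upcast) while the sensor registry suggests uint16:

```python
# Hypothetical single-band example; the shape and dtypes are illustrative only.
elements = 10_980 * 10_980                 # 120,560,400 pixels

reported_bytes = elements * 8              # float64 as reported after a NaN-nodata upcast
sensor_native_bytes = elements * 2         # uint16 instrument-native dtype

print(reported_bytes / (1024 ** 3))        # ~0.898 GB queried total
print(sensor_native_bytes / (1024 ** 3))   # ~0.225 GB sensor-native total
```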
