Skip to main content
Glama
satellite_imagery.py12.2 kB
import json import logging from pathlib import Path from typing import Optional, Dict, Any, List, Union from ..mcp import gis_mcp from pystac_client import Client import planetary_computer as pc import rasterio from rasterio.transform import Affine from rasterio.windows import from_bounds as window_from_bounds from rasterio.mask import mask as rio_mask from shapely.geometry import box as shapely_box, shape as shapely_shape, mapping as shapely_mapping from pyproj import Transformer from shapely.ops import transform as shapely_transform logger = logging.getLogger(__name__) DEFAULT_PATH = Path(__file__).resolve().parent / "satellite_imagery" MPC_STAC_URL = "https://planetarycomputer.microsoft.com/api/stac/v1" def _parse_bbox(bbox_str: Optional[str]) -> Optional[List[float]]: """ Parse "minx,miny,maxx,maxy" -> [minx, miny, maxx, maxy] """ if not bbox_str: return None parts = [p.strip() for p in bbox_str.split(",")] if len(parts) != 4: raise ValueError("bbox must be 'minx,miny,maxx,maxy'") return [float(v) for v in parts] def _ensure_dir(p: Optional[str]) -> Path: out_dir = Path(p) if p else DEFAULT_PATH out_dir.mkdir(parents=True, exist_ok=True) return out_dir def _pick_item(collection: str, bbox: Optional[List[float]], datetime: str, cloud_cover: Optional[int], intersects: Optional[dict] = None): catalog = Client.open(MPC_STAC_URL) def _search(dt, cc): kwargs = {"collections": [collection], "datetime": dt} if intersects is not None: kwargs["intersects"] = intersects elif bbox: kwargs["bbox"] = bbox if cc is not None: kwargs["query"] = {"eo:cloud_cover": {"lt": cc}} return list(catalog.search(**kwargs).items()) items = _search(datetime, cloud_cover) if not items and cloud_cover is not None: items = _search(datetime, None) if not items: try: start, end = datetime.split("/") items = _search(f"{start}/{end}", None) if not items: items = _search("2018-01-01/2100-01-01", None) except Exception: items = _search("2018-01-01/2100-01-01", None) if not items: raise RuntimeError("No items found. Try widening date range or removing cloud filter.") items.sort(key=lambda it: it.properties.get("eo:cloud_cover", 1000)) return items[0] def _read_and_optionally_clip( href: str, bbox: Optional[List[float]], geometry_geojson: Optional[dict], out_crs: Optional[str] ): """ Read a single-band COG from a (signed) href, with robust bbox/geometry handling: - If bbox/geometry are assumed WGS84 (typical), they are reprojected to the asset CRS for cropping. - Protects against empty windows (0x0) and raises a helpful error instead of cryptic rasterio errors. - Optional reprojection to out_crs after cropping. """ signed = pc.sign(href) with rasterio.Env(): with rasterio.open(signed) as src: src_crs = src.crs if src_crs is None: raise RuntimeError("Source asset has no CRS; cannot crop safely.") roi_geom_src = None if geometry_geojson: roi_geom_src = _project_geojson_to(src_crs, geometry_geojson) elif bbox: bbox_geom_wgs84 = shapely_box(*bbox) roi_geom_src = _project_geometry_to(src_crs, bbox_geom_wgs84, from_crs="EPSG:4326") if roi_geom_src: roi_bounds = shapely_shape(roi_geom_src).bounds if not _bounds_intersect(roi_bounds, src.bounds): raise RuntimeError( f"ROI does not intersect asset. ROI(bounds)={roi_bounds}, asset(bounds)={src.bounds}" ) data, transform = rio_mask(src, [roi_geom_src], crop=True, nodata=src.nodata) if data.ndim == 3: data = data[0] if data.size == 0 or data.shape[0] == 0 or data.shape[1] == 0: raise RuntimeError("Crop produced an empty window (0x0). Check ROI and CRS.") profile = src.profile.copy() profile.update({ "height": data.shape[0], "width": data.shape[1], "transform": transform, "count": 1, "crs": src_crs }) else: data = src.read(1) profile = src.profile.copy() profile.update({"count": 1}) if data.size == 0: raise RuntimeError("Read returned empty data unexpectedly.") if out_crs and str(out_crs) != str(profile["crs"]): from rasterio.vrt import WarpedVRT from rasterio.warp import calculate_default_transform transform_out, width_out, height_out = calculate_default_transform( src_crs, out_crs, src.width, src.height, *src.bounds ) with WarpedVRT(src, crs=out_crs, transform=transform_out, width=width_out, height=height_out) as vrt: if roi_geom_src: roi_geom_out = _project_geojson_to( out_crs, shapely_mapping(shapely_shape(roi_geom_src)), from_crs=str(src_crs) ) data, transform2 = rio_mask(vrt, [roi_geom_out], crop=True, nodata=vrt.nodata) data = data[0] if data.ndim == 3 else data transform = transform2 else: data = vrt.read(1) transform = vrt.transform if data.size == 0 or data.shape[0] == 0 or data.shape[1] == 0: raise RuntimeError("Reprojected read produced empty data (0x0). Check ROI and CRS.") profile.update({ "crs": out_crs, "transform": transform, "width": data.shape[1], "height": data.shape[0] }) return data, profile def _project_geojson_to(to_crs, geom_geojson, from_crs="EPSG:4326"): geom = shapely_shape(geom_geojson) return shapely_mapping(_project_geometry_to(to_crs, geom, from_crs=from_crs)) def _project_geometry_to(to_crs, geom, from_crs="EPSG:4326"): """Project a Shapely geometry from `from_crs` to `to_crs`.""" transformer = Transformer.from_crs(from_crs, to_crs, always_xy=True) return shapely_transform(lambda x, y: transformer.transform(x, y), geom) def _bounds_intersect(a, b): ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b return not (ax2 <= bx1 or bx2 <= ax1 or ay2 <= by1 or by2 <= ay1) def _write_multiband_geotiff(out_path: Path, bands: List[Any], profiles: List[Dict[str, Any]], dtype: Optional[str] = None): """ Write stacked bands into a single GeoTIFF using the first profile as a template. Assumes bands are aligned and same shape. """ if not bands: raise RuntimeError("No bands to write.") base_profile = profiles[0].copy() base_profile.update( driver="GTiff", count=len(bands), compress="deflate", tiled=True, bigtiff="IF_SAFER" ) if dtype: base_profile["dtype"] = dtype h0, w0 = bands[0].shape for i, b in enumerate(bands): if b.shape != (h0, w0): raise RuntimeError(f"Band shape mismatch at index {i}: {b.shape} vs {(h0, w0)}") with rasterio.open(out_path, "w", **base_profile) as dst: for i, b in enumerate(bands, start=1): dst.write(b, i) @gis_mcp.resource("gis://operations/satellite_imagery") def get_satellite_operations() -> dict: """List available satellite imagery operations.""" return {"operations": ["download_satellite_imagery"]} @gis_mcp.tool() def download_satellite_imagery( collection: str = "sentinel-2-l2a", assets: Union[List[str], str] = ("B04", "B03", "B02"), datetime: str = "2024-01-01/2024-12-31", cloud_cover_lt: Optional[int] = 20, bbox: Optional[str] = None, geometry_geojson: Optional[str] = None, out_crs: Optional[str] = None, filename: Optional[str] = None, path: Optional[str] = None ) -> Dict[str, Any]: """ Download analysis-ready satellite imagery from Microsoft Planetary Computer (STAC + SAS). - Picks the least-cloudy item matching your query (by default Sentinel-2 L2A). - Downloads specified asset bands and writes a multi-band GeoTIFF. - Optional bbox/geometry crop and CRS reprojection. Args: collection: STAC collection id (e.g., "sentinel-2-l2a", "landsat-8-c2-l2") assets: list of asset keys to download (e.g., ["B04","B03","B02"]) datetime: STAC datetime/interval (e.g., "2025-08-01/2025-08-31" or "2025-08-05") cloud_cover_lt: only items with eo:cloud_cover < this value. Use None to disable. bbox: "minx,miny,maxx,maxy" (in degrees if using search with WGS84). Used for cropping window. geometry_geojson: a GeoJSON geometry (string). If provided, precise clipping is applied. out_crs: target CRS for output (e.g., "EPSG:4326"). If None, keep source asset CRS. filename: output file name (without path). If None, an automatic name is generated. path: output folder. Defaults to ./data/satellite_imagery Returns: {"status": "success", "file_path": "...", "item_id": "...", "collection": "...", "assets": [...], "properties": {...}} or {"status": "error", "message": "..."} """ try: if isinstance(assets, str): assets = [a.strip() for a in assets.split(",") if a.strip()] out_dir = _ensure_dir(path) bbox_vals = _parse_bbox(bbox) if bbox else None geom_obj = json.loads(geometry_geojson) if geometry_geojson else None item = _pick_item( collection=collection, bbox=bbox_vals, datetime=datetime, cloud_cover=cloud_cover_lt, intersects=geom_obj ) if geom_obj and not bbox_vals: minx, miny, maxx, maxy = shapely_shape(geom_obj).bounds bbox_vals = [minx, miny, maxx, maxy] bands = [] profiles = [] missing_assets = [] for asset_key in assets: if asset_key not in item.assets: missing_assets.append(asset_key) continue href = item.assets[asset_key].href data, profile = _read_and_optionally_clip( href=href, bbox=bbox_vals, geometry_geojson=geom_obj, out_crs=out_crs ) bands.append(data) profiles.append(profile) if missing_assets: logger.warning("Missing assets in item %s: %s", item.id, ", ".join(missing_assets)) if not bands: raise RuntimeError(f"Requested assets not available in the chosen item: {missing_assets}") if not filename: safe_assets = "-".join([a.lower() for a in assets if a not in missing_assets]) or "bands" filename = f"{collection}_{item.id}_{safe_assets}.tif" out_path = out_dir / filename dtype = profiles[0].get("dtype") _write_multiband_geotiff(out_path=out_path, bands=bands, profiles=profiles, dtype=dtype) logger.info("Saved satellite imagery (%s) to %s", assets, out_path) return { "status": "success", "file_path": str(out_path), "item_id": item.id, "collection": collection, "assets": [a for a in assets if a not in missing_assets], "missing_assets": missing_assets, "properties": item.properties, } except Exception as e: logger.exception("Failed to download satellite imagery") return {"status": "error", "message": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mahdin75/gis-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server