Skip to main content
Glama
sentinel_hub.py (12 kB)
""" Sentinel Hub API integration. Provides access to Sentinel-2, Sentinel-1, and Landsat data through the Sentinel Hub services. """ import asyncio from datetime import datetime from typing import Dict, List, Optional, Literal, Any import numpy as np import structlog from geosight.config import settings from geosight.utils.geo import BoundingBox from geosight.utils.cache import cached logger = structlog.get_logger(__name__) # Evalscripts for different calculations EVALSCRIPTS = { "true_color": """ //VERSION=3 function setup() { return { input: ["B04", "B03", "B02"], output: { bands: 3 } }; } function evaluatePixel(sample) { return [sample.B04 * 2.5, sample.B03 * 2.5, sample.B02 * 2.5]; } """, "ndvi": """ //VERSION=3 function setup() { return { input: ["B04", "B08"], output: { bands: 1, sampleType: "FLOAT32" } }; } function evaluatePixel(sample) { let ndvi = (sample.B08 - sample.B04) / (sample.B08 + sample.B04); return [ndvi]; } """, "ndwi": """ //VERSION=3 function setup() { return { input: ["B03", "B08"], output: { bands: 1, sampleType: "FLOAT32" } }; } function evaluatePixel(sample) { let ndwi = (sample.B03 - sample.B08) / (sample.B03 + sample.B08); return [ndwi]; } """, "ndbi": """ //VERSION=3 function setup() { return { input: ["B08", "B11"], output: { bands: 1, sampleType: "FLOAT32" } }; } function evaluatePixel(sample) { let ndbi = (sample.B11 - sample.B08) / (sample.B11 + sample.B08); return [ndbi]; } """, "all_bands": """ //VERSION=3 function setup() { return { input: ["B02", "B03", "B04", "B08", "B11", "B12"], output: { bands: 6, sampleType: "FLOAT32" } }; } function evaluatePixel(sample) { return [sample.B02, sample.B03, sample.B04, sample.B08, sample.B11, sample.B12]; } """, } class SentinelHubService: """ Service for accessing Sentinel Hub API. 
Provides methods for: - Searching available imagery - Downloading imagery - Calculating spectral indices """ def __init__(self): self._config = settings.sentinel_hub self._client = None self._initialized = False async def _ensure_initialized(self): """Initialize Sentinel Hub client if needed.""" if self._initialized: return if not self._config.is_configured: logger.warning("sentinel_hub_not_configured") return try: from sentinelhub import ( SHConfig, SentinelHubRequest, DataCollection, MimeType, BBox as SHBBox, CRS, ) # Configure credentials config = SHConfig() config.sh_client_id = self._config.client_id config.sh_client_secret = self._config.client_secret if self._config.instance_id: config.instance_id = self._config.instance_id self._sh_config = config self._initialized = True logger.info("sentinel_hub_initialized") except ImportError: logger.warning("sentinelhub_not_installed") except Exception as e: logger.error("sentinel_hub_init_error", error=str(e)) async def search_imagery( self, bbox: BoundingBox, start_date: datetime, end_date: datetime, collection: str = "sentinel-2", max_cloud_cover: float = 20.0, ) -> Dict[str, Any]: """ Search for available imagery in a region. 
Args: bbox: Bounding box for search area start_date: Start of time range end_date: End of time range collection: Data collection name max_cloud_cover: Maximum cloud cover percentage Returns: Dictionary with search results """ await self._ensure_initialized() if not self._initialized: logger.warning("using_mock_search_results") return self._mock_search_results(start_date, end_date) try: from sentinelhub import ( SentinelHubCatalog, DataCollection, BBox as SHBBox, CRS, ) # Map collection names collection_map = { "sentinel-2": DataCollection.SENTINEL2_L2A, "sentinel-1": DataCollection.SENTINEL1_IW, "landsat-8": DataCollection.LANDSAT_OT_L2, "landsat-9": DataCollection.LANDSAT_OT_L2, } data_collection = collection_map.get(collection, DataCollection.SENTINEL2_L2A) # Create catalog client catalog = SentinelHubCatalog(config=self._sh_config) # Create bounding box sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84) # Search search_results = list(catalog.search( data_collection, bbox=sh_bbox, time=(start_date.isoformat(), end_date.isoformat()), query={"eo:cloud_cover": {"lt": max_cloud_cover}}, )) # Format results items = [] for result in search_results: items.append({ "id": result.get("id"), "datetime": result.get("properties", {}).get("datetime"), "cloud_cover": result.get("properties", {}).get("eo:cloud_cover"), "collection": collection, }) return {"items": items} except Exception as e: logger.error("search_error", error=str(e)) return self._mock_search_results(start_date, end_date) async def get_imagery( self, bbox: BoundingBox, date: str, bands: List[str] = None, resolution: int = 10, ) -> Optional[np.ndarray]: """ Download imagery for a region. 
Args: bbox: Bounding box date: Target date (YYYY-MM-DD) bands: List of bands to download resolution: Resolution in meters Returns: Numpy array with imagery data """ await self._ensure_initialized() if not self._initialized: return None try: from sentinelhub import ( SentinelHubRequest, DataCollection, MimeType, BBox as SHBBox, CRS, ) sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84) # Calculate size based on bbox and resolution width_m = bbox.width_deg * 111000 * np.cos(np.radians(bbox.center[0])) height_m = bbox.height_deg * 111000 size = (int(width_m / resolution), int(height_m / resolution)) size = (min(2500, size[0]), min(2500, size[1])) request = SentinelHubRequest( evalscript=EVALSCRIPTS["all_bands"], input_data=[ SentinelHubRequest.input_data( data_collection=DataCollection.SENTINEL2_L2A, time_interval=(date, date), ) ], responses=[ SentinelHubRequest.output_response("default", MimeType.TIFF) ], bbox=sh_bbox, size=size, config=self._sh_config, ) data = request.get_data()[0] return data except Exception as e: logger.error("get_imagery_error", error=str(e)) return None async def calculate_index( self, bbox: BoundingBox, start_date: str, end_date: str, index_type: Literal["ndvi", "ndwi", "ndbi"] = "ndvi", resolution: int = 10, ) -> Optional[np.ndarray]: """ Calculate a spectral index for a region. 
Args: bbox: Bounding box start_date: Start date end_date: End date index_type: Type of index to calculate resolution: Output resolution in meters Returns: 2D numpy array with index values """ await self._ensure_initialized() if not self._initialized: logger.info("using_demo_data", index=index_type) return None try: from sentinelhub import ( SentinelHubRequest, DataCollection, MimeType, BBox as SHBBox, CRS, ) if index_type not in EVALSCRIPTS: raise ValueError(f"Unknown index type: {index_type}") sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84) # Calculate size width_m = bbox.width_deg * 111000 * np.cos(np.radians(bbox.center[0])) height_m = bbox.height_deg * 111000 size = (int(width_m / resolution), int(height_m / resolution)) size = (min(2500, size[0]), min(2500, size[1])) request = SentinelHubRequest( evalscript=EVALSCRIPTS[index_type], input_data=[ SentinelHubRequest.input_data( data_collection=DataCollection.SENTINEL2_L2A, time_interval=(start_date, end_date), mosaicking_order="leastCC", ) ], responses=[ SentinelHubRequest.output_response("default", MimeType.TIFF) ], bbox=sh_bbox, size=size, config=self._sh_config, ) data = request.get_data()[0] # Handle multi-dimensional output if data.ndim == 3: data = data[:, :, 0] return data.astype(np.float32) except Exception as e: logger.error("calculate_index_error", error=str(e), index=index_type) return None def _mock_search_results( self, start_date: datetime, end_date: datetime, ) -> Dict[str, Any]: """Generate mock search results for demo purposes.""" from datetime import timedelta import random items = [] current = start_date while current <= end_date: # Add entry every ~5 days (Sentinel-2 revisit time) items.append({ "id": f"S2A_{current.strftime('%Y%m%d')}", "datetime": current.isoformat(), "cloud_cover": random.uniform(0, 30), "collection": "sentinel-2", }) current += timedelta(days=random.randint(4, 6)) # Sort by cloud cover items.sort(key=lambda x: x["cloud_cover"]) return {"items": items}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/armaasinghn/geosight-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.