"""
Sentinel Hub API integration.
Provides access to Sentinel-2, Sentinel-1, and Landsat data
through the Sentinel Hub services.
"""
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Literal, Any
import numpy as np
import structlog
from geosight.config import settings
from geosight.utils.geo import BoundingBox
from geosight.utils.cache import cached
logger = structlog.get_logger(__name__)
# Evalscripts for different calculations
EVALSCRIPTS = {
"true_color": """
//VERSION=3
function setup() {
return {
input: ["B04", "B03", "B02"],
output: { bands: 3 }
};
}
function evaluatePixel(sample) {
return [sample.B04 * 2.5, sample.B03 * 2.5, sample.B02 * 2.5];
}
""",
"ndvi": """
//VERSION=3
function setup() {
return {
input: ["B04", "B08"],
output: { bands: 1, sampleType: "FLOAT32" }
};
}
function evaluatePixel(sample) {
let ndvi = (sample.B08 - sample.B04) / (sample.B08 + sample.B04);
return [ndvi];
}
""",
"ndwi": """
//VERSION=3
function setup() {
return {
input: ["B03", "B08"],
output: { bands: 1, sampleType: "FLOAT32" }
};
}
function evaluatePixel(sample) {
let ndwi = (sample.B03 - sample.B08) / (sample.B03 + sample.B08);
return [ndwi];
}
""",
"ndbi": """
//VERSION=3
function setup() {
return {
input: ["B08", "B11"],
output: { bands: 1, sampleType: "FLOAT32" }
};
}
function evaluatePixel(sample) {
let ndbi = (sample.B11 - sample.B08) / (sample.B11 + sample.B08);
return [ndbi];
}
""",
"all_bands": """
//VERSION=3
function setup() {
return {
input: ["B02", "B03", "B04", "B08", "B11", "B12"],
output: { bands: 6, sampleType: "FLOAT32" }
};
}
function evaluatePixel(sample) {
return [sample.B02, sample.B03, sample.B04, sample.B08, sample.B11, sample.B12];
}
""",
}
class SentinelHubService:
"""
Service for accessing Sentinel Hub API.
Provides methods for:
- Searching available imagery
- Downloading imagery
- Calculating spectral indices
"""
def __init__(self):
self._config = settings.sentinel_hub
self._client = None
self._initialized = False
async def _ensure_initialized(self):
"""Initialize Sentinel Hub client if needed."""
if self._initialized:
return
if not self._config.is_configured:
logger.warning("sentinel_hub_not_configured")
return
try:
from sentinelhub import (
SHConfig,
SentinelHubRequest,
DataCollection,
MimeType,
BBox as SHBBox,
CRS,
)
# Configure credentials
config = SHConfig()
config.sh_client_id = self._config.client_id
config.sh_client_secret = self._config.client_secret
if self._config.instance_id:
config.instance_id = self._config.instance_id
self._sh_config = config
self._initialized = True
logger.info("sentinel_hub_initialized")
except ImportError:
logger.warning("sentinelhub_not_installed")
except Exception as e:
logger.error("sentinel_hub_init_error", error=str(e))
async def search_imagery(
self,
bbox: BoundingBox,
start_date: datetime,
end_date: datetime,
collection: str = "sentinel-2",
max_cloud_cover: float = 20.0,
) -> Dict[str, Any]:
"""
Search for available imagery in a region.
Args:
bbox: Bounding box for search area
start_date: Start of time range
end_date: End of time range
collection: Data collection name
max_cloud_cover: Maximum cloud cover percentage
Returns:
Dictionary with search results
"""
await self._ensure_initialized()
if not self._initialized:
logger.warning("using_mock_search_results")
return self._mock_search_results(start_date, end_date)
try:
from sentinelhub import (
SentinelHubCatalog,
DataCollection,
BBox as SHBBox,
CRS,
)
# Map collection names
collection_map = {
"sentinel-2": DataCollection.SENTINEL2_L2A,
"sentinel-1": DataCollection.SENTINEL1_IW,
"landsat-8": DataCollection.LANDSAT_OT_L2,
"landsat-9": DataCollection.LANDSAT_OT_L2,
}
data_collection = collection_map.get(collection, DataCollection.SENTINEL2_L2A)
# Create catalog client
catalog = SentinelHubCatalog(config=self._sh_config)
# Create bounding box
sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84)
# Search
search_results = list(catalog.search(
data_collection,
bbox=sh_bbox,
time=(start_date.isoformat(), end_date.isoformat()),
query={"eo:cloud_cover": {"lt": max_cloud_cover}},
))
# Format results
items = []
for result in search_results:
items.append({
"id": result.get("id"),
"datetime": result.get("properties", {}).get("datetime"),
"cloud_cover": result.get("properties", {}).get("eo:cloud_cover"),
"collection": collection,
})
return {"items": items}
except Exception as e:
logger.error("search_error", error=str(e))
return self._mock_search_results(start_date, end_date)
async def get_imagery(
self,
bbox: BoundingBox,
date: str,
bands: List[str] = None,
resolution: int = 10,
) -> Optional[np.ndarray]:
"""
Download imagery for a region.
Args:
bbox: Bounding box
date: Target date (YYYY-MM-DD)
bands: List of bands to download
resolution: Resolution in meters
Returns:
Numpy array with imagery data
"""
await self._ensure_initialized()
if not self._initialized:
return None
try:
from sentinelhub import (
SentinelHubRequest,
DataCollection,
MimeType,
BBox as SHBBox,
CRS,
)
sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84)
# Calculate size based on bbox and resolution
width_m = bbox.width_deg * 111000 * np.cos(np.radians(bbox.center[0]))
height_m = bbox.height_deg * 111000
size = (int(width_m / resolution), int(height_m / resolution))
size = (min(2500, size[0]), min(2500, size[1]))
request = SentinelHubRequest(
evalscript=EVALSCRIPTS["all_bands"],
input_data=[
SentinelHubRequest.input_data(
data_collection=DataCollection.SENTINEL2_L2A,
time_interval=(date, date),
)
],
responses=[
SentinelHubRequest.output_response("default", MimeType.TIFF)
],
bbox=sh_bbox,
size=size,
config=self._sh_config,
)
data = request.get_data()[0]
return data
except Exception as e:
logger.error("get_imagery_error", error=str(e))
return None
async def calculate_index(
self,
bbox: BoundingBox,
start_date: str,
end_date: str,
index_type: Literal["ndvi", "ndwi", "ndbi"] = "ndvi",
resolution: int = 10,
) -> Optional[np.ndarray]:
"""
Calculate a spectral index for a region.
Args:
bbox: Bounding box
start_date: Start date
end_date: End date
index_type: Type of index to calculate
resolution: Output resolution in meters
Returns:
2D numpy array with index values
"""
await self._ensure_initialized()
if not self._initialized:
logger.info("using_demo_data", index=index_type)
return None
try:
from sentinelhub import (
SentinelHubRequest,
DataCollection,
MimeType,
BBox as SHBBox,
CRS,
)
if index_type not in EVALSCRIPTS:
raise ValueError(f"Unknown index type: {index_type}")
sh_bbox = SHBBox(bbox.to_tuple(), crs=CRS.WGS84)
# Calculate size
width_m = bbox.width_deg * 111000 * np.cos(np.radians(bbox.center[0]))
height_m = bbox.height_deg * 111000
size = (int(width_m / resolution), int(height_m / resolution))
size = (min(2500, size[0]), min(2500, size[1]))
request = SentinelHubRequest(
evalscript=EVALSCRIPTS[index_type],
input_data=[
SentinelHubRequest.input_data(
data_collection=DataCollection.SENTINEL2_L2A,
time_interval=(start_date, end_date),
mosaicking_order="leastCC",
)
],
responses=[
SentinelHubRequest.output_response("default", MimeType.TIFF)
],
bbox=sh_bbox,
size=size,
config=self._sh_config,
)
data = request.get_data()[0]
# Handle multi-dimensional output
if data.ndim == 3:
data = data[:, :, 0]
return data.astype(np.float32)
except Exception as e:
logger.error("calculate_index_error", error=str(e), index=index_type)
return None
def _mock_search_results(
self,
start_date: datetime,
end_date: datetime,
) -> Dict[str, Any]:
"""Generate mock search results for demo purposes."""
from datetime import timedelta
import random
items = []
current = start_date
while current <= end_date:
# Add entry every ~5 days (Sentinel-2 revisit time)
items.append({
"id": f"S2A_{current.strftime('%Y%m%d')}",
"datetime": current.isoformat(),
"cloud_cover": random.uniform(0, 30),
"collection": "sentinel-2",
})
current += timedelta(days=random.randint(4, 6))
# Sort by cloud cover
items.sort(key=lambda x: x["cloud_cover"])
return {"items": items}