"""
Microsoft Planetary Computer integration.
Provides free access to satellite imagery through STAC API.
This is the recommended data source for getting started.
"""
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Any
import httpx
import numpy as np
import structlog
from geosight.utils.geo import BoundingBox
logger = structlog.get_logger(__name__)
# Planetary Computer STAC API endpoint
PC_STAC_URL = "https://planetarycomputer.microsoft.com/api/stac/v1"
# Collection IDs
COLLECTIONS = {
"sentinel-2": "sentinel-2-l2a",
"landsat-8": "landsat-c2-l2",
"landsat-9": "landsat-c2-l2",
"sentinel-1": "sentinel-1-rtc",
}
class PlanetaryComputerService:
"""
Service for accessing Microsoft Planetary Computer data.
Uses the free STAC API - no API key required for search,
but signing is needed for data download.
"""
def __init__(self):
self._client = None
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client."""
if self._client is None:
self._client = httpx.AsyncClient(timeout=30.0)
return self._client
async def search_imagery(
self,
latitude: float,
longitude: float,
start_date: datetime,
end_date: datetime,
collection: str = "sentinel-2",
max_cloud_cover: float = 20.0,
limit: int = 20,
) -> Dict[str, Any]:
"""
Search for available imagery using STAC API.
Args:
latitude: Center latitude
longitude: Center longitude
start_date: Start of time range
end_date: End of time range
collection: Data collection name
max_cloud_cover: Maximum cloud cover percentage
limit: Maximum number of results
Returns:
Dictionary with search results
"""
client = await self._get_client()
# Map to PC collection ID
pc_collection = COLLECTIONS.get(collection, "sentinel-2-l2a")
# Build STAC search query
search_body = {
"collections": [pc_collection],
"intersects": {
"type": "Point",
"coordinates": [longitude, latitude],
},
"datetime": f"{start_date.isoformat()}Z/{end_date.isoformat()}Z",
"limit": limit,
}
# Add cloud cover filter for optical data
if "sentinel-2" in collection or "landsat" in collection:
search_body["query"] = {
"eo:cloud_cover": {"lt": max_cloud_cover}
}
try:
response = await client.post(
f"{PC_STAC_URL}/search",
json=search_body,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
data = response.json()
# Format results
items = []
for feature in data.get("features", []):
props = feature.get("properties", {})
items.append({
"id": feature.get("id"),
"datetime": props.get("datetime"),
"cloud_cover": props.get("eo:cloud_cover"),
"collection": collection,
"bbox": feature.get("bbox"),
"assets": list(feature.get("assets", {}).keys()),
})
# Sort by cloud cover
items.sort(key=lambda x: x.get("cloud_cover") or 100)
logger.info(
"pc_search_complete",
collection=collection,
results=len(items),
)
return {"items": items}
except httpx.HTTPError as e:
logger.error("pc_search_error", error=str(e))
return {"items": [], "error": str(e)}
async def get_item(self, collection: str, item_id: str) -> Optional[Dict]:
"""
Get a specific STAC item.
Args:
collection: Collection ID
item_id: Item ID
Returns:
STAC item dictionary
"""
client = await self._get_client()
pc_collection = COLLECTIONS.get(collection, collection)
try:
response = await client.get(
f"{PC_STAC_URL}/collections/{pc_collection}/items/{item_id}"
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
logger.error("pc_get_item_error", item_id=item_id, error=str(e))
return None
async def sign_url(self, url: str) -> str:
"""
Sign a Planetary Computer URL for download access.
Args:
url: Asset URL to sign
Returns:
Signed URL
"""
try:
import planetary_computer as pc
return pc.sign(url)
except ImportError:
logger.warning("planetary_computer_not_installed")
return url
except Exception as e:
logger.error("pc_sign_error", error=str(e))
return url
async def download_asset(
self,
item: Dict,
asset_key: str,
bbox: Optional[BoundingBox] = None,
) -> Optional[np.ndarray]:
"""
Download an asset from a STAC item.
Args:
item: STAC item dictionary
asset_key: Asset key to download
bbox: Optional bounding box to clip
Returns:
Numpy array with asset data
"""
assets = item.get("assets", {})
if asset_key not in assets:
logger.error("asset_not_found", asset=asset_key)
return None
asset_url = assets[asset_key].get("href")
if not asset_url:
return None
# Sign the URL
signed_url = await self.sign_url(asset_url)
try:
import rasterio
from rasterio.windows import from_bounds
with rasterio.open(signed_url) as src:
if bbox:
# Clip to bounding box
window = from_bounds(
bbox.west, bbox.south, bbox.east, bbox.north,
src.transform
)
data = src.read(1, window=window)
else:
data = src.read(1)
return data
except ImportError:
logger.warning("rasterio_not_installed")
return None
except Exception as e:
logger.error("download_error", error=str(e))
return None
async def get_rgb_preview(
self,
latitude: float,
longitude: float,
date: str,
radius_km: float = 10,
) -> Optional[np.ndarray]:
"""
Get RGB preview image for a location.
Args:
latitude: Center latitude
longitude: Center longitude
date: Target date (YYYY-MM-DD)
radius_km: Radius in kilometers
Returns:
RGB numpy array
"""
# Search for imagery
start = datetime.strptime(date, "%Y-%m-%d")
end = start
results = await self.search_imagery(
latitude=latitude,
longitude=longitude,
start_date=start,
end_date=end,
limit=1,
)
if not results.get("items"):
return None
item = results["items"][0]
# Get the visual preview if available
full_item = await self.get_item("sentinel-2", item["id"])
if not full_item:
return None
assets = full_item.get("assets", {})
# Try rendered preview first
if "rendered_preview" in assets:
return await self.download_asset(full_item, "rendered_preview")
# Fall back to visual
if "visual" in assets:
return await self.download_asset(full_item, "visual")
return None
async def close(self):
"""Close the HTTP client."""
if self._client:
await self._client.aclose()
self._client = None