Skip to main content
Glama
indices.py15.9 kB
""" Spectral index calculation tools (NDVI, NDWI, NDBI). These indices are calculated from satellite imagery bands to analyze vegetation, water, and built-up areas. """ import asyncio import base64 from datetime import datetime from io import BytesIO from typing import Optional import numpy as np import structlog from PIL import Image from geosight.config import settings from geosight.tools.geocoding import resolve_location from geosight.services.sentinel_hub import SentinelHubService from geosight.utils.visualization import ( create_index_colormap, create_folium_map, array_to_base64_png, ) from geosight.utils.geo import create_bbox_from_point logger = structlog.get_logger(__name__) async def calculate_ndvi( start_date: str, end_date: str, latitude: Optional[float] = None, longitude: Optional[float] = None, location_name: Optional[str] = None, radius_km: float = 10.0, resolution: int = 10, ) -> dict: """ Calculate NDVI (Normalized Difference Vegetation Index). NDVI = (NIR - Red) / (NIR + Red) Values range from -1 to 1: - 0.6 to 1.0: Dense, healthy vegetation - 0.3 to 0.6: Moderate vegetation - 0.1 to 0.3: Sparse vegetation - -0.1 to 0.1: Bare soil - -1 to -0.1: Water, snow, clouds Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) latitude: Center latitude longitude: Center longitude location_name: Location name to geocode radius_km: Analysis radius in kilometers resolution: Output resolution in meters Returns: Dictionary with NDVI results, statistics, and visualization """ try: # Resolve location lat, lon, display_name = await resolve_location( location_name=location_name, latitude=latitude, longitude=longitude, ) logger.info( "calculating_ndvi", location=display_name, radius_km=radius_km, start_date=start_date, end_date=end_date, ) # Create bounding box bbox = create_bbox_from_point(lat, lon, radius_km) # Fetch imagery and calculate NDVI service = SentinelHubService() ndvi_data = await service.calculate_index( bbox=bbox, start_date=start_date, end_date=end_date, index_type="ndvi", resolution=resolution, ) if ndvi_data is None: # Generate demo data for demonstration ndvi_data = generate_demo_ndvi(radius_km, resolution) # Calculate statistics valid_data = ndvi_data[~np.isnan(ndvi_data)] stats = calculate_ndvi_statistics(valid_data) # Generate visualization ndvi_colored = create_index_colormap(ndvi_data, "ndvi") image_base64 = array_to_base64_png(ndvi_colored) # Generate interactive map map_html = create_folium_map( center=(lat, lon), overlay_data=ndvi_data, overlay_bounds=bbox, colormap="ndvi", title="NDVI Analysis", ) # Build interpretation interpretation = interpret_ndvi(stats) summary = f"""🌿 **NDVI Analysis Results** 📍 **Location:** {display_name} 📅 **Date Range:** {start_date} to {end_date} 📐 **Area:** {radius_km * 2} km × {radius_km * 2} km ({(radius_km * 2) ** 2:.1f} km²) {interpretation} **Vegetation Distribution:** • Dense vegetation (>0.6): {stats['dense_vegetation_pct']:.1f}% • Moderate vegetation (0.3-0.6): {stats['moderate_vegetation_pct']:.1f}% • Sparse vegetation (0.1-0.3): {stats['sparse_vegetation_pct']:.1f}% • Non-vegetation (<0.1): {stats['non_vegetation_pct']:.1f}% """ return { "summary": summary, "statistics": stats, "image_base64": image_base64, "image_mime_type": "image/png", "location": { "name": display_name, "latitude": lat, "longitude": lon, "radius_km": radius_km, }, "analysis_params": { "index": "NDVI", "start_date": start_date, "end_date": end_date, "resolution_m": resolution, }, } except ValueError as e: return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)} except Exception as e: logger.error("ndvi_calculation_error", error=str(e), exc_info=True) return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)} async def calculate_ndwi( start_date: str, end_date: str, latitude: Optional[float] = None, longitude: Optional[float] = None, location_name: Optional[str] = None, radius_km: float = 10.0, water_threshold: float = 0.3, ) -> dict: """ Calculate NDWI (Normalized Difference Water Index). NDWI = (Green - NIR) / (Green + NIR) Values: - > 0.3: Open water - 0.0 to 0.3: Flooding, wetlands - < 0.0: Non-water surfaces """ try: lat, lon, display_name = await resolve_location( location_name=location_name, latitude=latitude, longitude=longitude, ) logger.info( "calculating_ndwi", location=display_name, radius_km=radius_km, ) bbox = create_bbox_from_point(lat, lon, radius_km) service = SentinelHubService() ndwi_data = await service.calculate_index( bbox=bbox, start_date=start_date, end_date=end_date, index_type="ndwi", ) if ndwi_data is None: ndwi_data = generate_demo_ndwi(radius_km) valid_data = ndwi_data[~np.isnan(ndwi_data)] # Calculate water statistics water_mask = ndwi_data > water_threshold total_pixels = np.sum(~np.isnan(ndwi_data)) water_pixels = np.sum(water_mask) # Estimate area (rough calculation) pixel_area_km2 = (10 / 1000) ** 2 # 10m resolution water_area_km2 = water_pixels * pixel_area_km2 total_area_km2 = total_pixels * pixel_area_km2 stats = { "mean_ndwi": float(np.nanmean(ndwi_data)), "max_ndwi": float(np.nanmax(ndwi_data)), "min_ndwi": float(np.nanmin(ndwi_data)), "water_coverage_pct": float(water_pixels / total_pixels * 100) if total_pixels > 0 else 0, "water_area_km2": float(water_area_km2), "total_area_km2": float(total_area_km2), "wetland_pct": float(np.sum((ndwi_data > 0) & (ndwi_data <= water_threshold)) / total_pixels * 100) if total_pixels > 0 else 0, } # Generate visualization ndwi_colored = create_index_colormap(ndwi_data, "ndwi") image_base64 = array_to_base64_png(ndwi_colored) summary = f"""💧 **NDWI Water Analysis Results** 📍 **Location:** {display_name} 📅 **Date Range:** {start_date} to {end_date} 📐 **Area Analyzed:** {total_area_km2:.1f} km² **Water Detection:** • Open water (>{water_threshold}): {stats['water_coverage_pct']:.1f}% ({stats['water_area_km2']:.2f} km²) • Wetlands/flooding (0 to {water_threshold}): {stats['wetland_pct']:.1f}% • Non-water surfaces: {100 - stats['water_coverage_pct'] - stats['wetland_pct']:.1f}% **Index Statistics:** • Mean NDWI: {stats['mean_ndwi']:.3f} • Range: {stats['min_ndwi']:.3f} to {stats['max_ndwi']:.3f} """ return { "summary": summary, "statistics": stats, "image_base64": image_base64, "image_mime_type": "image/png", "location": { "name": display_name, "latitude": lat, "longitude": lon, }, } except ValueError as e: return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)} except Exception as e: logger.error("ndwi_calculation_error", error=str(e), exc_info=True) return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)} async def calculate_ndbi( start_date: str, end_date: str, latitude: Optional[float] = None, longitude: Optional[float] = None, location_name: Optional[str] = None, radius_km: float = 10.0, ) -> dict: """ Calculate NDBI (Normalized Difference Built-up Index). NDBI = (SWIR - NIR) / (SWIR + NIR) High values indicate built-up/urban areas. """ try: lat, lon, display_name = await resolve_location( location_name=location_name, latitude=latitude, longitude=longitude, ) logger.info( "calculating_ndbi", location=display_name, radius_km=radius_km, ) bbox = create_bbox_from_point(lat, lon, radius_km) service = SentinelHubService() ndbi_data = await service.calculate_index( bbox=bbox, start_date=start_date, end_date=end_date, index_type="ndbi", ) if ndbi_data is None: ndbi_data = generate_demo_ndbi(radius_km) valid_data = ndbi_data[~np.isnan(ndbi_data)] # Calculate urban statistics urban_threshold = 0.1 urban_mask = ndbi_data > urban_threshold total_pixels = np.sum(~np.isnan(ndbi_data)) urban_pixels = np.sum(urban_mask) pixel_area_km2 = (10 / 1000) ** 2 urban_area_km2 = urban_pixels * pixel_area_km2 stats = { "mean_ndbi": float(np.nanmean(ndbi_data)), "max_ndbi": float(np.nanmax(ndbi_data)), "min_ndbi": float(np.nanmin(ndbi_data)), "urban_coverage_pct": float(urban_pixels / total_pixels * 100) if total_pixels > 0 else 0, "urban_area_km2": float(urban_area_km2), "dense_urban_pct": float(np.sum(ndbi_data > 0.3) / total_pixels * 100) if total_pixels > 0 else 0, } ndbi_colored = create_index_colormap(ndbi_data, "ndbi") image_base64 = array_to_base64_png(ndbi_colored) summary = f"""🏙️ **NDBI Built-up Index Results** 📍 **Location:** {display_name} 📅 **Date Range:** {start_date} to {end_date} **Urban Area Detection:** • Built-up areas (>0.1): {stats['urban_coverage_pct']:.1f}% ({stats['urban_area_km2']:.2f} km²) • Dense urban (>0.3): {stats['dense_urban_pct']:.1f}% • Non-urban areas: {100 - stats['urban_coverage_pct']:.1f}% **Index Statistics:** • Mean NDBI: {stats['mean_ndbi']:.3f} • Range: {stats['min_ndbi']:.3f} to {stats['max_ndbi']:.3f} """ return { "summary": summary, "statistics": stats, "image_base64": image_base64, "image_mime_type": "image/png", "location": { "name": display_name, "latitude": lat, "longitude": lon, }, } except ValueError as e: return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)} except Exception as e: logger.error("ndbi_calculation_error", error=str(e), exc_info=True) return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)} # ============================================================================= # Helper Functions # ============================================================================= def calculate_ndvi_statistics(valid_data: np.ndarray) -> dict: """Calculate comprehensive NDVI statistics.""" total = len(valid_data) if total == 0: return { "mean_ndvi": 0, "median_ndvi": 0, "std_ndvi": 0, "min_ndvi": 0, "max_ndvi": 0, "dense_vegetation_pct": 0, "moderate_vegetation_pct": 0, "sparse_vegetation_pct": 0, "non_vegetation_pct": 0, } return { "mean_ndvi": float(np.mean(valid_data)), "median_ndvi": float(np.median(valid_data)), "std_ndvi": float(np.std(valid_data)), "min_ndvi": float(np.min(valid_data)), "max_ndvi": float(np.max(valid_data)), "dense_vegetation_pct": float(np.sum(valid_data > 0.6) / total * 100), "moderate_vegetation_pct": float(np.sum((valid_data > 0.3) & (valid_data <= 0.6)) / total * 100), "sparse_vegetation_pct": float(np.sum((valid_data > 0.1) & (valid_data <= 0.3)) / total * 100), "non_vegetation_pct": float(np.sum(valid_data <= 0.1) / total * 100), } def interpret_ndvi(stats: dict) -> str: """Generate human-readable interpretation of NDVI results.""" mean = stats.get("mean_ndvi", 0) dense = stats.get("dense_vegetation_pct", 0) if mean > 0.5: health = "🌳 **Excellent** - Area shows dense, healthy vegetation" elif mean > 0.3: health = "🌿 **Good** - Moderate vegetation coverage" elif mean > 0.1: health = "🌾 **Fair** - Sparse vegetation or crops" else: health = "🏜️ **Low** - Limited vegetation, possibly urban or barren" return f"**Vegetation Health:** {health}" def generate_demo_ndvi(radius_km: float, resolution: int = 10) -> np.ndarray: """Generate demo NDVI data for demonstration purposes.""" # Calculate size based on area and resolution size = int(radius_km * 2 * 1000 / resolution) size = min(size, 512) # Cap at 512x512 # Create realistic-looking NDVI pattern np.random.seed(42) # Base vegetation gradient x = np.linspace(0, 4 * np.pi, size) y = np.linspace(0, 4 * np.pi, size) xx, yy = np.meshgrid(x, y) # Combine multiple patterns for realism ndvi = ( 0.4 + 0.2 * np.sin(xx) * np.cos(yy) + 0.15 * np.sin(2 * xx + yy) + 0.1 * np.random.randn(size, size) ) # Add some water bodies (negative NDVI) water_mask = (np.sin(xx / 2) > 0.8) & (np.cos(yy / 2) > 0.7) ndvi[water_mask] = -0.3 + 0.1 * np.random.randn(np.sum(water_mask)) # Clip to valid range ndvi = np.clip(ndvi, -1, 1) return ndvi.astype(np.float32) def generate_demo_ndwi(radius_km: float) -> np.ndarray: """Generate demo NDWI data.""" size = min(int(radius_km * 2 * 100), 512) np.random.seed(43) x = np.linspace(0, 4 * np.pi, size) y = np.linspace(0, 4 * np.pi, size) xx, yy = np.meshgrid(x, y) # Mostly land with some water features ndwi = -0.2 + 0.15 * np.random.randn(size, size) # Add river-like feature river = np.abs(np.sin(xx + 0.5 * yy)) < 0.1 ndwi[river] = 0.5 + 0.1 * np.random.randn(np.sum(river)) # Add lake lake = ((xx - 2) ** 2 + (yy - 2) ** 2) < 1 ndwi[lake] = 0.6 + 0.05 * np.random.randn(np.sum(lake)) return np.clip(ndwi, -1, 1).astype(np.float32) def generate_demo_ndbi(radius_km: float) -> np.ndarray: """Generate demo NDBI data.""" size = min(int(radius_km * 2 * 100), 512) np.random.seed(44) x = np.linspace(-1, 1, size) y = np.linspace(-1, 1, size) xx, yy = np.meshgrid(x, y) # Urban center with decreasing density outward distance = np.sqrt(xx**2 + yy**2) ndbi = 0.4 * np.exp(-2 * distance) + 0.1 * np.random.randn(size, size) # Add some roads roads = (np.abs(xx) < 0.05) | (np.abs(yy) < 0.05) ndbi[roads] = 0.3 + 0.05 * np.random.randn(np.sum(roads)) return np.clip(ndbi, -1, 1).astype(np.float32)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/armaasinghn/geosight-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server