"""
Spectral index calculation tools (NDVI, NDWI, NDBI).
These indices are calculated from satellite imagery bands to analyze
vegetation, water, and built-up areas.
"""
import asyncio
import base64
from datetime import datetime
from io import BytesIO
from typing import Optional
import numpy as np
import structlog
from PIL import Image
from geosight.config import settings
from geosight.tools.geocoding import resolve_location
from geosight.services.sentinel_hub import SentinelHubService
from geosight.utils.visualization import (
create_index_colormap,
create_folium_map,
array_to_base64_png,
)
from geosight.utils.geo import create_bbox_from_point
logger = structlog.get_logger(__name__)
async def calculate_ndvi(
start_date: str,
end_date: str,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
location_name: Optional[str] = None,
radius_km: float = 10.0,
resolution: int = 10,
) -> dict:
"""
Calculate NDVI (Normalized Difference Vegetation Index).
NDVI = (NIR - Red) / (NIR + Red)
Values range from -1 to 1:
- 0.6 to 1.0: Dense, healthy vegetation
- 0.3 to 0.6: Moderate vegetation
- 0.1 to 0.3: Sparse vegetation
- -0.1 to 0.1: Bare soil
- -1 to -0.1: Water, snow, clouds
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
latitude: Center latitude
longitude: Center longitude
location_name: Location name to geocode
radius_km: Analysis radius in kilometers
resolution: Output resolution in meters
Returns:
Dictionary with NDVI results, statistics, and visualization
"""
try:
# Resolve location
lat, lon, display_name = await resolve_location(
location_name=location_name,
latitude=latitude,
longitude=longitude,
)
logger.info(
"calculating_ndvi",
location=display_name,
radius_km=radius_km,
start_date=start_date,
end_date=end_date,
)
# Create bounding box
bbox = create_bbox_from_point(lat, lon, radius_km)
# Fetch imagery and calculate NDVI
service = SentinelHubService()
ndvi_data = await service.calculate_index(
bbox=bbox,
start_date=start_date,
end_date=end_date,
index_type="ndvi",
resolution=resolution,
)
if ndvi_data is None:
# Generate demo data for demonstration
ndvi_data = generate_demo_ndvi(radius_km, resolution)
# Calculate statistics
valid_data = ndvi_data[~np.isnan(ndvi_data)]
stats = calculate_ndvi_statistics(valid_data)
# Generate visualization
ndvi_colored = create_index_colormap(ndvi_data, "ndvi")
image_base64 = array_to_base64_png(ndvi_colored)
# Generate interactive map
map_html = create_folium_map(
center=(lat, lon),
overlay_data=ndvi_data,
overlay_bounds=bbox,
colormap="ndvi",
title="NDVI Analysis",
)
# Build interpretation
interpretation = interpret_ndvi(stats)
summary = f"""🌿 **NDVI Analysis Results**
📍 **Location:** {display_name}
📅 **Date Range:** {start_date} to {end_date}
📐 **Area:** {radius_km * 2} km × {radius_km * 2} km ({(radius_km * 2) ** 2:.1f} km²)
{interpretation}
**Vegetation Distribution:**
• Dense vegetation (>0.6): {stats['dense_vegetation_pct']:.1f}%
• Moderate vegetation (0.3-0.6): {stats['moderate_vegetation_pct']:.1f}%
• Sparse vegetation (0.1-0.3): {stats['sparse_vegetation_pct']:.1f}%
• Non-vegetation (<0.1): {stats['non_vegetation_pct']:.1f}%
"""
return {
"summary": summary,
"statistics": stats,
"image_base64": image_base64,
"image_mime_type": "image/png",
"location": {
"name": display_name,
"latitude": lat,
"longitude": lon,
"radius_km": radius_km,
},
"analysis_params": {
"index": "NDVI",
"start_date": start_date,
"end_date": end_date,
"resolution_m": resolution,
},
}
except ValueError as e:
return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)}
except Exception as e:
logger.error("ndvi_calculation_error", error=str(e), exc_info=True)
return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)}
async def calculate_ndwi(
start_date: str,
end_date: str,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
location_name: Optional[str] = None,
radius_km: float = 10.0,
water_threshold: float = 0.3,
) -> dict:
"""
Calculate NDWI (Normalized Difference Water Index).
NDWI = (Green - NIR) / (Green + NIR)
Values:
- > 0.3: Open water
- 0.0 to 0.3: Flooding, wetlands
- < 0.0: Non-water surfaces
"""
try:
lat, lon, display_name = await resolve_location(
location_name=location_name,
latitude=latitude,
longitude=longitude,
)
logger.info(
"calculating_ndwi",
location=display_name,
radius_km=radius_km,
)
bbox = create_bbox_from_point(lat, lon, radius_km)
service = SentinelHubService()
ndwi_data = await service.calculate_index(
bbox=bbox,
start_date=start_date,
end_date=end_date,
index_type="ndwi",
)
if ndwi_data is None:
ndwi_data = generate_demo_ndwi(radius_km)
valid_data = ndwi_data[~np.isnan(ndwi_data)]
# Calculate water statistics
water_mask = ndwi_data > water_threshold
total_pixels = np.sum(~np.isnan(ndwi_data))
water_pixels = np.sum(water_mask)
# Estimate area (rough calculation)
pixel_area_km2 = (10 / 1000) ** 2 # 10m resolution
water_area_km2 = water_pixels * pixel_area_km2
total_area_km2 = total_pixels * pixel_area_km2
stats = {
"mean_ndwi": float(np.nanmean(ndwi_data)),
"max_ndwi": float(np.nanmax(ndwi_data)),
"min_ndwi": float(np.nanmin(ndwi_data)),
"water_coverage_pct": float(water_pixels / total_pixels * 100) if total_pixels > 0 else 0,
"water_area_km2": float(water_area_km2),
"total_area_km2": float(total_area_km2),
"wetland_pct": float(np.sum((ndwi_data > 0) & (ndwi_data <= water_threshold)) / total_pixels * 100) if total_pixels > 0 else 0,
}
# Generate visualization
ndwi_colored = create_index_colormap(ndwi_data, "ndwi")
image_base64 = array_to_base64_png(ndwi_colored)
summary = f"""💧 **NDWI Water Analysis Results**
📍 **Location:** {display_name}
📅 **Date Range:** {start_date} to {end_date}
📐 **Area Analyzed:** {total_area_km2:.1f} km²
**Water Detection:**
• Open water (>{water_threshold}): {stats['water_coverage_pct']:.1f}% ({stats['water_area_km2']:.2f} km²)
• Wetlands/flooding (0 to {water_threshold}): {stats['wetland_pct']:.1f}%
• Non-water surfaces: {100 - stats['water_coverage_pct'] - stats['wetland_pct']:.1f}%
**Index Statistics:**
• Mean NDWI: {stats['mean_ndwi']:.3f}
• Range: {stats['min_ndwi']:.3f} to {stats['max_ndwi']:.3f}
"""
return {
"summary": summary,
"statistics": stats,
"image_base64": image_base64,
"image_mime_type": "image/png",
"location": {
"name": display_name,
"latitude": lat,
"longitude": lon,
},
}
except ValueError as e:
return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)}
except Exception as e:
logger.error("ndwi_calculation_error", error=str(e), exc_info=True)
return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)}
async def calculate_ndbi(
start_date: str,
end_date: str,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
location_name: Optional[str] = None,
radius_km: float = 10.0,
) -> dict:
"""
Calculate NDBI (Normalized Difference Built-up Index).
NDBI = (SWIR - NIR) / (SWIR + NIR)
High values indicate built-up/urban areas.
"""
try:
lat, lon, display_name = await resolve_location(
location_name=location_name,
latitude=latitude,
longitude=longitude,
)
logger.info(
"calculating_ndbi",
location=display_name,
radius_km=radius_km,
)
bbox = create_bbox_from_point(lat, lon, radius_km)
service = SentinelHubService()
ndbi_data = await service.calculate_index(
bbox=bbox,
start_date=start_date,
end_date=end_date,
index_type="ndbi",
)
if ndbi_data is None:
ndbi_data = generate_demo_ndbi(radius_km)
valid_data = ndbi_data[~np.isnan(ndbi_data)]
# Calculate urban statistics
urban_threshold = 0.1
urban_mask = ndbi_data > urban_threshold
total_pixels = np.sum(~np.isnan(ndbi_data))
urban_pixels = np.sum(urban_mask)
pixel_area_km2 = (10 / 1000) ** 2
urban_area_km2 = urban_pixels * pixel_area_km2
stats = {
"mean_ndbi": float(np.nanmean(ndbi_data)),
"max_ndbi": float(np.nanmax(ndbi_data)),
"min_ndbi": float(np.nanmin(ndbi_data)),
"urban_coverage_pct": float(urban_pixels / total_pixels * 100) if total_pixels > 0 else 0,
"urban_area_km2": float(urban_area_km2),
"dense_urban_pct": float(np.sum(ndbi_data > 0.3) / total_pixels * 100) if total_pixels > 0 else 0,
}
ndbi_colored = create_index_colormap(ndbi_data, "ndbi")
image_base64 = array_to_base64_png(ndbi_colored)
summary = f"""🏙️ **NDBI Built-up Index Results**
📍 **Location:** {display_name}
📅 **Date Range:** {start_date} to {end_date}
**Urban Area Detection:**
• Built-up areas (>0.1): {stats['urban_coverage_pct']:.1f}% ({stats['urban_area_km2']:.2f} km²)
• Dense urban (>0.3): {stats['dense_urban_pct']:.1f}%
• Non-urban areas: {100 - stats['urban_coverage_pct']:.1f}%
**Index Statistics:**
• Mean NDBI: {stats['mean_ndbi']:.3f}
• Range: {stats['min_ndbi']:.3f} to {stats['max_ndbi']:.3f}
"""
return {
"summary": summary,
"statistics": stats,
"image_base64": image_base64,
"image_mime_type": "image/png",
"location": {
"name": display_name,
"latitude": lat,
"longitude": lon,
},
}
except ValueError as e:
return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)}
except Exception as e:
logger.error("ndbi_calculation_error", error=str(e), exc_info=True)
return {"summary": f"❌ **Calculation failed:** {str(e)}", "error": str(e)}
# =============================================================================
# Helper Functions
# =============================================================================
def calculate_ndvi_statistics(valid_data: np.ndarray) -> dict:
"""Calculate comprehensive NDVI statistics."""
total = len(valid_data)
if total == 0:
return {
"mean_ndvi": 0,
"median_ndvi": 0,
"std_ndvi": 0,
"min_ndvi": 0,
"max_ndvi": 0,
"dense_vegetation_pct": 0,
"moderate_vegetation_pct": 0,
"sparse_vegetation_pct": 0,
"non_vegetation_pct": 0,
}
return {
"mean_ndvi": float(np.mean(valid_data)),
"median_ndvi": float(np.median(valid_data)),
"std_ndvi": float(np.std(valid_data)),
"min_ndvi": float(np.min(valid_data)),
"max_ndvi": float(np.max(valid_data)),
"dense_vegetation_pct": float(np.sum(valid_data > 0.6) / total * 100),
"moderate_vegetation_pct": float(np.sum((valid_data > 0.3) & (valid_data <= 0.6)) / total * 100),
"sparse_vegetation_pct": float(np.sum((valid_data > 0.1) & (valid_data <= 0.3)) / total * 100),
"non_vegetation_pct": float(np.sum(valid_data <= 0.1) / total * 100),
}
def interpret_ndvi(stats: dict) -> str:
"""Generate human-readable interpretation of NDVI results."""
mean = stats.get("mean_ndvi", 0)
dense = stats.get("dense_vegetation_pct", 0)
if mean > 0.5:
health = "🌳 **Excellent** - Area shows dense, healthy vegetation"
elif mean > 0.3:
health = "🌿 **Good** - Moderate vegetation coverage"
elif mean > 0.1:
health = "🌾 **Fair** - Sparse vegetation or crops"
else:
health = "🏜️ **Low** - Limited vegetation, possibly urban or barren"
return f"**Vegetation Health:** {health}"
def generate_demo_ndvi(radius_km: float, resolution: int = 10) -> np.ndarray:
"""Generate demo NDVI data for demonstration purposes."""
# Calculate size based on area and resolution
size = int(radius_km * 2 * 1000 / resolution)
size = min(size, 512) # Cap at 512x512
# Create realistic-looking NDVI pattern
np.random.seed(42)
# Base vegetation gradient
x = np.linspace(0, 4 * np.pi, size)
y = np.linspace(0, 4 * np.pi, size)
xx, yy = np.meshgrid(x, y)
# Combine multiple patterns for realism
ndvi = (
0.4 +
0.2 * np.sin(xx) * np.cos(yy) +
0.15 * np.sin(2 * xx + yy) +
0.1 * np.random.randn(size, size)
)
# Add some water bodies (negative NDVI)
water_mask = (np.sin(xx / 2) > 0.8) & (np.cos(yy / 2) > 0.7)
ndvi[water_mask] = -0.3 + 0.1 * np.random.randn(np.sum(water_mask))
# Clip to valid range
ndvi = np.clip(ndvi, -1, 1)
return ndvi.astype(np.float32)
def generate_demo_ndwi(radius_km: float) -> np.ndarray:
"""Generate demo NDWI data."""
size = min(int(radius_km * 2 * 100), 512)
np.random.seed(43)
x = np.linspace(0, 4 * np.pi, size)
y = np.linspace(0, 4 * np.pi, size)
xx, yy = np.meshgrid(x, y)
# Mostly land with some water features
ndwi = -0.2 + 0.15 * np.random.randn(size, size)
# Add river-like feature
river = np.abs(np.sin(xx + 0.5 * yy)) < 0.1
ndwi[river] = 0.5 + 0.1 * np.random.randn(np.sum(river))
# Add lake
lake = ((xx - 2) ** 2 + (yy - 2) ** 2) < 1
ndwi[lake] = 0.6 + 0.05 * np.random.randn(np.sum(lake))
return np.clip(ndwi, -1, 1).astype(np.float32)
def generate_demo_ndbi(radius_km: float) -> np.ndarray:
"""Generate demo NDBI data."""
size = min(int(radius_km * 2 * 100), 512)
np.random.seed(44)
x = np.linspace(-1, 1, size)
y = np.linspace(-1, 1, size)
xx, yy = np.meshgrid(x, y)
# Urban center with decreasing density outward
distance = np.sqrt(xx**2 + yy**2)
ndbi = 0.4 * np.exp(-2 * distance) + 0.1 * np.random.randn(size, size)
# Add some roads
roads = (np.abs(xx) < 0.05) | (np.abs(yy) < 0.05)
ndbi[roads] = 0.3 + 0.05 * np.random.randn(np.sum(roads))
return np.clip(ndbi, -1, 1).astype(np.float32)