"""
Change detection tool for bi-temporal analysis.
"""
from typing import Optional, Literal
import numpy as np
import structlog
from geosight.tools.geocoding import resolve_location
from geosight.models.change_detector import ChangeDetector
from geosight.utils.visualization import create_index_colormap, array_to_base64_png
from geosight.utils.geo import create_bbox_from_point
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Detector threshold for each sensitivity level; higher sensitivity maps to a
# lower threshold.
_SENSITIVITY_THRESHOLDS = {"low": 0.5, "medium": 0.3, "high": 0.15}

# Demo raster sizing: pixels along one side per km of radius, capped so the
# synthetic grid stays small.
_DEMO_PIXELS_PER_RADIUS_KM = 20
_DEMO_MAX_GRID_SIZE = 512


async def detect_changes(
    date_before: str,
    date_after: str,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None,
    location_name: Optional[str] = None,
    radius_km: float = 10.0,
    change_type: Literal["all", "vegetation", "urban", "water"] = "all",
    sensitivity: Literal["low", "medium", "high"] = "medium",
) -> dict:
    """Detect changes between two time periods around a location.

    The imagery is synthetic: two demo rasters are produced by
    ``generate_demo_data`` with fixed seeds and compared with
    ``ChangeDetector``. The dates are only logged and echoed in the summary;
    they are not parsed here.

    Args:
        date_before: Label for the earlier acquisition.
        date_after: Label for the later acquisition.
        latitude: Forwarded to ``resolve_location``.
        longitude: Forwarded to ``resolve_location``.
        location_name: Forwarded to ``resolve_location``.
        radius_km: Radius of the analysed area; must be positive and finite.
            The reported area is the square ``(2 * radius_km) ** 2``.
        change_type: Accepted for interface compatibility; the current demo
            pipeline does not read it.
        sensitivity: One of ``"low"``, ``"medium"``, ``"high"``; selects the
            threshold handed to the detector.

    Returns:
        On success, a dict with keys ``summary``, ``statistics``,
        ``image_base64``, ``image_mime_type`` and ``location``. On failure, a
        dict with keys ``summary`` and ``error``; this function does not
        propagate exceptions derived from ``Exception``.
    """
    try:
        # Validate the cheap arguments before the (awaited) location lookup so
        # bad input fails fast with a clear message instead of an internal
        # KeyError or an empty-array error further down.
        if sensitivity not in _SENSITIVITY_THRESHOLDS:
            allowed = ", ".join(repr(level) for level in _SENSITIVITY_THRESHOLDS)
            raise ValueError(
                f"sensitivity must be one of {allowed} (got {sensitivity!r})"
            )
        if not np.isfinite(radius_km) or radius_km <= 0:
            raise ValueError(
                f"radius_km must be a positive, finite number (got {radius_km!r})"
            )
        threshold = _SENSITIVITY_THRESHOLDS[sensitivity]

        lat, lon, display_name = await resolve_location(
            location_name=location_name,
            latitude=latitude,
            longitude=longitude,
        )
        logger.info(
            "detecting_changes",
            location=display_name,
            date_before=date_before,
            date_after=date_after,
        )

        # Generate demo change data. Clamp to at least one pixel so a very
        # small radius cannot produce an empty grid.
        size = max(
            1,
            min(int(radius_km * _DEMO_PIXELS_PER_RADIUS_KM), _DEMO_MAX_GRID_SIZE),
        )
        before_data = generate_demo_data(size, seed=42)
        after_data = generate_demo_data(size, seed=43)

        # Detect changes
        detector = ChangeDetector()
        change_mask, change_magnitude = await detector.detect(
            before_data, after_data, threshold
        )

        # Calculate statistics
        total_pixels = change_mask.size
        changed_pixels = np.count_nonzero(change_mask > 0)
        changed_fraction = changed_pixels / total_pixels
        total_area = (radius_km * 2) ** 2
        changed_area = changed_fraction * total_area
        stats = {
            "total_area_km2": float(total_area),
            "changed_area_km2": float(changed_area),
            "change_percentage": float(changed_fraction * 100),
            "mean_change_magnitude": float(np.mean(change_magnitude)),
            "max_change_magnitude": float(np.max(change_magnitude)),
            "sensitivity": sensitivity,
            "threshold": threshold,
        }

        # Generate visualization.
        # NOTE(review): the rescale presumably maps a [0, 1] magnitude onto the
        # [-1, 1] range expected by the "change" colormap - confirm.
        change_colored = create_index_colormap(change_magnitude * 2 - 1, "change")
        image_base64 = array_to_base64_png(change_colored)

        # Interpret results
        if stats["change_percentage"] > 20:
            interpretation = "🔴 **Significant changes detected** - Major alterations in the area"
        elif stats["change_percentage"] > 5:
            interpretation = "🟡 **Moderate changes detected** - Some visible alterations"
        else:
            interpretation = "🟢 **Minimal changes detected** - Area relatively stable"

        summary = f"""🔄 **Change Detection Analysis**
📍 **Location:** {display_name}
📅 **Period:** {date_before} → {date_after}
📐 **Area Analyzed:** {total_area:.1f} km²
🔧 **Sensitivity:** {sensitivity.title()}
{interpretation}
**Change Statistics:**
• Changed area: {changed_area:.2f} km² ({stats['change_percentage']:.1f}%)
• Mean change magnitude: {stats['mean_change_magnitude']:.3f}
• Maximum change: {stats['max_change_magnitude']:.3f}
**Legend:**
• 🔴 Red = Decrease (e.g., deforestation, water loss)
• ⚪ White = No change
• 🟢 Green = Increase (e.g., vegetation growth, construction)
"""
        return {
            "summary": summary,
            "statistics": stats,
            "image_base64": image_base64,
            "image_mime_type": "image/png",
            "location": {"name": display_name, "latitude": lat, "longitude": lon},
        }
    except ValueError as e:
        # Input/validation problems: report to the caller without a stack trace.
        return {"summary": f"❌ **Error:** {str(e)}", "error": str(e)}
    except Exception as e:
        # Boundary handler: log the traceback and return an error payload
        # instead of raising.
        logger.error("change_detection_error", error=str(e), exc_info=True)
        return {"summary": f"❌ **Detection failed:** {str(e)}", "error": str(e)}
def generate_demo_data(size: int, seed: int) -> np.ndarray:
    """Generate a deterministic synthetic image for demo purposes.

    The image is a smooth sinusoidal pattern over a ``size`` x ``size`` grid
    spanning ``[0, 4*pi]`` on both axes, plus seeded Gaussian noise, clipped
    to ``[0, 1]``.

    Args:
        size: Number of pixels along each side of the square grid.
        seed: Seed for the noise component; the same ``size`` and ``seed``
            always give the same array.

    Returns:
        A ``float32`` array of shape ``(size, size)`` with values in
        ``[0, 1]``.
    """
    # Use a private RandomState rather than np.random.seed(): it yields the
    # same legacy random stream for a given seed, but does not reseed the
    # process-wide NumPy generator that other code may rely on.
    rng = np.random.RandomState(seed)

    axis = np.linspace(0, 4 * np.pi, size)
    xx, yy = np.meshgrid(axis, axis)
    data = (
        0.5
        + 0.2 * np.sin(xx) * np.cos(yy)
        + 0.15 * np.sin(2 * xx + yy)
        + 0.1 * rng.randn(size, size)
    )
    return np.clip(data, 0, 1).astype(np.float32)