"""
Storage service for managing files and outputs.
Supports local filesystem and S3-compatible storage (MinIO, AWS S3).
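
Typical usage (an illustrative sketch, from inside an async context):

    service = StorageService()
    url = await service.save_image(png_bytes)  # -> images/YYYY/MM/DD/<name>.png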
"""
import asyncio
import mimetypes
import shutil
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Optional, Union
from uuid import uuid4
import structlog
from geosight.config import settings
logger = structlog.get_logger(__name__)
class StorageBackend(ABC):
"""Abstract base class for storage backends."""
@abstractmethod
async def save(
self,
data: Union[bytes, BinaryIO],
path: str,
content_type: Optional[str] = None,
) -> str:
"""Save data and return URL/path."""
pass
@abstractmethod
async def load(self, path: str) -> bytes:
"""Load data from path."""
pass
@abstractmethod
async def delete(self, path: str) -> None:
"""Delete file at path."""
pass
@abstractmethod
async def exists(self, path: str) -> bool:
"""Check if path exists."""
pass
@abstractmethod
async def list_files(self, prefix: str = "") -> list[str]:
"""List files with given prefix."""
pass
class LocalStorageBackend(StorageBackend):
"""
Local filesystem storage backend.
Good for development and single-server deployments.
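
    Example (a sketch):

        backend = LocalStorageBackend("/var/lib/geosight")
        path = await backend.save(b"data", "outputs/result.bin")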
"""
    def __init__(self, base_path: str = "/tmp/geosight"):
        self._base_path = Path(base_path).resolve()
        self._base_path.mkdir(parents=True, exist_ok=True)
    def _full_path(self, path: str) -> Path:
        """Resolve a path under the base directory, rejecting traversal outside it."""
        full = (self._base_path / path).resolve()
        if not full.is_relative_to(self._base_path):
            raise ValueError(f"path escapes storage root: {path}")
        return full
    async def save(
        self,
        data: Union[bytes, BinaryIO],
        path: str,
        content_type: Optional[str] = None,
    ) -> str:
        full_path = self._full_path(path)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        def _write() -> None:
            if isinstance(data, bytes):
                full_path.write_bytes(data)
            else:
                with open(full_path, "wb") as f:
                    shutil.copyfileobj(data, f)
        # Offload blocking file I/O so the event loop is not stalled.
        await asyncio.to_thread(_write)
        logger.debug("file_saved", path=str(full_path))
        return str(full_path)
    async def load(self, path: str) -> bytes:
        # Blocking read runs in a worker thread.
        return await asyncio.to_thread(self._full_path(path).read_bytes)
    async def delete(self, path: str) -> None:
        self._full_path(path).unlink(missing_ok=True)
async def exists(self, path: str) -> bool:
return self._full_path(path).exists()
    async def list_files(self, prefix: str = "") -> list[str]:
        base = self._full_path(prefix)
        if not base.exists():
            return []
        def _scan() -> list[str]:
            return [
                str(item.relative_to(self._base_path))
                for item in base.rglob("*")
                if item.is_file()
            ]
        # rglob can walk many files; keep it off the event loop.
        return await asyncio.to_thread(_scan)
class S3StorageBackend(StorageBackend):
"""
S3-compatible storage backend.
Works with AWS S3, MinIO, and other S3-compatible services.
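
    Example (a sketch; the endpoint and credentials are placeholders):

        backend = S3StorageBackend(
            endpoint_url="http://localhost:9000",
            access_key="minioadmin",
            secret_key="minioadmin",
            bucket_name="geosight",
        )
        url = await backend.save(pdf_bytes, "reports/summary.pdf")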
"""
def __init__(
self,
endpoint_url: str,
access_key: str,
secret_key: str,
bucket_name: str,
region: str = "us-east-1",
):
self._endpoint_url = endpoint_url
self._access_key = access_key
self._secret_key = secret_key
self._bucket_name = bucket_name
self._region = region
        self._client = None
        self._client_ctx = None
    async def _get_client(self):
        """Get or lazily create the S3 client.
        The client context is entered here and held for the lifetime of the
        backend; call aclose() to release it.
        """
        if self._client is None:
            try:
                import aioboto3
            except ImportError:
                logger.error("aioboto3_not_installed")
                raise
            session = aioboto3.Session()
            self._client_ctx = session.client(
                "s3",
                endpoint_url=self._endpoint_url,
                aws_access_key_id=self._access_key,
                aws_secret_access_key=self._secret_key,
                region_name=self._region,
            )
            self._client = await self._client_ctx.__aenter__()
        return self._client
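    async def aclose(self) -> None:
        # Added helper (a sketch): exits the client context entered in
        # _get_client so pooled connections are released on shutdown.
        if self._client_ctx is not None:
            await self._client_ctx.__aexit__(None, None, None)
        self._client = None
        self._client_ctx = None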
async def save(
self,
data: Union[bytes, BinaryIO],
path: str,
content_type: Optional[str] = None,
) -> str:
client = await self._get_client()
extra_args = {}
if content_type:
extra_args["ContentType"] = content_type
        if isinstance(data, bytes):
            # upload_fileobj expects a file-like object.
            data = BytesIO(data)
await client.upload_fileobj(
data,
self._bucket_name,
path,
            ExtraArgs=extra_args or None,
)
url = f"{self._endpoint_url}/{self._bucket_name}/{path}"
logger.debug("file_saved_s3", path=path, url=url)
return url
    async def load(self, path: str) -> bytes:
        client = await self._get_client()
        buffer = BytesIO()
        await client.download_fileobj(self._bucket_name, path, buffer)
        return buffer.getvalue()
async def delete(self, path: str) -> None:
client = await self._get_client()
await client.delete_object(Bucket=self._bucket_name, Key=path)
    async def exists(self, path: str) -> bool:
        # botocore ships with aioboto3; import locally so the module stays
        # importable when the S3 extras are not installed.
        from botocore.exceptions import ClientError
        client = await self._get_client()
        try:
            await client.head_object(Bucket=self._bucket_name, Key=path)
            return True
        except ClientError:
            return False
async def list_files(self, prefix: str = "") -> list[str]:
client = await self._get_client()
files = []
paginator = client.get_paginator("list_objects_v2")
async for page in paginator.paginate(Bucket=self._bucket_name, Prefix=prefix):
for obj in page.get("Contents", []):
files.append(obj["Key"])
return files
async def get_presigned_url(self, path: str, expires_in: int = 3600) -> str:
"""Generate a presigned URL for temporary access."""
client = await self._get_client()
url = await client.generate_presigned_url(
"get_object",
Params={"Bucket": self._bucket_name, "Key": path},
ExpiresIn=expires_in,
)
return url
class StorageService:
"""
High-level storage service with automatic backend selection.
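
    Prefers S3 when ``settings.s3`` provides an endpoint and credentials,
    otherwise falls back to local storage. Example (a sketch):

        service = StorageService()
        url = await service.save_report(pdf_bytes, "summary.pdf")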
"""
def __init__(self):
self._backend: Optional[StorageBackend] = None
async def _get_backend(self) -> StorageBackend:
"""Get or initialize storage backend."""
if self._backend is not None:
return self._backend
s3_config = settings.s3
# Try S3 if configured
if s3_config.endpoint_url and s3_config.access_key:
try:
self._backend = S3StorageBackend(
endpoint_url=s3_config.endpoint_url,
access_key=s3_config.access_key,
secret_key=s3_config.secret_key,
bucket_name=s3_config.bucket_name,
region=s3_config.region,
)
logger.info("storage_initialized", backend="s3")
return self._backend
except Exception as e:
logger.warning("s3_init_failed", error=str(e))
# Fall back to local storage
self._backend = LocalStorageBackend()
logger.info("storage_initialized", backend="local")
return self._backend
async def save_image(
self,
data: bytes,
filename: Optional[str] = None,
folder: str = "images",
) -> str:
"""
Save an image file.
Args:
data: Image bytes
filename: Optional filename (generated if not provided)
folder: Folder/prefix for the file
Returns:
URL or path to saved file
"""
        backend = await self._get_backend()
        if filename is None:
            filename = f"{uuid4().hex}.png"
        # Infer the MIME type from the filename; default to PNG.
        content_type = mimetypes.guess_type(filename)[0] or "image/png"
        path = f"{folder}/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
        return await backend.save(data, path, content_type=content_type)
async def save_report(
self,
data: bytes,
filename: str,
content_type: str = "application/pdf",
) -> str:
"""
Save a report file.
Args:
data: Report bytes
filename: Filename
content_type: MIME type
Returns:
URL or path to saved file
"""
backend = await self._get_backend()
path = f"reports/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
return await backend.save(data, path, content_type=content_type)
async def save_map(self, html_content: str, filename: Optional[str] = None) -> str:
"""
Save an interactive map HTML file.
Args:
html_content: HTML string
filename: Optional filename
Returns:
URL or path to saved file
"""
backend = await self._get_backend()
if filename is None:
filename = f"map_{uuid4().hex[:8]}.html"
path = f"maps/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
return await backend.save(
html_content.encode("utf-8"),
path,
content_type="text/html",
)
async def load(self, path: str) -> bytes:
"""Load a file."""
backend = await self._get_backend()
return await backend.load(path)
async def delete(self, path: str) -> None:
"""Delete a file."""
backend = await self._get_backend()
await backend.delete(path)
async def cleanup_old_files(self, days: int = 7) -> int:
"""
Clean up files older than specified days.
Returns:
Number of files deleted
"""
backend = await self._get_backend()
        # Only implemented for local storage; for S3, configure bucket
        # lifecycle rules to expire old objects instead.
        if not isinstance(backend, LocalStorageBackend):
            logger.warning("cleanup_not_implemented_for_s3")
            return 0
        cutoff = datetime.now() - timedelta(days=days)
deleted = 0
for path in await backend.list_files():
full_path = backend._full_path(path)
if full_path.stat().st_mtime < cutoff.timestamp():
await backend.delete(path)
deleted += 1
logger.info("cleanup_complete", deleted=deleted)
return deleted
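

if __name__ == "__main__":
    # Minimal smoke test (a sketch): exercises only the local backend, so no
    # S3 credentials are needed, though importing this module still requires
    # the geosight package for its settings.
    async def _demo() -> None:
        backend = LocalStorageBackend("/tmp/geosight-demo")
        await backend.save(b"hello", "demo/hello.txt", content_type="text/plain")
        assert await backend.exists("demo/hello.txt")
        print((await backend.load("demo/hello.txt")).decode())
        print(await backend.list_files("demo"))
        await backend.delete("demo/hello.txt")

    asyncio.run(_demo())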