storage.py
""" Storage service for managing files and outputs. Supports local filesystem and S3-compatible storage (MinIO, AWS S3). """ import asyncio import os import shutil from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path from typing import BinaryIO, Optional, Union from uuid import uuid4 import structlog from geosight.config import settings logger = structlog.get_logger(__name__) class StorageBackend(ABC): """Abstract base class for storage backends.""" @abstractmethod async def save( self, data: Union[bytes, BinaryIO], path: str, content_type: Optional[str] = None, ) -> str: """Save data and return URL/path.""" pass @abstractmethod async def load(self, path: str) -> bytes: """Load data from path.""" pass @abstractmethod async def delete(self, path: str) -> None: """Delete file at path.""" pass @abstractmethod async def exists(self, path: str) -> bool: """Check if path exists.""" pass @abstractmethod async def list_files(self, prefix: str = "") -> list[str]: """List files with given prefix.""" pass class LocalStorageBackend(StorageBackend): """ Local filesystem storage backend. Good for development and single-server deployments. """ def __init__(self, base_path: str = "/tmp/geosight"): self._base_path = Path(base_path) self._base_path.mkdir(parents=True, exist_ok=True) def _full_path(self, path: str) -> Path: """Get full filesystem path.""" return self._base_path / path async def save( self, data: Union[bytes, BinaryIO], path: str, content_type: Optional[str] = None, ) -> str: full_path = self._full_path(path) full_path.parent.mkdir(parents=True, exist_ok=True) if isinstance(data, bytes): full_path.write_bytes(data) else: with open(full_path, "wb") as f: shutil.copyfileobj(data, f) logger.debug("file_saved", path=str(full_path)) return str(full_path) async def load(self, path: str) -> bytes: full_path = self._full_path(path) return full_path.read_bytes() async def delete(self, path: str) -> None: full_path = self._full_path(path) if full_path.exists(): full_path.unlink() async def exists(self, path: str) -> bool: return self._full_path(path).exists() async def list_files(self, prefix: str = "") -> list[str]: base = self._full_path(prefix) if not base.exists(): return [] files = [] for item in base.rglob("*"): if item.is_file(): files.append(str(item.relative_to(self._base_path))) return files class S3StorageBackend(StorageBackend): """ S3-compatible storage backend. Works with AWS S3, MinIO, and other S3-compatible services. 
""" def __init__( self, endpoint_url: str, access_key: str, secret_key: str, bucket_name: str, region: str = "us-east-1", ): self._endpoint_url = endpoint_url self._access_key = access_key self._secret_key = secret_key self._bucket_name = bucket_name self._region = region self._client = None async def _get_client(self): """Get or create S3 client.""" if self._client is None: try: import aioboto3 session = aioboto3.Session() self._client = await session.client( "s3", endpoint_url=self._endpoint_url, aws_access_key_id=self._access_key, aws_secret_access_key=self._secret_key, region_name=self._region, ).__aenter__() except ImportError: logger.error("aioboto3_not_installed") raise return self._client async def save( self, data: Union[bytes, BinaryIO], path: str, content_type: Optional[str] = None, ) -> str: client = await self._get_client() extra_args = {} if content_type: extra_args["ContentType"] = content_type if isinstance(data, bytes): from io import BytesIO data = BytesIO(data) await client.upload_fileobj( data, self._bucket_name, path, ExtraArgs=extra_args if extra_args else None, ) url = f"{self._endpoint_url}/{self._bucket_name}/{path}" logger.debug("file_saved_s3", path=path, url=url) return url async def load(self, path: str) -> bytes: client = await self._get_client() from io import BytesIO buffer = BytesIO() await client.download_fileobj(self._bucket_name, path, buffer) buffer.seek(0) return buffer.read() async def delete(self, path: str) -> None: client = await self._get_client() await client.delete_object(Bucket=self._bucket_name, Key=path) async def exists(self, path: str) -> bool: client = await self._get_client() try: await client.head_object(Bucket=self._bucket_name, Key=path) return True except: return False async def list_files(self, prefix: str = "") -> list[str]: client = await self._get_client() files = [] paginator = client.get_paginator("list_objects_v2") async for page in paginator.paginate(Bucket=self._bucket_name, Prefix=prefix): for obj in page.get("Contents", []): files.append(obj["Key"]) return files async def get_presigned_url(self, path: str, expires_in: int = 3600) -> str: """Generate a presigned URL for temporary access.""" client = await self._get_client() url = await client.generate_presigned_url( "get_object", Params={"Bucket": self._bucket_name, "Key": path}, ExpiresIn=expires_in, ) return url class StorageService: """ High-level storage service with automatic backend selection. """ def __init__(self): self._backend: Optional[StorageBackend] = None async def _get_backend(self) -> StorageBackend: """Get or initialize storage backend.""" if self._backend is not None: return self._backend s3_config = settings.s3 # Try S3 if configured if s3_config.endpoint_url and s3_config.access_key: try: self._backend = S3StorageBackend( endpoint_url=s3_config.endpoint_url, access_key=s3_config.access_key, secret_key=s3_config.secret_key, bucket_name=s3_config.bucket_name, region=s3_config.region, ) logger.info("storage_initialized", backend="s3") return self._backend except Exception as e: logger.warning("s3_init_failed", error=str(e)) # Fall back to local storage self._backend = LocalStorageBackend() logger.info("storage_initialized", backend="local") return self._backend async def save_image( self, data: bytes, filename: Optional[str] = None, folder: str = "images", ) -> str: """ Save an image file. 

class StorageService:
    """
    High-level storage service with automatic backend selection.
    """

    def __init__(self):
        self._backend: Optional[StorageBackend] = None

    async def _get_backend(self) -> StorageBackend:
        """Get or initialize the storage backend."""
        if self._backend is not None:
            return self._backend

        s3_config = settings.s3

        # Try S3 if configured
        if s3_config.endpoint_url and s3_config.access_key:
            try:
                self._backend = S3StorageBackend(
                    endpoint_url=s3_config.endpoint_url,
                    access_key=s3_config.access_key,
                    secret_key=s3_config.secret_key,
                    bucket_name=s3_config.bucket_name,
                    region=s3_config.region,
                )
                logger.info("storage_initialized", backend="s3")
                return self._backend
            except Exception as e:
                logger.warning("s3_init_failed", error=str(e))

        # Fall back to local storage
        self._backend = LocalStorageBackend()
        logger.info("storage_initialized", backend="local")
        return self._backend

    async def save_image(
        self,
        data: bytes,
        filename: Optional[str] = None,
        folder: str = "images",
    ) -> str:
        """
        Save an image file.

        Args:
            data: Image bytes
            filename: Optional filename (generated if not provided)
            folder: Folder/prefix for the file

        Returns:
            URL or path to the saved file
        """
        backend = await self._get_backend()

        if filename is None:
            filename = f"{uuid4().hex}.png"

        path = f"{folder}/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
        return await backend.save(data, path, content_type="image/png")

    async def save_report(
        self,
        data: bytes,
        filename: str,
        content_type: str = "application/pdf",
    ) -> str:
        """
        Save a report file.

        Args:
            data: Report bytes
            filename: Filename
            content_type: MIME type

        Returns:
            URL or path to the saved file
        """
        backend = await self._get_backend()
        path = f"reports/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
        return await backend.save(data, path, content_type=content_type)

    async def save_map(self, html_content: str, filename: Optional[str] = None) -> str:
        """
        Save an interactive map HTML file.

        Args:
            html_content: HTML string
            filename: Optional filename

        Returns:
            URL or path to the saved file
        """
        backend = await self._get_backend()

        if filename is None:
            filename = f"map_{uuid4().hex[:8]}.html"

        path = f"maps/{datetime.now().strftime('%Y/%m/%d')}/{filename}"
        return await backend.save(
            html_content.encode("utf-8"),
            path,
            content_type="text/html",
        )

    async def load(self, path: str) -> bytes:
        """Load a file."""
        backend = await self._get_backend()
        return await backend.load(path)

    async def delete(self, path: str) -> None:
        """Delete a file."""
        backend = await self._get_backend()
        await backend.delete(path)

    async def cleanup_old_files(self, days: int = 7) -> int:
        """
        Clean up files older than the specified number of days.

        Returns:
            Number of files deleted
        """
        backend = await self._get_backend()

        # Only implemented for local storage
        if not isinstance(backend, LocalStorageBackend):
            logger.warning("cleanup_not_implemented_for_s3")
            return 0

        cutoff = datetime.now() - timedelta(days=days)
        deleted = 0

        for path in await backend.list_files():
            full_path = backend._full_path(path)
            if full_path.stat().st_mtime < cutoff.timestamp():
                await backend.delete(path)
                deleted += 1

        logger.info("cleanup_complete", deleted=deleted)
        return deleted
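
# End-to-end sketch of the high-level service, added for illustration; it
# is not part of the original module. It exercises the automatic backend
# selection (falling back to LocalStorageBackend when S3 is unconfigured).
if __name__ == "__main__":

    async def _demo() -> None:
        service = StorageService()
        url = await service.save_map("<html><body>demo map</body></html>")
        logger.info("demo_map_saved", url=url)
        removed = await service.cleanup_old_files(days=30)
        logger.info("demo_cleanup", removed=removed)

    asyncio.run(_demo())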
