cleaner.py
""" Data Cleaner for Analytics Engine. Handles cleanup and lifecycle management of analytics data. """ import asyncio from pathlib import Path from datetime import datetime, timezone, timedelta from typing import Optional, List, Dict, Any from dataclasses import dataclass from enum import Enum import re from ..utils.logging import get_logger from ..utils.errors import ShannonError logger = get_logger(__name__) class RetentionPolicy(str, Enum): """Data retention policies.""" KEEP_ALL = "keep_all" KEEP_DAYS = "keep_days" KEEP_SIZE = "keep_size" KEEP_COUNT = "keep_count" @dataclass class CleanupPolicy: """Policy for cleaning up analytics data.""" # Retention settings retention_policy: RetentionPolicy = RetentionPolicy.KEEP_DAYS retention_days: int = 90 # For KEEP_DAYS retention_size_mb: int = 1000 # For KEEP_SIZE retention_count: int = 100 # For KEEP_COUNT # Compression settings compress_after_days: int = 7 # Archive settings archive_enabled: bool = True archive_path: Optional[Path] = None archive_after_days: int = 30 # Cleanup schedule cleanup_interval_hours: int = 24 def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "retention_policy": self.retention_policy.value, "retention_days": self.retention_days, "retention_size_mb": self.retention_size_mb, "retention_count": self.retention_count, "compress_after_days": self.compress_after_days, "archive_enabled": self.archive_enabled, "archive_path": str(self.archive_path) if self.archive_path else None, "archive_after_days": self.archive_after_days, "cleanup_interval_hours": self.cleanup_interval_hours } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "CleanupPolicy": """Create from dictionary.""" policy = cls() if "retention_policy" in data: policy.retention_policy = RetentionPolicy(data["retention_policy"]) if "retention_days" in data: policy.retention_days = data["retention_days"] if "retention_size_mb" in data: policy.retention_size_mb = data["retention_size_mb"] if "retention_count" in data: policy.retention_count = data["retention_count"] if "compress_after_days" in data: policy.compress_after_days = data["compress_after_days"] if "archive_enabled" in data: policy.archive_enabled = data["archive_enabled"] if "archive_path" in data and data["archive_path"]: policy.archive_path = Path(data["archive_path"]) if "archive_after_days" in data: policy.archive_after_days = data["archive_after_days"] if "cleanup_interval_hours" in data: policy.cleanup_interval_hours = data["cleanup_interval_hours"] return policy class DataCleaner: """Cleans up old analytics data according to retention policies.""" def __init__( self, base_path: Path, policy: Optional[CleanupPolicy] = None ): """ Initialize data cleaner. 
Args: base_path: Base directory for analytics policy: Cleanup policy to use """ self.base_path = Path(base_path) self.metrics_dir = self.base_path / "metrics" self.policy = policy or CleanupPolicy() # Background task self._cleanup_task: Optional[asyncio.Task] = None self._stop_event = asyncio.Event() async def start(self) -> None: """Start background cleanup task.""" if self._cleanup_task and not self._cleanup_task.done(): logger.warning("Cleanup task already running") return self._stop_event.clear() self._cleanup_task = asyncio.create_task(self._cleanup_loop()) logger.info("Started analytics cleanup task") async def stop(self) -> None: """Stop background cleanup task.""" if not self._cleanup_task: return self._stop_event.set() try: await asyncio.wait_for(self._cleanup_task, timeout=5.0) except asyncio.TimeoutError: logger.warning("Cleanup task didn't stop gracefully, cancelling") self._cleanup_task.cancel() logger.info("Stopped analytics cleanup task") async def cleanup_now(self) -> Dict[str, Any]: """ Perform cleanup immediately. Returns: Cleanup statistics """ stats = { "files_deleted": 0, "files_compressed": 0, "files_archived": 0, "bytes_freed": 0, "errors": [] } try: # Compress old files compress_stats = await self._compress_old_files() stats["files_compressed"] = compress_stats["compressed"] stats["bytes_freed"] += compress_stats["bytes_saved"] # Archive old files if self.policy.archive_enabled: archive_stats = await self._archive_old_files() stats["files_archived"] = archive_stats["archived"] # Apply retention policy retention_stats = await self._apply_retention_policy() stats["files_deleted"] = retention_stats["deleted"] stats["bytes_freed"] += retention_stats["bytes_freed"] except Exception as e: logger.error(f"Error during cleanup: {e}") stats["errors"].append(str(e)) logger.info( f"Cleanup complete: {stats['files_deleted']} deleted, " f"{stats['files_compressed']} compressed, " f"{stats['files_archived']} archived, " f"{stats['bytes_freed']:,} bytes freed" ) return stats async def _cleanup_loop(self) -> None: """Background cleanup loop.""" interval = self.policy.cleanup_interval_hours * 3600 while not self._stop_event.is_set(): try: # Perform cleanup await self.cleanup_now() # Wait for next cleanup await asyncio.wait_for( self._stop_event.wait(), timeout=interval ) except asyncio.TimeoutError: # Timeout is expected - continue loop continue except Exception as e: logger.error(f"Error in cleanup loop: {e}") # Wait a bit before retrying await asyncio.sleep(60) async def _compress_old_files(self) -> Dict[str, Any]: """Compress files older than threshold.""" import gzip import aiofiles stats = {"compressed": 0, "bytes_saved": 0} cutoff = datetime.now(timezone.utc) - timedelta(days=self.policy.compress_after_days) # Find uncompressed files for file_path in self.metrics_dir.glob("metrics_*.jsonl"): try: # Check file age mtime = datetime.fromtimestamp( file_path.stat().st_mtime, tz=timezone.utc ) if mtime < cutoff: # Compress file original_size = file_path.stat().st_size compressed_path = file_path.with_suffix('.jsonl.gz') async with aiofiles.open(file_path, 'rb') as f_in: content = await f_in.read() async with aiofiles.open(compressed_path, 'wb') as f_out: compressed = gzip.compress(content, compresslevel=6) await f_out.write(compressed) # Remove original file_path.unlink() compressed_size = compressed_path.stat().st_size stats["compressed"] += 1 stats["bytes_saved"] += original_size - compressed_size logger.debug(f"Compressed {file_path.name} ({original_size} -> {compressed_size} 
bytes)") except Exception as e: logger.error(f"Failed to compress {file_path}: {e}") return stats async def _archive_old_files(self) -> Dict[str, Any]: """Archive files older than threshold.""" import shutil stats = {"archived": 0} if not self.policy.archive_path: # Default archive path self.policy.archive_path = self.base_path / "archive" # Ensure archive directory exists self.policy.archive_path.mkdir(parents=True, exist_ok=True) cutoff = datetime.now(timezone.utc) - timedelta(days=self.policy.archive_after_days) # Find files to archive for pattern in ["metrics_*.jsonl", "metrics_*.jsonl.gz"]: for file_path in self.metrics_dir.glob(pattern): try: # Check file age mtime = datetime.fromtimestamp( file_path.stat().st_mtime, tz=timezone.utc ) if mtime < cutoff: # Move to archive archive_dest = self.policy.archive_path / file_path.name shutil.move(str(file_path), str(archive_dest)) stats["archived"] += 1 logger.debug(f"Archived {file_path.name}") except Exception as e: logger.error(f"Failed to archive {file_path}: {e}") return stats async def _apply_retention_policy(self) -> Dict[str, Any]: """Apply retention policy to delete old files.""" stats = {"deleted": 0, "bytes_freed": 0} if self.policy.retention_policy == RetentionPolicy.KEEP_ALL: return stats # Get all metrics files files = [] for pattern in ["metrics_*.jsonl", "metrics_*.jsonl.gz"]: files.extend(self.metrics_dir.glob(pattern)) if self.policy.archive_enabled and self.policy.archive_path: files.extend(self.policy.archive_path.glob(pattern)) # Sort by modification time (newest first) files.sort(key=lambda f: f.stat().st_mtime, reverse=True) if self.policy.retention_policy == RetentionPolicy.KEEP_DAYS: cutoff = datetime.now(timezone.utc) - timedelta(days=self.policy.retention_days) for file_path in files: try: mtime = datetime.fromtimestamp( file_path.stat().st_mtime, tz=timezone.utc ) if mtime < cutoff: size = file_path.stat().st_size file_path.unlink() stats["deleted"] += 1 stats["bytes_freed"] += size logger.debug(f"Deleted {file_path.name} (age: {(datetime.now(timezone.utc) - mtime).days} days)") except Exception as e: logger.error(f"Failed to delete {file_path}: {e}") elif self.policy.retention_policy == RetentionPolicy.KEEP_SIZE: total_size = 0 size_limit = self.policy.retention_size_mb * 1024 * 1024 for file_path in files: try: size = file_path.stat().st_size total_size += size if total_size > size_limit: file_path.unlink() stats["deleted"] += 1 stats["bytes_freed"] += size logger.debug(f"Deleted {file_path.name} (total size exceeded)") except Exception as e: logger.error(f"Failed to delete {file_path}: {e}") elif self.policy.retention_policy == RetentionPolicy.KEEP_COUNT: # Keep only the newest N files for file_path in files[self.policy.retention_count:]: try: size = file_path.stat().st_size file_path.unlink() stats["deleted"] += 1 stats["bytes_freed"] += size logger.debug(f"Deleted {file_path.name} (count exceeded)") except Exception as e: logger.error(f"Failed to delete {file_path}: {e}") return stats async def get_storage_stats(self) -> Dict[str, Any]: """Get current storage statistics.""" stats = { "total_files": 0, "uncompressed_files": 0, "compressed_files": 0, "archived_files": 0, "total_size_bytes": 0, "uncompressed_size_bytes": 0, "compressed_size_bytes": 0, "archived_size_bytes": 0, "oldest_file": None, "newest_file": None } oldest_time = None newest_time = None # Analyze metrics directory for pattern in ["metrics_*.jsonl", "metrics_*.jsonl.gz"]: for file_path in self.metrics_dir.glob(pattern): stats["total_files"] += 
1 size = file_path.stat().st_size stats["total_size_bytes"] += size if file_path.suffix == '.gz': stats["compressed_files"] += 1 stats["compressed_size_bytes"] += size else: stats["uncompressed_files"] += 1 stats["uncompressed_size_bytes"] += size # Track oldest/newest mtime = datetime.fromtimestamp( file_path.stat().st_mtime, tz=timezone.utc ) if oldest_time is None or mtime < oldest_time: oldest_time = mtime stats["oldest_file"] = file_path.name if newest_time is None or mtime > newest_time: newest_time = mtime stats["newest_file"] = file_path.name # Analyze archive directory if self.policy.archive_enabled and self.policy.archive_path and self.policy.archive_path.exists(): for pattern in ["metrics_*.jsonl", "metrics_*.jsonl.gz"]: for file_path in self.policy.archive_path.glob(pattern): stats["archived_files"] += 1 stats["archived_size_bytes"] += file_path.stat().st_size return stats
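
For reference, a minimal usage sketch of the cleaner above. The import path (shannon_mcp.analytics.cleaner) and the base directory are assumptions for illustration, inferred from this file's relative imports; the CleanupPolicy values are arbitrary examples.

# Illustrative usage sketch; import path and base directory are assumptions.
import asyncio
from pathlib import Path

from shannon_mcp.analytics.cleaner import CleanupPolicy, DataCleaner, RetentionPolicy


async def main() -> None:
    # Keep 30 days of metrics, compressing anything older than a week.
    policy = CleanupPolicy(
        retention_policy=RetentionPolicy.KEEP_DAYS,
        retention_days=30,
        compress_after_days=7,
    )
    cleaner = DataCleaner(
        base_path=Path("~/.shannon/analytics").expanduser(),  # hypothetical location
        policy=policy,
    )

    # One-off cleanup; start()/stop() would instead run it on a schedule.
    stats = await cleaner.cleanup_now()
    print(stats)


asyncio.run(main())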
