writer.py (12.2 kB)
""" JSONL Writer for Analytics Engine. Handles writing metrics to JSONL files with: - Atomic writes - File rotation - Compression support - Thread-safe operations """ import json import asyncio import aiofiles from pathlib import Path from datetime import datetime, timezone from typing import Dict, Any, Optional, List from dataclasses import dataclass, asdict from contextlib import asynccontextmanager import gzip import uuid from enum import Enum from ..utils.logging import get_logger from ..utils.errors import ShannonError logger = get_logger(__name__) class MetricType(str, Enum): """Types of metrics we track.""" SESSION_START = "session_start" SESSION_END = "session_end" TOOL_USE = "tool_use" AGENT_EXECUTION = "agent_execution" CHECKPOINT_CREATED = "checkpoint_created" HOOK_TRIGGERED = "hook_triggered" COMMAND_EXECUTED = "command_executed" ERROR_OCCURRED = "error_occurred" TOKEN_USAGE = "token_usage" PERFORMANCE = "performance" @dataclass class MetricEntry: """A single metric entry.""" id: str timestamp: datetime type: MetricType session_id: Optional[str] user_id: Optional[str] data: Dict[str, Any] metadata: Dict[str, Any] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "id": self.id, "timestamp": self.timestamp.isoformat(), "type": self.type.value, "session_id": self.session_id, "user_id": self.user_id, "data": self.data, "metadata": self.metadata } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "MetricEntry": """Create from dictionary.""" return cls( id=data["id"], timestamp=datetime.fromisoformat(data["timestamp"]), type=MetricType(data["type"]), session_id=data.get("session_id"), user_id=data.get("user_id"), data=data.get("data", {}), metadata=data.get("metadata", {}) ) class JSONLWriter: """Writes metrics to JSONL files with rotation and compression.""" def __init__( self, base_path: Path, max_file_size: int = 100 * 1024 * 1024, # 100MB max_files: int = 10, compress_old: bool = True, buffer_size: int = 100 ): """ Initialize JSONL writer. Args: base_path: Base directory for analytics files max_file_size: Maximum size before rotation max_files: Maximum number of files to keep compress_old: Whether to compress rotated files buffer_size: Number of entries to buffer before writing """ self.base_path = Path(base_path) self.max_file_size = max_file_size self.max_files = max_files self.compress_old = compress_old self.buffer_size = buffer_size # Create directory if needed self.base_path.mkdir(parents=True, exist_ok=True) # Current file and buffer self.current_file: Optional[Path] = None self.buffer: List[MetricEntry] = [] self.write_lock = asyncio.Lock() # File handle cache self._file_handle = None @property def metrics_dir(self) -> Path: """Get metrics directory.""" return self.base_path / "metrics" async def initialize(self) -> None: """Initialize writer and ensure current file exists.""" self.metrics_dir.mkdir(parents=True, exist_ok=True) await self._ensure_current_file() async def write(self, entry: MetricEntry) -> None: """ Write a metric entry. Args: entry: Metric entry to write """ async with self.write_lock: self.buffer.append(entry) # Flush if buffer is full if len(self.buffer) >= self.buffer_size: await self._flush_buffer() async def write_batch(self, entries: List[MetricEntry]) -> None: """ Write multiple metric entries. 
        Args:
            entries: List of metric entries to write
        """
        async with self.write_lock:
            self.buffer.extend(entries)

            # Flush if buffer is full
            if len(self.buffer) >= self.buffer_size:
                await self._flush_buffer()

    async def flush(self) -> None:
        """Force flush the buffer."""
        async with self.write_lock:
            await self._flush_buffer()

    async def _flush_buffer(self) -> None:
        """Flush buffer to disk."""
        if not self.buffer:
            return

        await self._ensure_current_file()

        # Write entries
        async with aiofiles.open(self.current_file, 'a') as f:
            for entry in self.buffer:
                line = json.dumps(entry.to_dict(), separators=(',', ':'))
                await f.write(line + '\n')

        logger.debug(f"Flushed {len(self.buffer)} metrics to {self.current_file}")
        self.buffer.clear()

        # Check if rotation needed
        await self._check_rotation()

    async def _ensure_current_file(self) -> None:
        """Ensure we have a current file to write to."""
        if self.current_file and self.current_file.exists():
            return

        # Find or create current file
        existing = sorted(
            self.metrics_dir.glob("metrics_*.jsonl"),
            key=lambda p: p.stat().st_mtime if p.exists() else 0,
            reverse=True
        )

        if existing and existing[0].stat().st_size < self.max_file_size:
            self.current_file = existing[0]
        else:
            # Create new file
            timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
            self.current_file = self.metrics_dir / f"metrics_{timestamp}.jsonl"
            self.current_file.touch()
            logger.info(f"Created new metrics file: {self.current_file}")

    async def _check_rotation(self) -> None:
        """Check if file rotation is needed."""
        if not self.current_file or not self.current_file.exists():
            return

        size = self.current_file.stat().st_size
        if size >= self.max_file_size:
            await self._rotate_files()

    async def _rotate_files(self) -> None:
        """Rotate metrics files."""
        logger.info(f"Rotating metrics file: {self.current_file}")

        # Compress current file if needed
        if self.compress_old and self.current_file:
            compressed = self.current_file.with_suffix('.jsonl.gz')

            async with aiofiles.open(self.current_file, 'rb') as f_in:
                content = await f_in.read()

            async with aiofiles.open(compressed, 'wb') as f_out:
                compressed_content = gzip.compress(content)
                await f_out.write(compressed_content)

            # Remove original
            self.current_file.unlink()
            logger.info(f"Compressed {self.current_file} to {compressed}")

        # Clean up old files
        await self._cleanup_old_files()

        # Reset current file
        self.current_file = None
        await self._ensure_current_file()

    async def _cleanup_old_files(self) -> None:
        """Remove old files exceeding max_files limit."""
        # Get all metrics files
        files = []
        for pattern in ["metrics_*.jsonl", "metrics_*.jsonl.gz"]:
            files.extend(self.metrics_dir.glob(pattern))

        # Sort by modification time
        files.sort(key=lambda p: p.stat().st_mtime, reverse=True)

        # Remove excess files
        for f in files[self.max_files:]:
            f.unlink()
            logger.info(f"Removed old metrics file: {f}")

    async def close(self) -> None:
        """Close writer and flush remaining data."""
        await self.flush()

    # Context manager support
    async def __aenter__(self):
        await self.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()


class MetricsWriter:
    """High-level metrics writer with convenience methods."""

    def __init__(self, writer: JSONLWriter):
        """
        Initialize metrics writer.
        Args:
            writer: Underlying JSONL writer
        """
        self.writer = writer

    async def track_session_start(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        project_path: Optional[str] = None,
        model: Optional[str] = None,
        **kwargs
    ) -> None:
        """Track session start."""
        entry = MetricEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            type=MetricType.SESSION_START,
            session_id=session_id,
            user_id=user_id,
            data={
                "project_path": project_path,
                "model": model,
                **kwargs
            },
            metadata={}
        )
        await self.writer.write(entry)

    async def track_session_end(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        duration_seconds: Optional[float] = None,
        token_count: Optional[int] = None,
        **kwargs
    ) -> None:
        """Track session end."""
        entry = MetricEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            type=MetricType.SESSION_END,
            session_id=session_id,
            user_id=user_id,
            data={
                "duration_seconds": duration_seconds,
                "token_count": token_count,
                **kwargs
            },
            metadata={}
        )
        await self.writer.write(entry)

    async def track_tool_use(
        self,
        session_id: str,
        tool_name: str,
        success: bool,
        duration_ms: Optional[float] = None,
        **kwargs
    ) -> None:
        """Track tool usage."""
        entry = MetricEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            type=MetricType.TOOL_USE,
            session_id=session_id,
            user_id=None,
            data={
                "tool_name": tool_name,
                "success": success,
                "duration_ms": duration_ms,
                **kwargs
            },
            metadata={}
        )
        await self.writer.write(entry)

    async def track_error(
        self,
        session_id: Optional[str],
        error_type: str,
        error_message: str,
        stack_trace: Optional[str] = None,
        **kwargs
    ) -> None:
        """Track error occurrence."""
        entry = MetricEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            type=MetricType.ERROR_OCCURRED,
            session_id=session_id,
            user_id=None,
            data={
                "error_type": error_type,
                "error_message": error_message,
                "stack_trace": stack_trace,
                **kwargs
            },
            metadata={}
        )
        await self.writer.write(entry)

    async def track_performance(
        self,
        operation: str,
        duration_ms: float,
        success: bool,
        session_id: Optional[str] = None,
        **kwargs
    ) -> None:
        """Track performance metrics."""
        entry = MetricEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc),
            type=MetricType.PERFORMANCE,
            session_id=session_id,
            user_id=None,
            data={
                "operation": operation,
                "duration_ms": duration_ms,
                "success": success,
                **kwargs
            },
            metadata={}
        )
        await self.writer.write(entry)
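
A minimal usage sketch (not part of the original file): it shows the intended flow of opening JSONLWriter as an async context manager, wrapping it in MetricsWriter, and recording a few events; buffered entries are flushed when the context exits. The import path, base directory, and session values are assumptions for illustration.

import asyncio
from pathlib import Path

# Hypothetical import path; adjust to the actual package layout of shannon-mcp.
from shannon_mcp.analytics.writer import JSONLWriter, MetricsWriter


async def main() -> None:
    # __aenter__ calls initialize(); __aexit__ calls close(), which flushes any
    # buffered entries to the current metrics_*.jsonl file.
    async with JSONLWriter(base_path=Path.home() / ".shannon" / "analytics") as writer:
        metrics = MetricsWriter(writer)
        await metrics.track_session_start(session_id="demo-session", model="example-model")
        await metrics.track_tool_use(
            session_id="demo-session", tool_name="Read", success=True, duration_ms=12.5
        )
        await metrics.track_session_end(
            session_id="demo-session", duration_seconds=42.0, token_count=1500
        )


asyncio.run(main())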
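
Reading the data back follows directly from the line format and rotation scheme above: each line is one MetricEntry serialized by to_dict(), and rotated files carry a .jsonl.gz suffix. The iter_metrics helper below is hypothetical (it is not part of the module) and the metrics directory path is an assumption.

import gzip
import json
from pathlib import Path
from typing import Iterator

# Hypothetical import path; adjust to the actual package layout of shannon-mcp.
from shannon_mcp.analytics.writer import MetricEntry


def iter_metrics(metrics_dir: Path) -> Iterator[MetricEntry]:
    """Yield MetricEntry objects from plain and gzip-rotated JSONL files."""
    for path in sorted(metrics_dir.glob("metrics_*.jsonl*")):
        opener = gzip.open if path.suffix == ".gz" else open
        with opener(path, "rt") as f:
            for line in f:
                line = line.strip()
                if line:
                    yield MetricEntry.from_dict(json.loads(line))


for entry in iter_metrics(Path.home() / ".shannon" / "analytics" / "metrics"):
    print(entry.type.value, entry.timestamp.isoformat())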
