Skip to main content
Glama

Scribe MCP Server

by paxocial
audit.py14.5 kB
""" Audit trail management for Scribe MCP log rotation system. Provides persistent storage and retrieval of rotation metadata, maintaining complete audit trails for log integrity verification. """ import json import os import uuid from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime import threading class AuditTrailManager: """ Manages audit trail storage and retrieval for log rotation events. Provides thread-safe operations for storing, retrieving, and verifying rotation metadata with JSON-based persistence. """ def __init__(self, state_dir: str = None): """ Initialize audit trail manager. Args: state_dir: Directory for storing audit trail files (defaults to ../state relative to this file) """ # Determine state directory if state_dir is None: current_file = Path(__file__) state_dir = current_file.parent.parent / "state" self.state_dir = Path(state_dir) self.state_dir.mkdir(parents=True, exist_ok=True) # Thread lock for atomic operations self._lock = threading.RLock() def _get_audit_file_path(self, project_name: str) -> Path: """Get the audit trail file path for a project.""" # Sanitize project name for filesystem safety safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in project_name) return self.state_dir / f"rotation_audit_{safe_name}.json" def store_rotation_metadata(self, project_name: str, metadata: Dict[str, Any]) -> bool: """ Store rotation metadata in the audit trail. Args: project_name: Name of the project metadata: Rotation metadata to store Returns: True if storage successful, False otherwise """ try: with self._lock: audit_file = self._get_audit_file_path(project_name) # Load existing audit trail or create new audit_trail = self._load_audit_trail(project_name) # Add timestamp if not present if "stored_timestamp" not in metadata: metadata["stored_timestamp"] = datetime.utcnow().isoformat() + " UTC" # Add to audit trail audit_trail["rotations"].append(metadata) audit_trail["total_rotations"] = len(audit_trail["rotations"]) audit_trail["last_updated"] = metadata["stored_timestamp"] # Atomic write to file temp_file = audit_file.with_suffix(".tmp") try: with open(temp_file, 'w', encoding='utf-8') as f: json.dump(audit_trail, f, indent=2, ensure_ascii=False) # Atomic rename temp_file.rename(audit_file) except Exception as e: # Cleanup temp file if write fails if temp_file.exists(): temp_file.unlink() raise e return True except Exception as e: # Log error but don't raise - rotation should continue print(f"Error storing rotation metadata for {project_name}: {e}") return False def _load_audit_trail(self, project_name: str) -> Dict[str, Any]: """ Load audit trail from file or create new structure. Args: project_name: Name of the project Returns: Audit trail dictionary """ audit_file = self._get_audit_file_path(project_name) if not audit_file.exists(): # Create new audit trail structure return { "project_name": project_name, "created_timestamp": datetime.utcnow().isoformat() + " UTC", "rotations": [], "total_rotations": 0, "last_updated": None, "version": "1.0" } try: with open(audit_file, 'r', encoding='utf-8') as f: audit_trail = json.load(f) # Validate structure if not isinstance(audit_trail, dict): raise ValueError("Invalid audit trail format") # Ensure required fields audit_trail.setdefault("project_name", project_name) audit_trail.setdefault("rotations", []) audit_trail.setdefault("total_rotations", len(audit_trail["rotations"])) audit_trail.setdefault("version", "1.0") return audit_trail except (json.JSONDecodeError, ValueError) as e: print(f"Error loading audit trail for {project_name}: {e}") # Return fresh structure if file is corrupted return { "project_name": project_name, "created_timestamp": datetime.utcnow().isoformat() + " UTC", "rotations": [], "total_rotations": 0, "last_updated": None, "version": "1.0", "corruption_note": f"File corrupted at {datetime.utcnow().isoformat() + ' UTC'}" } def get_rotation_history(self, project_name: str, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Get rotation history for a project. Args: project_name: Name of the project limit: Maximum number of rotations to return (most recent first) Returns: List of rotation metadata dictionaries """ try: audit_trail = self._load_audit_trail(project_name) rotations = audit_trail.get("rotations", []) # Sort by timestamp (most recent first) rotations.sort(key=lambda x: x.get("rotation_timestamp", ""), reverse=True) if limit: rotations = rotations[:limit] return rotations except Exception as e: print(f"Error getting rotation history for {project_name}: {e}") return [] def get_rotation_by_uuid(self, project_name: str, rotation_uuid: str) -> Optional[Dict[str, Any]]: """ Get specific rotation by UUID. Args: project_name: Name of the project rotation_uuid: UUID of the rotation to find Returns: Rotation metadata dictionary or None if not found """ try: audit_trail = self._load_audit_trail(project_name) rotations = audit_trail.get("rotations", []) for rotation in rotations: if rotation.get("rotation_uuid") == rotation_uuid: return rotation return None except Exception as e: print(f"Error getting rotation {rotation_uuid} for {project_name}: {e}") return None def verify_rotation_integrity(self, project_name: str, rotation_uuid: str) -> Tuple[bool, str]: """ Verify the integrity of a specific rotation. Args: project_name: Name of the project rotation_uuid: UUID of the rotation to verify Returns: Tuple of (is_valid: bool, verification_message: str) """ try: rotation = self.get_rotation_by_uuid(project_name, rotation_uuid) if not rotation: return False, f"Rotation {rotation_uuid} not found" archived_file_path = rotation.get("archived_file_path") expected_hash = rotation.get("file_hash") if not archived_file_path or not expected_hash: return False, "Missing file path or hash in rotation metadata" # Check if file still exists if not Path(archived_file_path).exists(): return False, f"Archived file not found: {archived_file_path}" # Import integrity utilities from .integrity import verify_file_integrity # Verify file integrity is_valid, actual_hash = verify_file_integrity(archived_file_path, expected_hash) if is_valid: return True, f"File integrity verified: {actual_hash[:16]}..." else: return False, f"File integrity compromised: expected {expected_hash[:16]}..., got {actual_hash[:16]}..." except Exception as e: return False, f"Error verifying rotation integrity: {e}" def get_audit_summary(self, project_name: str) -> Dict[str, Any]: """ Get audit trail summary for a project. Args: project_name: Name of the project Returns: Audit summary dictionary """ try: audit_trail = self._load_audit_trail(project_name) rotations = audit_trail.get("rotations", []) if not rotations: return { "project_name": project_name, "total_rotations": 0, "oldest_rotation": None, "newest_rotation": None, "total_entries_archived": 0, "total_size_archived": 0, "last_updated": audit_trail.get("last_updated") } # Calculate summary statistics total_entries = sum(r.get("entry_count", 0) for r in rotations if r.get("entry_count", 0) > 0) total_size = sum(r.get("file_size", 0) for r in rotations if r.get("file_size", 0) > 0) # Sort by timestamp for oldest/newest sorted_rotations = sorted(rotations, key=lambda x: x.get("rotation_timestamp", "")) return { "project_name": project_name, "total_rotations": len(rotations), "oldest_rotation": sorted_rotations[0].get("rotation_timestamp") if sorted_rotations else None, "newest_rotation": sorted_rotations[-1].get("rotation_timestamp") if sorted_rotations else None, "total_entries_archived": total_entries, "total_size_archived": total_size, "last_updated": audit_trail.get("last_updated"), "audit_file_path": str(self._get_audit_file_path(project_name)) } except Exception as e: return { "project_name": project_name, "error": str(e), "total_rotations": 0 } def cleanup_old_rotations(self, project_name: str, keep_count: int = 50) -> Tuple[int, bool]: """ Clean up old rotation records, keeping only the most recent ones. Args: project_name: Name of the project keep_count: Number of recent rotations to keep Returns: Tuple of (rotations_removed: int, success: bool) """ try: with self._lock: audit_trail = self._load_audit_trail(project_name) rotations = audit_trail.get("rotations", []) if len(rotations) <= keep_count: return 0, True # Sort by timestamp (most recent first) rotations.sort(key=lambda x: x.get("rotation_timestamp", ""), reverse=True) # Keep only the most recent rotations kept_rotations = rotations[:keep_count] removed_count = len(rotations) - keep_count # Update audit trail audit_trail["rotations"] = kept_rotations audit_trail["total_rotations"] = len(kept_rotations) audit_trail["last_updated"] = datetime.utcnow().isoformat() + " UTC" audit_trail["cleanup_note"] = f"Removed {removed_count} old rotation records" # Save updated audit trail audit_file = self._get_audit_file_path(project_name) with open(audit_file, 'w', encoding='utf-8') as f: json.dump(audit_trail, f, indent=2, ensure_ascii=False) return removed_count, True except Exception as e: print(f"Error cleaning up old rotations for {project_name}: {e}") return 0, False def list_audited_projects(self) -> List[str]: """ List all projects that have audit trails. Returns: List of project names """ try: projects = [] for file_path in self.state_dir.glob("rotation_audit_*.json"): # Extract project name from filename filename = file_path.stem # Remove .json extension if filename.startswith("rotation_audit_"): project_name = filename[len("rotation_audit_"):] # Reverse the sanitization project_name = project_name.replace("_", " ") projects.append(project_name) return sorted(projects) except Exception as e: print(f"Error listing audited projects: {e}") return [] # Global audit manager instance _audit_manager = None def get_audit_manager(state_dir: str = None) -> AuditTrailManager: """ Get global audit manager instance. Args: state_dir: Directory for storing audit trail files Returns: AuditTrailManager instance """ global _audit_manager if _audit_manager is None: _audit_manager = AuditTrailManager(state_dir) return _audit_manager # Convenience functions for direct usage def store_rotation_metadata(project_name: str, metadata: Dict[str, Any]) -> bool: """Store rotation metadata for a project.""" return get_audit_manager().store_rotation_metadata(project_name, metadata) def get_rotation_history(project_name: str, limit: Optional[int] = None) -> List[Dict[str, Any]]: """Get rotation history for a project.""" return get_audit_manager().get_rotation_history(project_name, limit) def verify_rotation_integrity(project_name: str, rotation_uuid: str) -> Tuple[bool, str]: """Verify the integrity of a specific rotation.""" return get_audit_manager().verify_rotation_integrity(project_name, rotation_uuid) def get_audit_summary(project_name: str) -> Dict[str, Any]: """Get audit trail summary for a project.""" return get_audit_manager().get_audit_summary(project_name)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/paxocial/scribe_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server