"""
Work Preservation Utilities - Agent Orchestration Platform
This module implements utilities for preserving uncommitted work during
session deletion, including git state capture (patches of uncommitted changes
and untracked files), file backups, and session metadata export.
Architecture:
- Pattern: Strategy pattern for different preservation methods
- Security: Secure file handling with validation
- Performance: Async operations for large file sets
Author: Adder_1 | Created: 2025-06-26 | Last Modified: 2025-06-26
"""
import asyncio
import json
import shutil
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from src.contracts_compat import require
# Import session model types
from src.models.session import SessionState
@dataclass
class PreservationResult:
"""Result of work preservation operation."""
success: bool
preserved_count: int
preservation_path: Optional[Path] = None
git_commits: List[str] = field(default_factory=list)
preserved_files: List[str] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
class WorkPreservationHandler:
"""
Handles preservation of uncommitted work during session deletion.
Provides comprehensive work preservation including:
- Git repository state and uncommitted changes
- Modified files outside version control
- Agent-specific work directories
- Session metadata and configuration
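    Example (illustrative sketch; assumes an existing SessionState instance
    named `session` and a running asyncio event loop):

        handler = WorkPreservationHandler()
        result = await handler.preserve_session_work(session)
        if result.success:
            print(f"Preserved {result.preserved_count} item(s) at {result.preservation_path}")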
"""
def __init__(self, preservation_root: Optional[Path] = None):
"""
Initialize work preservation handler.
Args:
            preservation_root: Root directory for preserved work; defaults to
                ~/.claude_preserved_work
"""
self.preservation_root = (
preservation_root or Path.home() / ".claude_preserved_work"
)
self.preservation_root.mkdir(parents=True, exist_ok=True)
@require(lambda self, session_state: session_state is not None)
async def preserve_session_work(
self,
session_state: SessionState,
include_git: bool = True,
include_files: bool = True,
include_metadata: bool = True,
file_patterns: Optional[List[str]] = None,
) -> PreservationResult:
"""
Preserve all uncommitted work from a session.
Args:
session_state: Session being deleted
include_git: Whether to preserve git state
include_files: Whether to preserve modified files
include_metadata: Whether to preserve session metadata
file_patterns: Optional file patterns to include
Returns:
PreservationResult with preservation details
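        Example (illustrative; limits file preservation to Python sources and
        assumes `handler` is a WorkPreservationHandler and `session_state` is
        the session being deleted):

            result = await handler.preserve_session_work(
                session_state,
                include_git=True,
                file_patterns=["*.py"],
            )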
"""
# Create preservation directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
session_name_safe = "".join(
c if c.isalnum() or c in "_-" else "_" for c in session_state.name
)
preservation_dir = self.preservation_root / f"{session_name_safe}_{timestamp}"
preservation_dir.mkdir(parents=True, exist_ok=True)
result = PreservationResult(
success=True, preserved_count=0, preservation_path=preservation_dir
)
try:
# Preserve git state
if include_git:
git_result = await self._preserve_git_state(
session_state.root_path, preservation_dir / "git"
)
result.git_commits.extend(git_result.get("commits", []))
result.preserved_count += git_result.get("count", 0)
if git_result.get("errors"):
result.errors.extend(git_result["errors"])
# Preserve modified files
if include_files:
files_result = await self._preserve_files(
session_state.root_path, preservation_dir / "files", file_patterns
)
result.preserved_files.extend(files_result.get("files", []))
result.preserved_count += len(files_result.get("files", []))
if files_result.get("errors"):
result.errors.extend(files_result["errors"])
# Preserve session metadata
if include_metadata:
await self._preserve_metadata(
session_state, preservation_dir / "metadata.json"
)
result.preserved_count += 1
# Create preservation summary
await self._create_preservation_summary(result, preservation_dir)
result.metadata = {
"preservation_timestamp": timestamp,
"session_id": str(session_state.session_id),
"session_name": session_state.name,
"root_path": str(session_state.root_path),
}
except Exception as e:
result.success = False
result.errors.append(f"Preservation failed: {str(e)}")
return result
async def _preserve_git_state(
self, root_path: Path, preservation_path: Path
) -> Dict[str, Any]:
"""Preserve git repository state."""
preservation_path.mkdir(parents=True, exist_ok=True)
result = {"commits": [], "count": 0, "errors": []}
# Check if directory is a git repository
git_dir = root_path / ".git"
if not git_dir.exists():
return result
try:
# Get uncommitted changes
status_output = await self._run_git_command(
["git", "status", "--porcelain"], cwd=root_path
)
if status_output:
# Create patch of uncommitted changes
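                # "git diff HEAD" captures both staged and unstaged edits to
                # tracked files; untracked files are handled separately below.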
diff_output = await self._run_git_command(
["git", "diff", "HEAD"], cwd=root_path
)
if diff_output:
patch_file = preservation_path / "uncommitted_changes.patch"
patch_file.write_text(diff_output)
result["count"] += 1
# Save list of untracked files
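                # Porcelain status lines have the form "XY <path>"; untracked
                # entries start with "?? ", so the path begins at index 3.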
untracked = [
line[3:]
for line in status_output.splitlines()
if line.startswith("?? ")
]
if untracked:
untracked_file = preservation_path / "untracked_files.txt"
untracked_file.write_text("\n".join(untracked))
# Copy untracked files
untracked_dir = preservation_path / "untracked"
untracked_dir.mkdir(exist_ok=True)
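                    # Note: porcelain reports an untracked directory as a single
                    # "dir/" entry, so files inside it are skipped by is_file().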
for file_path in untracked:
src = root_path / file_path
if src.is_file():
dst = untracked_dir / file_path
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
result["count"] += 1
# Save git log
log_output = await self._run_git_command(
["git", "log", "--oneline", "-20"], cwd=root_path
)
if log_output:
log_file = preservation_path / "recent_commits.log"
log_file.write_text(log_output)
result["commits"] = log_output.splitlines()
# Save current branch info
branch_output = await self._run_git_command(
["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=root_path
)
if branch_output:
branch_file = preservation_path / "current_branch.txt"
branch_file.write_text(branch_output.strip())
except Exception as e:
result["errors"].append(f"Git preservation error: {str(e)}")
return result
async def _preserve_files(
self,
root_path: Path,
preservation_path: Path,
file_patterns: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Preserve modified files."""
preservation_path.mkdir(parents=True, exist_ok=True)
result = {"files": [], "errors": []}
# Default patterns if none provided
if not file_patterns:
file_patterns = [
"*.py",
"*.js",
"*.ts",
"*.jsx",
"*.tsx",
"*.json",
"*.yaml",
"*.yml",
"*.toml",
"*.md",
"*.txt",
"*.log",
"Dockerfile",
"docker-compose.yml",
".env*",
"requirements.txt",
"package.json",
]
        try:
            # Find files matching patterns; a "seen" set keeps overlapping
            # patterns (e.g. "*.json" and "package.json") from copying and
            # counting the same file twice.
            preserved_files: List[str] = []
            seen: Set[Path] = set()
            for pattern in file_patterns:
                for file_path in root_path.rglob(pattern):
                    if file_path in seen:
                        continue
                    if file_path.is_file() and not self._should_ignore(file_path):
                        # Skip anything inside the .git directory
                        if ".git" in file_path.parts:
                            continue
                        seen.add(file_path)
                        # Copy the file, preserving directory structure
                        rel_path = file_path.relative_to(root_path)
                        dst_path = preservation_path / rel_path
                        dst_path.parent.mkdir(parents=True, exist_ok=True)
                        try:
                            shutil.copy2(file_path, dst_path)
                            preserved_files.append(str(rel_path))
                        except Exception as e:
                            result["errors"].append(
                                f"Failed to preserve {rel_path}: {e}"
                            )
            result["files"] = preserved_files
except Exception as e:
result["errors"].append(f"File preservation error: {str(e)}")
return result
    def _should_ignore(self, file_path: Path) -> bool:
        """Check if a file should be excluded from preservation."""
        # Plain substring checks cannot handle glob-style patterns such as
        # "*.pyc", so match directory components, file names, and suffixes
        # explicitly instead.
        ignore_dirs = {
            "__pycache__",
            ".pytest_cache",
            "node_modules",
            ".venv",
            "venv",
            "env",
            ".env",
        }
        ignore_names = {".DS_Store", "Thumbs.db"}
        ignore_suffixes = {".pyc", ".pyo", ".so", ".dylib"}
        if any(part in ignore_dirs for part in file_path.parts):
            return True
        if file_path.name in ignore_names:
            return True
        return file_path.suffix in ignore_suffixes
async def _preserve_metadata(
self, session_state: SessionState, metadata_path: Path
) -> None:
"""Preserve session metadata."""
metadata = {
"session_id": str(session_state.session_id),
"session_name": session_state.name,
"root_path": str(session_state.root_path),
"created_at": session_state.created_at.isoformat(),
"last_activity": session_state.last_activity.isoformat(),
"security_level": session_state.security_level.value,
"agent_count": len(session_state.agents),
"preservation_timestamp": datetime.now().isoformat(),
}
# Add agent information
if session_state.agents:
metadata["agents"] = [
{
"agent_id": str(agent_id),
"agent_name": f"Agent_{i+1}", # Reconstruct name
}
for i, agent_id in enumerate(session_state.agents)
]
# Write metadata
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
async def _create_preservation_summary(
self, result: PreservationResult, preservation_dir: Path
) -> None:
"""Create human-readable preservation summary."""
summary = f"""Work Preservation Summary
========================
Preservation Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Preservation Path: {preservation_dir}
Preserved Items:
- Total Count: {result.preserved_count}
- Git Commits: {len(result.git_commits)}
- Files: {len(result.preserved_files)}
"""
if result.git_commits:
summary += "Recent Git Commits:\n"
for commit in result.git_commits[:10]:
summary += f" - {commit}\n"
summary += "\n"
if result.preserved_files:
summary += f"Preserved Files ({len(result.preserved_files)}):\n"
for file_path in sorted(result.preserved_files)[:20]:
summary += f" - {file_path}\n"
if len(result.preserved_files) > 20:
summary += f" ... and {len(result.preserved_files) - 20} more\n"
summary += "\n"
if result.errors:
summary += "Errors:\n"
for error in result.errors:
summary += f" - {error}\n"
summary_path = preservation_dir / "PRESERVATION_SUMMARY.txt"
summary_path.write_text(summary)
async def _run_git_command(self, command: List[str], cwd: Path) -> Optional[str]:
"""Run git command and return output."""
try:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
return stdout.decode("utf-8")
else:
return None
except Exception:
return None
async def list_preserved_sessions(self) -> List[Dict[str, Any]]:
"""List all preserved session work."""
preserved = []
for path in self.preservation_root.iterdir():
if path.is_dir():
metadata_file = path / "metadata.json"
if metadata_file.exists():
try:
with open(metadata_file) as f:
metadata = json.load(f)
preserved.append(
{
"path": path,
"session_name": metadata.get("session_name"),
"preservation_time": metadata.get(
"preservation_timestamp"
),
"session_id": metadata.get("session_id"),
}
)
except Exception:
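                        # Skip entries whose metadata cannot be read or parsed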
pass
        return sorted(
            preserved, key=lambda x: x["preservation_time"] or "", reverse=True
        )
# Export public interface
__all__ = ["WorkPreservationHandler", "PreservationResult"]