"""
Automatic archival of old context to embedding storage.
This module provides the ContextArchiver class for automatically moving
old completed tasks and messages to the embedding database, reducing
the size of the active state.json file.
GitHub Issue #9: Token Embedding for Context Efficiency
"""
import time
import zlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

from .core import read_with_lock, write_with_lock, get_state_file, get_context_bus_dir
from .embeddings import EmbeddingManager
class ContextArchiver:
    """Handles automatic archival of old context items to embedding storage.

    Completed tasks in ``state["next_steps"]`` and entries in
    ``state["messages"]`` whose timestamps are older than
    ``archive_after_hours`` are handed to an ``EmbeddingManager`` and removed
    from the state file.
    """

    def __init__(self, archive_after_hours: float = 1.0):
        """
        Initialize the archiver.

        Args:
            archive_after_hours: Archive items older than this many hours (default: 1)
        """
        self.archive_after_hours = archive_after_hours
        # Embeddings are stored in "context.db" inside the context bus directory.
        db_path = get_context_bus_dir() / "context.db"
        self.embedding_manager = EmbeddingManager(db_path)

    def should_archive(self, timestamp: Any) -> bool:
        """
        Check if an item is old enough to archive.

        Args:
            timestamp: ISO-8601 string (naive, or with an offset / trailing "Z"),
                Unix timestamp (int/float or numeric string), or None

        Returns:
            True if the item is older than ``archive_after_hours``; False
            otherwise, including for missing or unparseable timestamps.
        """
        epoch = self._to_epoch(timestamp)
        if epoch is None:
            return False
        cutoff = time.time() - self.archive_after_hours * 3600
        return epoch < cutoff

    @staticmethod
    def _to_epoch(timestamp: Any) -> Optional[float]:
        """
        Convert a supported timestamp value to seconds since the Unix epoch.

        Working in epoch seconds avoids comparing naive and timezone-aware
        datetimes, which raises TypeError. Naive ISO strings are interpreted
        as local time. Aware ones (e.g. "...Z", "...+02:00") use their own
        offset.

        Returns:
            Epoch seconds, or None for falsy or unparseable input.
        """
        if not timestamp:
            return None
        try:
            if isinstance(timestamp, (int, float)):
                return float(timestamp)
            if isinstance(timestamp, str):
                try:
                    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
                except ValueError:
                    # Not ISO-8601; try a Unix timestamp in string form.
                    return float(timestamp)
                return parsed.timestamp()
        except (ValueError, TypeError, OverflowError, OSError):
            return None
        return None

    def archive_old_context(self) -> Dict[str, int]:
        """
        Archive old completed tasks and messages to embeddings.

        Reads the state file and passes qualifying items to the embedding
        manager. It then writes the state back with those items removed and
        with ``archival_stats`` and ``timestamp`` refreshed.

        Returns:
            Statistics about archived items: {'tasks': N, 'messages': N}
        """
        state_file = get_state_file()
        try:
            state = read_with_lock(state_file)
        except FileNotFoundError:
            return {"tasks": 0, "messages": 0}
        if not state:
            return {"tasks": 0, "messages": 0}

        # NOTE(review): the read above and the write below are separate locked
        # calls. Presumably the lock is not held in between, so concurrent
        # writers' changes could be overwritten. Also, items are sent to the
        # embedding store before the state is persisted, so a failed write
        # would re-archive them on the next run. Confirm against .core.
        remaining_tasks, archived_tasks = self._archive_tasks(
            state.get("next_steps") or []
        )
        remaining_messages, archived_messages = self._archive_messages(
            state.get("messages") or []
        )
        stats = {"tasks": archived_tasks, "messages": archived_messages}

        # Keep only the items that were not archived.
        state["next_steps"] = remaining_tasks
        state["messages"] = remaining_messages
        self._record_archival_stats(state, stats)
        state["timestamp"] = time.time()
        write_with_lock(state_file, state)
        return stats

    def _archive_tasks(self, tasks: List[Any]) -> Tuple[List[Any], int]:
        """Archive old completed tasks; return (tasks to keep, number archived)."""
        remaining: List[Any] = []
        archived = 0
        for task in tasks:
            # Non-dict entries and unfinished tasks always stay in the state.
            if not isinstance(task, dict) or task.get("status") != "completed":
                remaining.append(task)
                continue
            # Age is measured from completion when known, else from creation.
            archive_time = task.get("completed_at") or task.get("created_at")
            if not self.should_archive(archive_time):
                remaining.append(task)
                continue
            self.embedding_manager.archive_task(self._build_task_record(task))
            archived += 1
        return remaining, archived

    def _build_task_record(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a state task dict into the record passed to ``archive_task``."""
        if "id" in task:
            task_id = task["id"]
        else:
            # Fall back to a truncated title; tolerate a missing/None "task".
            task_id = str(task.get("task") or "")[:50]
        return {
            "id": task_id,
            "title": task.get("task", task.get("title", "")),
            "description": task.get("description", task.get("outcome", "")),
            "created_at": self._format_timestamp(task.get("created_at")),
            "completed_at": self._format_timestamp(task.get("completed_at")),
            "metadata": {
                "assigned_to": task.get("assigned_to"),
                "completed_by": task.get("completed_by"),
                "priority": task.get("priority"),
            },
        }

    def _archive_messages(self, messages: List[Any]) -> Tuple[List[Any], int]:
        """Archive old messages; return (messages to keep, number archived).

        String messages are dated from their "[HH:MM:SS]" prefix and wrapped
        in a record. Dict messages use their "timestamp" key and are archived
        as-is. Any other entry is kept.
        """
        remaining: List[Any] = []
        archived = 0
        for index, message in enumerate(messages):
            if isinstance(message, str):
                msg_timestamp = self._extract_message_timestamp(message)
                if self.should_archive(msg_timestamp):
                    self.embedding_manager.archive_message({
                        "id": self._message_id(index, message),
                        "sender": self._extract_sender(message),
                        "message": message,
                        "timestamp": self._format_timestamp(msg_timestamp),
                    })
                    archived += 1
                    continue
            elif isinstance(message, dict):
                if self.should_archive(message.get("timestamp")):
                    self.embedding_manager.archive_message(message)
                    archived += 1
                    continue
            remaining.append(message)
        return remaining, archived

    @staticmethod
    def _message_id(index: int, message: str) -> str:
        """Build an archive id for a string message at position ``index``.

        crc32 replaces the built-in ``hash()``, whose value for ``str`` is
        salted per interpreter process. With crc32 the id is reproducible
        across runs.
        """
        digest = zlib.crc32(message.encode("utf-8", "surrogatepass")) % 10000
        return f"msg-{index}-{digest}"

    @staticmethod
    def _record_archival_stats(state: Dict[str, Any], stats: Dict[str, int]) -> None:
        """Accumulate this run's counts into ``state["archival_stats"]`` in place."""
        archival_stats = state.get("archival_stats")
        if not isinstance(archival_stats, dict):
            archival_stats = {
                "total_archived_tasks": 0,
                "total_archived_messages": 0,
            }
            state["archival_stats"] = archival_stats
        archival_stats["last_archived"] = datetime.now().isoformat()
        archival_stats["total_archived_tasks"] = (
            archival_stats.get("total_archived_tasks", 0) + stats["tasks"]
        )
        archival_stats["total_archived_messages"] = (
            archival_stats.get("total_archived_messages", 0) + stats["messages"]
        )

    def get_archival_stats(self) -> Dict[str, Any]:
        """
        Get current archival statistics.

        Returns:
            Dictionary with archival stats from the state file, embedding
            counts from the embedding manager, and the configured threshold.
        """
        state_file = get_state_file()
        try:
            # A missing or empty state is treated as "nothing archived yet",
            # matching archive_old_context.
            state = read_with_lock(state_file) or {}
        except FileNotFoundError:
            state = {}
        archival_stats = state.get("archival_stats") or {}
        return {
            "last_archived": archival_stats.get("last_archived"),
            "total_archived_tasks": archival_stats.get("total_archived_tasks", 0),
            "total_archived_messages": archival_stats.get("total_archived_messages", 0),
            "embedded_tasks": self.embedding_manager.get_task_count(),
            "embedded_messages": self.embedding_manager.get_message_count(),
            "archive_after_hours": self.archive_after_hours,
        }

    def _format_timestamp(self, timestamp: Any) -> str:
        """Convert a timestamp to an ISO string.

        Strings are returned unchanged and numbers are rendered as local-time
        ISO strings. Falsy, unsupported or out-of-range values give "".
        """
        if not timestamp:
            return ""
        if isinstance(timestamp, str):
            return timestamp
        if isinstance(timestamp, (int, float)):
            try:
                return datetime.fromtimestamp(timestamp).isoformat()
            except (ValueError, OSError, OverflowError):
                return ""
        return ""

    def _extract_message_timestamp(self, message: str) -> Optional[float]:
        """
        Extract an approximate Unix timestamp from a "[HH:MM:SS] content" message.

        Messages carry only a wall-clock time, so the most recent past
        occurrence of that time is assumed. That is today's date, or
        yesterday's if today would put the message in the future (e.g. a
        "[23:50:00]" message read at 00:10).

        Returns:
            Epoch seconds, or None if there is no well-formed time prefix.
        """
        if not message or not message.startswith("["):
            return None
        end_bracket = message.find("]")
        if end_bracket == -1:
            return None
        parts = message[1:end_bracket].split(":")
        if len(parts) != 3:
            return None
        try:
            now = datetime.now()
            candidate = now.replace(
                hour=int(parts[0]),
                minute=int(parts[1]),
                second=int(parts[2]),
                microsecond=0,
            )
        except ValueError:
            # Non-numeric or out-of-range fields (e.g. "25:00:00").
            return None
        if candidate > now:
            candidate -= timedelta(days=1)
        return candidate.timestamp()

    def _extract_sender(self, message: str) -> str:
        """Extract sender from message string format '[HH:MM:SS] Sender: content'."""
        if not message:
            return "unknown"
        # Skip the "[...]" timestamp prefix if present.
        end_bracket = message.find("]")
        if end_bracket == -1:
            content = message
        else:
            content = message[end_bracket + 1:].strip()
        # The sender is the text before the first colon, if it is short enough
        # to plausibly be a name.
        colon_pos = content.find(":")
        if 0 < colon_pos < 50:
            return content[:colon_pos].strip()
        return "unknown"
def run_archival(archive_after_hours: float = 1.0) -> Dict[str, int]:
    """
    Run one archival pass using a freshly constructed ContextArchiver.

    Args:
        archive_after_hours: Age threshold in hours; completed tasks and
            messages older than this are moved to embedding storage

    Returns:
        The counts reported by ContextArchiver.archive_old_context,
        i.e. {'tasks': N, 'messages': N}
    """
    return ContextArchiver(archive_after_hours=archive_after_hours).archive_old_context()