"""
Unit tests for automatic context archival.
GitHub Issue #9: Token Embedding for Context Efficiency
"""
import json
import tempfile
import time
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
import pytest
from amicus.archival import ContextArchiver, run_archival
from amicus.core import write_with_lock, read_with_lock
@pytest.fixture
def temp_context_dir(tmp_path):
"""Create a temporary context directory."""
context_dir = tmp_path / ".amicus"
context_dir.mkdir()
with patch("amicus.archival.get_context_bus_dir", return_value=context_dir):
with patch("amicus.archival.get_state_file", return_value=context_dir / "state.json"):
with patch("amicus.core.get_state_file", return_value=context_dir / "state.json"):
yield context_dir
class TestShouldArchive:
"""Test suite for should_archive logic."""
def test_should_archive_old_unix_timestamp(self, temp_context_dir):
"""Test that old Unix timestamps are marked for archival."""
archiver = ContextArchiver(archive_after_hours=1.0)
# 2 hours ago
old_timestamp = time.time() - (2 * 60 * 60)
assert archiver.should_archive(old_timestamp) is True
def test_should_not_archive_recent_unix_timestamp(self, temp_context_dir):
"""Test that recent Unix timestamps are not archived."""
archiver = ContextArchiver(archive_after_hours=1.0)
# 30 minutes ago
recent_timestamp = time.time() - (30 * 60)
assert archiver.should_archive(recent_timestamp) is False
def test_should_archive_old_iso_timestamp(self, temp_context_dir):
"""Test that old ISO timestamps are marked for archival."""
archiver = ContextArchiver(archive_after_hours=1.0)
old_time = datetime.now() - timedelta(hours=2)
old_iso = old_time.isoformat()
assert archiver.should_archive(old_iso) is True
def test_should_not_archive_recent_iso_timestamp(self, temp_context_dir):
"""Test that recent ISO timestamps are not archived."""
archiver = ContextArchiver(archive_after_hours=1.0)
recent_time = datetime.now() - timedelta(minutes=30)
recent_iso = recent_time.isoformat()
assert archiver.should_archive(recent_iso) is False
def test_should_not_archive_none(self, temp_context_dir):
"""Test that None timestamps are not archived."""
archiver = ContextArchiver(archive_after_hours=1.0)
assert archiver.should_archive(None) is False
def test_should_not_archive_invalid_string(self, temp_context_dir):
"""Test that invalid timestamps are not archived."""
archiver = ContextArchiver(archive_after_hours=1.0)
assert archiver.should_archive("not-a-timestamp") is False
def test_custom_archive_hours(self, temp_context_dir):
"""Test custom archive_after_hours setting."""
archiver = ContextArchiver(archive_after_hours=24.0)
# 12 hours ago - should NOT be archived with 24h threshold
twelve_hours_ago = time.time() - (12 * 60 * 60)
assert archiver.should_archive(twelve_hours_ago) is False
# 25 hours ago - should be archived
twenty_five_hours_ago = time.time() - (25 * 60 * 60)
assert archiver.should_archive(twenty_five_hours_ago) is True
class TestArchiveOldContext:
"""Test suite for archive_old_context functionality."""
def test_archive_completed_old_tasks(self, temp_context_dir):
"""Test that completed old tasks are archived."""
state_file = temp_context_dir / "state.json"
# Create state with old completed task
old_time = time.time() - (2 * 60 * 60) # 2 hours ago
state = {
"summary": "Test",
"next_steps": [
{
"id": "task-1",
"task": "Old completed task",
"status": "completed",
"completed_at": old_time,
"created_at": old_time - 3600,
},
{
"id": "task-2",
"task": "Recent task",
"status": "pending",
},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats["tasks"] == 1
# Verify task removed from state
updated_state = read_with_lock(state_file)
assert len(updated_state["next_steps"]) == 1
assert updated_state["next_steps"][0]["id"] == "task-2"
# Verify archived to embedding DB
assert archiver.embedding_manager.get_task_count() == 1
def test_do_not_archive_recent_completed_tasks(self, temp_context_dir):
"""Test that recent completed tasks are not archived."""
state_file = temp_context_dir / "state.json"
# Create state with recent completed task
recent_time = time.time() - (30 * 60) # 30 minutes ago
state = {
"summary": "Test",
"next_steps": [
{
"id": "task-1",
"task": "Recent completed task",
"status": "completed",
"completed_at": recent_time,
},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats["tasks"] == 0
# Task should still be in state
updated_state = read_with_lock(state_file)
assert len(updated_state["next_steps"]) == 1
def test_do_not_archive_pending_tasks(self, temp_context_dir):
"""Test that pending tasks are never archived."""
state_file = temp_context_dir / "state.json"
# Create state with old but pending task
old_time = time.time() - (24 * 60 * 60) # 24 hours ago
state = {
"summary": "Test",
"next_steps": [
{
"id": "task-1",
"task": "Old pending task",
"status": "pending",
"created_at": old_time,
},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats["tasks"] == 0
# Task should still be in state
updated_state = read_with_lock(state_file)
assert len(updated_state["next_steps"]) == 1
def test_archive_old_messages(self, temp_context_dir):
"""Test that old messages are archived."""
state_file = temp_context_dir / "state.json"
# Create state with old message (dict format)
old_time = datetime.now() - timedelta(hours=2)
state = {
"summary": "Test",
"next_steps": [],
"active_files": [],
"messages": [
{
"id": "msg-1",
"sender": "Agent-A",
"message": "Old message content",
"timestamp": old_time.isoformat(),
},
{
"id": "msg-2",
"sender": "Agent-B",
"message": "Recent message",
"timestamp": datetime.now().isoformat(),
},
],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats["messages"] == 1
# Verify old message removed, recent kept
updated_state = read_with_lock(state_file)
assert len(updated_state["messages"]) == 1
assert updated_state["messages"][0]["id"] == "msg-2"
def test_updates_archival_stats(self, temp_context_dir):
"""Test that archival statistics are updated."""
state_file = temp_context_dir / "state.json"
old_time = time.time() - (2 * 60 * 60)
state = {
"summary": "Test",
"next_steps": [
{"id": "t1", "task": "Task 1", "status": "completed", "completed_at": old_time},
{"id": "t2", "task": "Task 2", "status": "completed", "completed_at": old_time},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
archiver.archive_old_context()
updated_state = read_with_lock(state_file)
assert "archival_stats" in updated_state
assert updated_state["archival_stats"]["total_archived_tasks"] == 2
assert "last_archived" in updated_state["archival_stats"]
def test_empty_state_file(self, temp_context_dir):
"""Test handling of empty state file."""
state_file = temp_context_dir / "state.json"
write_with_lock(state_file, {})
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats == {"tasks": 0, "messages": 0}
def test_missing_state_file(self, temp_context_dir):
"""Test handling of missing state file."""
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.archive_old_context()
assert stats == {"tasks": 0, "messages": 0}
class TestGetArchivalStats:
"""Test suite for get_archival_stats."""
def test_get_stats_with_data(self, temp_context_dir):
"""Test getting archival stats after archiving."""
state_file = temp_context_dir / "state.json"
old_time = time.time() - (2 * 60 * 60)
state = {
"summary": "Test",
"next_steps": [
{"id": "t1", "task": "Task", "status": "completed", "completed_at": old_time},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
archiver = ContextArchiver(archive_after_hours=1.0)
archiver.archive_old_context()
stats = archiver.get_archival_stats()
assert stats["total_archived_tasks"] == 1
assert stats["embedded_tasks"] == 1
assert stats["archive_after_hours"] == 1.0
assert stats["last_archived"] is not None
def test_get_stats_empty(self, temp_context_dir):
"""Test getting stats when nothing archived."""
archiver = ContextArchiver(archive_after_hours=1.0)
stats = archiver.get_archival_stats()
assert stats["total_archived_tasks"] == 0
assert stats["total_archived_messages"] == 0
assert stats["embedded_tasks"] == 0
class TestRunArchival:
"""Test the convenience run_archival function."""
def test_run_archival_function(self, temp_context_dir):
"""Test the run_archival convenience function."""
state_file = temp_context_dir / "state.json"
old_time = time.time() - (2 * 60 * 60)
state = {
"summary": "Test",
"next_steps": [
{"id": "t1", "task": "Task", "status": "completed", "completed_at": old_time},
],
"active_files": [],
"messages": [],
}
write_with_lock(state_file, state)
with patch("amicus.archival.get_context_bus_dir", return_value=temp_context_dir):
with patch("amicus.archival.get_state_file", return_value=state_file):
stats = run_archival(archive_after_hours=1.0)
assert stats["tasks"] == 1
class TestMessageParsing:
"""Test message format parsing."""
def test_extract_sender_from_message(self, temp_context_dir):
"""Test extracting sender from message string."""
archiver = ContextArchiver()
# Standard format
sender = archiver._extract_sender("[10:30:45] Agent-A: Hello world")
assert sender == "Agent-A"
# No timestamp
sender = archiver._extract_sender("Agent-B: Message")
assert sender == "Agent-B"
# No colon
sender = archiver._extract_sender("[10:30:45] Just a message")
assert sender == "unknown"
def test_format_timestamp_various_formats(self, temp_context_dir):
"""Test formatting various timestamp types."""
archiver = ContextArchiver()
# Unix timestamp
result = archiver._format_timestamp(1700000000.0)
assert "2023" in result # Should be a valid ISO string
# ISO string passthrough
result = archiver._format_timestamp("2026-02-01T10:00:00")
assert result == "2026-02-01T10:00:00"
# None
result = archiver._format_timestamp(None)
assert result == ""