import os
from pathlib import Path
import re
import pytest
from domin8.storage.artifact_storage import atomic_write, verify_artifact, list_artifacts
def test_atomic_write_cleanup_on_rename_failure(tmp_path, monkeypatch):
    """Check that a failing ``os.rename`` leaves no temp file behind.

    ``os.rename`` is patched to raise, ``atomic_write`` is expected to let
    an exception escape, and afterwards the target directory must contain
    no entry named ``.file.txt.*`` with a ``.tmp`` suffix.
    """
    def _raise_on_rename(src, dst):
        # Stand-in for os.rename that always fails.
        raise Exception('rename failed')

    destination = tmp_path / 'x' / 'file.txt'
    parent_dir = destination.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

    monkeypatch.setattr('os.rename', _raise_on_rename)

    with pytest.raises(Exception):
        atomic_write(destination, 'content')

    # Collect anything that looks like a stale temp file for file.txt.
    leftovers = []
    for entry in parent_dir.iterdir():
        if entry.suffix == '.tmp' and entry.name.startswith('.file.txt.'):
            leftovers.append(entry)
    assert leftovers == []
def test_atomic_write_inner_unlink_failure(tmp_path, monkeypatch):
    """Check that ``atomic_write`` still raises when cleanup fails as well.

    Both ``os.rename`` and ``os.unlink`` are patched to raise, so the
    rename fails and any ``os.unlink``-based cleanup fails too. The test
    only requires that some exception propagates to the caller.
    """
    def _raise_on_rename(src, dst):
        # Stand-in for os.rename that always fails.
        raise Exception('rename failed')

    def _raise_on_unlink(p):
        # Stand-in for os.unlink that always fails.
        raise Exception('unlink failed')

    destination = tmp_path / 'y' / 'file.txt'
    destination.parent.mkdir(parents=True, exist_ok=True)

    monkeypatch.setattr('os.rename', _raise_on_rename)
    monkeypatch.setattr('os.unlink', _raise_on_unlink)

    with pytest.raises(Exception):
        atomic_write(destination, 'content')
def test_verify_artifact_missing_returns_false(tmp_path):
    """Check that a freshly created, empty artifact directory does not verify."""
    empty_artifact_dir = tmp_path.joinpath('artifacts', 'nope')
    empty_artifact_dir.mkdir(parents=True)

    outcome = verify_artifact(empty_artifact_dir)

    assert not outcome
def test_list_artifacts_filters(tmp_path, monkeypatch):
    """Check ``list_artifacts`` with and without a file-name filter.

    ``get_agent_data_root`` is patched to point at a directory under
    ``tmp_path``. Before that directory exists the listing must be empty;
    once two artifact directories exist, the unfiltered listing has at
    least two entries and filtering by ``'file.txt'`` yields only ``u1``.
    """
    data_root = tmp_path.joinpath('.domin8', 'agent_data')
    monkeypatch.setattr(
        'domin8.storage.artifact_storage.get_agent_data_root',
        lambda: data_root,
    )

    # The data root has not been created at this point.
    assert list_artifacts('nonexistent') == []

    # Lay out one artifact directory under each of two file names.
    artifact_dirs = (
        data_root.joinpath('file.txt', 'artifacts', 'u1'),
        data_root.joinpath('other.md', 'artifacts', 'u2'),
    )
    for artifact_dir in artifact_dirs:
        artifact_dir.mkdir(parents=True)

    everything = list_artifacts()
    assert len(everything) >= 2

    only_file_txt = list_artifacts('file.txt')
    assert len(only_file_txt) == 1
    assert only_file_txt[0].name == 'u1'