from pathlib import Path
import json
import pytest
from domin8.storage.index import (
init_index,
get_index_path,
index_artifact,
search_artifacts,
reindex_all,
get_index_stats,
)
def _agent_root(tmp_path: Path) -> Path:
return Path(tmp_path) / '.domin8' / 'agent_data'
def test_init_and_stats(tmp_path, monkeypatch):
monkeypatch.setattr(
'domin8.storage.artifact_storage.get_agent_data_root',
lambda: _agent_root(tmp_path),
)
db_path = get_index_path()
if db_path.exists():
db_path.unlink()
init_index()
stats = get_index_stats()
assert stats['index_exists'] is True
assert stats['total_artifacts'] == 0
def test_index_and_search(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
artifact_dir = base / 'file.txt' / 'artifacts' / 'u1'
artifact_dir.mkdir(parents=True)
meta = {
'uuid': 'u1',
'repo_relative_path': 'file.txt',
'action': 'edit',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
}
index_artifact(artifact_dir, meta)
res = search_artifacts(repo_path='file.txt')
assert len(res) == 1
assert res[0]['uuid'] == 'u1'
# no intent.json present, preview should be empty
assert res[0]['summary_preview'] == ''
res2 = search_artifacts(status='pending')
assert any(r['uuid'] == 'u1' for r in res2)
def test_reindex_all(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
# Create two artifact directories with meta.json
a1 = base / 'a' / 'artifacts' / 'aa'
a1.mkdir(parents=True)
(a1 / 'meta.json').write_text(
json.dumps(
{
'uuid': 'aa',
'repo_relative_path': 'a',
'action': 'edit',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
}
)
)
a2 = base / 'b' / 'artifacts' / 'bb'
a2.mkdir(parents=True)
(a2 / 'meta.json').write_text(
json.dumps(
{
'uuid': 'bb',
'repo_relative_path': 'b',
'action': 'create',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
}
)
)
count = reindex_all()
assert count == 2
stats = get_index_stats()
assert stats['total_artifacts'] == 2
def test_reindex_all_handles_invalid_meta(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
# Valid artifact
a1 = base / 'a' / 'artifacts' / 'aa'
a1.mkdir(parents=True)
(a1 / 'meta.json').write_text(
json.dumps(
{
'uuid': 'aa',
'repo_relative_path': 'a',
'action': 'edit',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
}
)
)
# Invalid artifact (bad JSON)
a2 = base / 'b' / 'artifacts' / 'bb'
a2.mkdir(parents=True)
(a2 / 'meta.json').write_text('not-json')
count = reindex_all()
assert count == 1
def test_search_and_stats_no_db(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
# Ensure no db exists
db = get_index_path()
if db.exists():
db.unlink()
res = search_artifacts()
assert res == []
# search_artifacts will initialize the DB if missing, so index should now exist
stats = get_index_stats()
assert stats['index_exists'] is True
def test_index_artifact_initializes_db(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
db = get_index_path()
if db.exists():
db.unlink()
# Create an artifact dir and metadata
a = base / 'x' / 'artifacts' / 'x1'
a.mkdir(parents=True)
meta = {
'uuid': 'x1',
'repo_relative_path': 'x',
'action': 'edit',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
'short_summary': 's',
}
index_artifact(a, meta)
assert db.exists()
def test_get_index_stats_no_db_returns_false(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
db = get_index_path()
if db.exists():
db.unlink()
stats = get_index_stats()
assert stats['index_exists'] is False
def test_search_action_filter(tmp_path, monkeypatch):
base = _agent_root(tmp_path)
monkeypatch.setattr('domin8.storage.artifact_storage.get_agent_data_root', lambda: base)
a = base / 'x' / 'artifacts' / 'x1'
a.mkdir(parents=True)
(a / 'meta.json').write_text(
json.dumps(
{
'uuid': 'x1',
'repo_relative_path': 'x',
'action': 'edit',
'status': 'pending',
'actor': {'id': 'agent', 'type': 'agent'},
'created_at': '2026-01-12T00:00:00+00:00',
'short_summary': 's',
}
)
)
# Reindex
reindex_all()
res = search_artifacts(action='edit')
assert any(r['uuid'] == 'x1' for r in res)