import pytest
from src.indices.graph import GraphStore
@pytest.fixture
def graph_store():
    """Provide a fresh, empty GraphStore instance for each test."""
    store = GraphStore()
    return store
def test_graph_store_add_node(graph_store):
    """A freshly added node has no neighbors until edges are created."""
    graph_store.add_node("doc1", {"title": "Document 1", "tags": ["test"]})
    # The node exists but is isolated, so a depth-1 query yields nothing.
    assert graph_store.get_neighbors("doc1", depth=1) == []
def test_graph_store_add_edge(graph_store):
    """Adding a 'link' edge makes the target show up as a neighbor."""
    for node_id in ("doc1", "doc2"):
        graph_store.add_node(node_id, {})
    graph_store.add_edge("doc1", "doc2", "link")
    # The edge is observable through the public neighbor query.
    assert "doc2" in graph_store.get_neighbors("doc1", depth=1)
def test_graph_store_add_edge_transclusion(graph_store):
    """A 'transclusion' edge behaves like any edge for neighbor queries."""
    for node_id in ("doc1", "doc2"):
        graph_store.add_node(node_id, {})
    graph_store.add_edge("doc1", "doc2", "transclusion")
    # Edge type is an internal detail; connectivity is what we assert.
    assert "doc2" in graph_store.get_neighbors("doc1", depth=1)
def test_graph_store_remove_node(graph_store):
    """After remove_node(), neighbor queries for that node return empty."""
    for node_id in ("doc1", "doc2"):
        graph_store.add_node(node_id, {})
    graph_store.add_edge("doc1", "doc2", "link")
    # Sanity check: the edge is visible before the removal.
    assert "doc2" in graph_store.get_neighbors("doc1", depth=1)

    graph_store.remove_node("doc1")

    # A removed node behaves like a nonexistent one: empty neighbor list.
    assert graph_store.get_neighbors("doc1", depth=1) == []
def test_graph_store_remove_nonexistent_node(graph_store):
    """Removing an unknown node is a no-op and must not raise."""
    graph_store.remove_node("nonexistent")
    # The public API still reports no neighbors for the unknown id.
    assert graph_store.get_neighbors("nonexistent", depth=1) == []
def test_graph_store_get_neighbors_depth_1(graph_store):
    """Depth-1 traversal returns exactly the directly linked nodes."""
    for node_id in ("doc1", "doc2", "doc3"):
        graph_store.add_node(node_id, {})
    for target in ("doc2", "doc3"):
        graph_store.add_edge("doc1", target, "link")
    assert set(graph_store.get_neighbors("doc1", depth=1)) == {"doc2", "doc3"}
def test_graph_store_get_neighbors_depth_2(graph_store):
    """Depth-2 traversal reaches two hops out but not three."""
    for node_id in ("doc1", "doc2", "doc3", "doc4"):
        graph_store.add_node(node_id, {})
    for src, dst in (("doc1", "doc2"), ("doc2", "doc3"), ("doc3", "doc4")):
        graph_store.add_edge(src, dst, "link")
    # doc4 sits three hops away and must be excluded at depth=2.
    assert set(graph_store.get_neighbors("doc1", depth=2)) == {"doc2", "doc3"}
def test_graph_store_get_neighbors_bidirectional(graph_store):
    """Neighbor lookup follows edges in both directions."""
    for node_id in ("doc1", "doc2", "doc3"):
        graph_store.add_node(node_id, {})
    graph_store.add_edge("doc1", "doc2", "link")  # outgoing from doc1
    graph_store.add_edge("doc3", "doc1", "link")  # incoming to doc1
    # Both the outgoing target and the incoming source are neighbors.
    assert set(graph_store.get_neighbors("doc1", depth=1)) == {"doc2", "doc3"}
def test_graph_store_get_neighbors_nonexistent_node(graph_store):
    """Querying an unknown node returns an empty list rather than raising."""
    assert graph_store.get_neighbors("nonexistent", depth=1) == []
def test_graph_store_persist_and_load(tmp_path, graph_store):
    """A persisted graph round-trips into a fresh store with edges intact."""
    graph_store.add_node("doc1", {"title": "Document 1"})
    graph_store.add_node("doc2", {"title": "Document 2"})
    graph_store.add_edge("doc1", "doc2", "link")

    target_dir = tmp_path / "graph"
    graph_store.persist(target_dir)
    # Persisting creates the directory and the JSON payload inside it.
    assert target_dir.exists()
    assert (target_dir / "graph.json").exists()

    reloaded = GraphStore()
    reloaded.load(target_dir)
    # The edge survives the round trip and is visible from both endpoints.
    assert "doc2" in reloaded.get_neighbors("doc1", depth=1)
    assert "doc1" in reloaded.get_neighbors("doc2", depth=1)
def test_graph_store_load_nonexistent_path(tmp_path):
    """Loading from a missing path leaves the store empty but usable."""
    store = GraphStore()
    store.load(tmp_path / "nonexistent")
    # The store still accepts writes; a fresh node starts with no edges.
    store.add_node("test", {})
    assert store.get_neighbors("test", depth=1) == []
def test_graph_store_complex_graph(graph_store):
    """Neighbor sets grow correctly with depth over a branching graph."""
    for node in [f"doc{i}" for i in range(1, 6)]:
        graph_store.add_node(node, {"index": node})
    edges = [
        ("doc1", "doc2", "link"),
        ("doc1", "doc3", "link"),
        ("doc2", "doc4", "transclusion"),
        ("doc3", "doc4", "link"),
        ("doc4", "doc5", "link"),
    ]
    for src, dst, edge_type in edges:
        graph_store.add_edge(src, dst, edge_type)
    # Each extra hop of depth adds exactly one more ring of nodes.
    expected_by_depth = {
        1: {"doc2", "doc3"},
        2: {"doc2", "doc3", "doc4"},
        3: {"doc2", "doc3", "doc4", "doc5"},
    }
    for depth, expected in expected_by_depth.items():
        assert set(graph_store.get_neighbors("doc1", depth=depth)) == expected
def test_graph_store_cyclic_graph(graph_store):
    """Traversal over a cycle terminates and excludes the start node."""
    for node_id in ("doc1", "doc2", "doc3"):
        graph_store.add_node(node_id, {})
    for src, dst in (("doc1", "doc2"), ("doc2", "doc3"), ("doc3", "doc1")):
        graph_store.add_edge(src, dst, "link")
    # The cycle leads back to doc1, which must not list itself.
    assert set(graph_store.get_neighbors("doc1", depth=2)) == {"doc2", "doc3"}
# ============================================================================
# Chunk Removal Tests (Phase 2 Delta Indexing)
# ============================================================================
def test_remove_chunk_removes_from_graph(graph_store):
    """remove_chunk() deletes only the targeted chunk node."""
    graph_store.add_node("doc1#chunk#0", {"content": "First chunk"})
    graph_store.add_node("doc1#chunk#1", {"content": "Second chunk"})
    graph_store.add_edge("doc1#chunk#0", "doc1#chunk#1", "links_to")
    # Both chunk nodes exist before removal.
    assert graph_store.has_node("doc1#chunk#0")
    assert graph_store.has_node("doc1#chunk#1")

    graph_store.remove_chunk("doc1#chunk#0")

    # Only the removed chunk disappears; its sibling is untouched.
    assert not graph_store.has_node("doc1#chunk#0")
    assert graph_store.has_node("doc1#chunk#1")
def test_remove_chunk_handles_missing_chunk(graph_store):
    """remove_chunk() on an unknown id is a graceful no-op.

    The original test only verified "does not raise"; it contained no
    assertion at all. Added an explicit public-API check so the test
    also verifies observable state after the call.
    """
    # Should not raise exception, just log debug message
    graph_store.remove_chunk("nonexistent_chunk_id")
    # The unknown chunk still does not exist afterwards.
    assert not graph_store.has_node("nonexistent_chunk_id")
def test_remove_chunk_removes_edges(graph_store):
    """remove_chunk() also drops every edge touching the removed chunk."""
    chunk_ids = [f"doc1#chunk#{i}" for i in range(3)]
    for chunk_id in chunk_ids:
        graph_store.add_node(chunk_id, {})
    graph_store.add_edge(chunk_ids[0], chunk_ids[1], "links_to")
    graph_store.add_edge(chunk_ids[1], chunk_ids[2], "links_to")
    # Both edges are present before removal.
    assert len(graph_store.get_edges_from(chunk_ids[0])) == 1
    assert len(graph_store.get_edges_to(chunk_ids[2])) == 1

    # Removing the middle chunk severs edges on both sides of it.
    graph_store.remove_chunk(chunk_ids[1])

    assert len(graph_store.get_edges_from(chunk_ids[0])) == 0
    assert len(graph_store.get_edges_to(chunk_ids[2])) == 0
def test_remove_chunk_thread_safe(graph_store):
    """Concurrent remove_chunk() calls remove exactly the requested chunks."""
    from concurrent.futures import ThreadPoolExecutor

    all_ids = [f"concurrent#chunk#{i}" for i in range(10)]
    for index, chunk_id in enumerate(all_ids):
        graph_store.add_node(chunk_id, {"index": index})

    # Remove the even-indexed half of the chunks from worker threads.
    even_ids = all_ids[::2]
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Draining map() propagates any exception raised inside a worker.
        list(executor.map(graph_store.remove_chunk, even_ids))

    # Exactly the even-indexed chunks are gone; odd ones remain.
    for index, chunk_id in enumerate(all_ids):
        if index % 2 == 0:
            assert not graph_store.has_node(chunk_id)
        else:
            assert graph_store.has_node(chunk_id)
def test_get_neighbors_thread_safe_with_concurrent_modifications(graph_store):
    """Test that get_neighbors() is thread-safe with concurrent add/remove operations.
    Verifies the structure-snapshot pattern prevents issues when:
    - Multiple threads call get_neighbors() repeatedly
    - Other threads add/remove nodes concurrently
    """
    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed
    # Setup initial graph structure: a 20-node chain node_0 -> ... -> node_19.
    for i in range(20):
        graph_store.add_node(f"node_{i}", {"index": i})
    for i in range(19):
        graph_store.add_edge(f"node_{i}", f"node_{i+1}", "link")
    # Shared state: workers append error strings here; the event lets the
    # main thread stop readers once writers are done. (list.append is
    # atomic under the GIL, so no extra lock is needed for `errors`.)
    errors = []
    stop_event = threading.Event()
    def reader_task(node_id):
        """Repeatedly call get_neighbors() on various nodes."""
        iterations = 0
        # Bounded at 100 iterations so the test terminates even if the
        # stop event were never set.
        while not stop_event.is_set() and iterations < 100:
            try:
                graph_store.get_neighbors(node_id, depth=2)
                iterations += 1
            except Exception as e:
                errors.append(f"Reader error on {node_id}: {e}")
                break
    def writer_task(base_id):
        """Add and remove nodes concurrently."""
        # Each writer churns 20 short-lived nodes that attach to the
        # static chain, exercising add/remove while readers traverse.
        for i in range(20):
            if stop_event.is_set():
                break
            try:
                node_id = f"dynamic_{base_id}_{i}"
                graph_store.add_node(node_id, {"dynamic": True})
                # Connect to existing nodes
                graph_store.add_edge(node_id, f"node_{i % 20}", "link")
                graph_store.remove_node(node_id)
            except Exception as e:
                errors.append(f"Writer error: {e}")
                break
    with ThreadPoolExecutor(max_workers=8) as executor:
        # Submit reader tasks for different nodes (every 4th chain node).
        reader_futures = [
            executor.submit(reader_task, f"node_{i}")
            for i in range(0, 20, 4)
        ]
        # Submit writer tasks
        writer_futures = [
            executor.submit(writer_task, i)
            for i in range(3)
        ]
        # Wait for writers to complete; .result() re-raises worker errors.
        for future in as_completed(writer_futures):
            future.result()
        # Signal readers to stop
        stop_event.set()
        # Wait for readers
        for future in as_completed(reader_futures):
            future.result()
    # Verify no exceptions occurred inside any worker thread.
    assert errors == [], f"Thread safety errors: {errors}"