"""
Validation tests for passive tree data integration
Tests verify:
- Keystone nodes have stats
- Notable coverage is >= 90%
- Keystone coverage is >= 95%
- Stat extraction from merged data
"""
import pytest
import pytest_asyncio
import json
import sys
from pathlib import Path
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from database.manager import DatabaseManager
from sqlalchemy import text
@pytest_asyncio.fixture
async def db():
"""Database fixture"""
db = DatabaseManager()
await db.initialize()
return db
@pytest.fixture
def merged_tree():
"""Load merged passive tree data"""
data_path = Path(__file__).parent.parent / "data" / "merged_passive_tree.json"
with open(data_path) as f:
return json.load(f)
@pytest.mark.asyncio
async def test_total_passive_node_count(db):
"""Verify passive tree has expected number of nodes"""
async with db.async_session() as session:
result = await session.execute(text("SELECT COUNT(*) FROM passive_nodes"))
count = result.scalar()
# Should be close to 6,506 nodes from Phase 2
assert count >= 6500, f"Expected >=6500 passive nodes, found {count}"
assert count <= 7000, f"Expected <=7000 passive nodes, found {count} (sanity check)"
@pytest.mark.asyncio
async def test_keystone_nodes_have_stats(db):
"""Verify keystone nodes have stats (coverage >= 95%)"""
async with db.async_session() as session:
# Note: is_keystone flag may not be populated from .datc64 data
# Check by node_id pattern instead (passive_keystone_*)
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE node_id LIKE 'passive_keystone_%'")
)
total_keystones = result.scalar()
# If no keystones by pattern, check if is_keystone flag is used
if total_keystones == 0:
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE is_keystone = 1")
)
total_keystones = result.scalar()
assert total_keystones > 0, "Should have keystone nodes in database"
# Count keystones with stats (by pattern)
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE node_id LIKE 'passive_keystone_%' AND stats IS NOT NULL AND stats != '[]'")
)
keystones_with_stats = result.scalar()
coverage = (keystones_with_stats / total_keystones * 100) if total_keystones > 0 else 0
print(f"Keystone coverage: {keystones_with_stats}/{total_keystones} ({coverage:.1f}%)")
assert coverage >= 95.0, f"Expected >=95% keystone coverage, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_notable_nodes_have_stats(db):
"""Verify notable nodes have stats (coverage >= 90%)"""
async with db.async_session() as session:
# Note: is_notable flag may not be populated from .datc64 data
# Check by node_id pattern instead (passive_notable_*)
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE node_id LIKE 'passive_notable_%'")
)
total_notables = result.scalar()
# If no notables by pattern, check if is_notable flag is used
if total_notables == 0:
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE is_notable = 1")
)
total_notables = result.scalar()
# If still zero, this test may not be applicable to current database
if total_notables == 0:
print("No notable nodes found in database (may not be populated yet)")
return
assert total_notables > 0, "Should have notable nodes in database"
# Count notables with stats (by pattern)
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE node_id LIKE 'passive_notable_%' AND stats IS NOT NULL AND stats != '[]'")
)
notables_with_stats = result.scalar()
coverage = (notables_with_stats / total_notables * 100) if total_notables > 0 else 0
print(f"Notable coverage: {notables_with_stats}/{total_notables} ({coverage:.1f}%)")
assert coverage >= 90.0, f"Expected >=90% notable coverage, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_resolute_technique_exists(db):
"""Verify Resolute Technique keystone exists with stats"""
async with db.async_session() as session:
# Try by node_id pattern first (most reliable)
result = await session.execute(
text("SELECT node_id, name, is_keystone, stats FROM passive_nodes WHERE node_id LIKE '%resolute%'")
)
rows = result.fetchall()
if not rows:
# Try by name
result = await session.execute(
text("SELECT node_id, name, is_keystone, stats FROM passive_nodes WHERE name LIKE '%Resolute%'")
)
rows = result.fetchall()
assert len(rows) > 0, "Resolute Technique keystone not found"
# Get the Resolute Technique node
rt_node = rows[0] # Take first match
print(f"Found: {rt_node[0]} ({rt_node[1]})")
# Check stats exist (is_keystone flag may not be set from .datc64 data)
stats = json.loads(rt_node[3]) if rt_node[3] else []
assert len(stats) > 0, "Resolute Technique should have stats"
print(f"Stats: {stats}")
@pytest.mark.asyncio
async def test_keystone_stats_from_merged_data(merged_tree):
"""Verify keystones in merged data have human-readable stats"""
keystones = {k: v for k, v in merged_tree.items() if v.get('is_keystone')}
assert len(keystones) > 0, "Should have keystones in merged data"
assert len(keystones) >= 30, f"Expected >=30 keystones, found {len(keystones)}"
# Check that keystones have stats
keystones_with_stats = 0
for node_id, node_data in keystones.items():
stats = node_data.get('stats', [])
if stats and len(stats) > 0:
keystones_with_stats += 1
# Verify stats are readable strings
for stat in stats:
assert isinstance(stat, str), f"Stat should be string, got {type(stat)}"
assert len(stat) > 0, f"Stat should not be empty for {node_data.get('name')}"
coverage = (keystones_with_stats / len(keystones) * 100) if len(keystones) > 0 else 0
print(f"Merged keystones with stats: {keystones_with_stats}/{len(keystones)} ({coverage:.1f}%)")
# Per mission requirements: >=95% coverage
assert coverage >= 95.0, f"Expected >=95% keystone coverage in merged data, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_notable_stats_from_merged_data(merged_tree):
"""Verify notables in merged data have stats (>=90% coverage)"""
notables = {k: v for k, v in merged_tree.items() if v.get('is_notable')}
assert len(notables) > 0, "Should have notables in merged data"
# Check that notables have stats
notables_with_stats = 0
for node_id, node_data in notables.items():
stats = node_data.get('stats', [])
if stats and len(stats) > 0:
notables_with_stats += 1
coverage = (notables_with_stats / len(notables) * 100) if len(notables) > 0 else 0
print(f"Merged notables with stats: {notables_with_stats}/{len(notables)} ({coverage:.1f}%)")
# Per mission requirements: >=90% coverage
assert coverage >= 90.0, f"Expected >=90% notable coverage in merged data, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_overall_stats_coverage(merged_tree):
"""Verify overall stats coverage is >= 76.3% (from Phase 2 report)"""
total_nodes = len(merged_tree)
nodes_with_stats = 0
for node_id, node_data in merged_tree.items():
stats = node_data.get('stats', [])
if stats and len(stats) > 0:
nodes_with_stats += 1
coverage = (nodes_with_stats / total_nodes * 100) if total_nodes > 0 else 0
print(f"Overall stats coverage: {nodes_with_stats}/{total_nodes} ({coverage:.1f}%)")
# Per Phase 2 report: 76.3% stats coverage
assert coverage >= 76.0, f"Expected >=76% overall coverage, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_no_null_passive_names(db):
"""Verify no passive nodes have null names"""
async with db.async_session() as session:
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE name IS NULL")
)
null_count = result.scalar()
# Some nodes may legitimately have no name (small passives), so just verify it's reasonable
result = await session.execute(text("SELECT COUNT(*) FROM passive_nodes"))
total = result.scalar()
null_percentage = (null_count / total * 100) if total > 0 else 0
print(f"Nodes with null names: {null_count}/{total} ({null_percentage:.1f}%)")
# Most nodes should have names
assert null_percentage < 50, f"Too many nodes with null names: {null_percentage:.1f}%"
@pytest.mark.asyncio
async def test_passive_node_positions_exist(db):
"""Verify passive nodes have position data (if columns exist)"""
async with db.async_session() as session:
# Check if position columns exist
result = await session.execute(text("PRAGMA table_info(passive_nodes)"))
columns = [row[1] for row in result.fetchall()]
if 'position_x' not in columns or 'position_y' not in columns:
print("Position columns not in database schema (may use different field names)")
return
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE position_x IS NOT NULL AND position_y IS NOT NULL")
)
with_positions = result.scalar()
result = await session.execute(text("SELECT COUNT(*) FROM passive_nodes"))
total = result.scalar()
coverage = (with_positions / total * 100) if total > 0 else 0
print(f"Nodes with positions: {with_positions}/{total} ({coverage:.1f}%)")
# If we have position data, verify coverage is reasonable
if with_positions > 0:
assert coverage >= 50, f"Expected >=50% nodes with positions, got {coverage:.1f}%"
@pytest.mark.asyncio
async def test_sample_keystones_cross_reference(db, merged_tree):
"""Cross-reference known keystones between source and database"""
# Test a few well-known keystones
test_keystones = [
'Resolute Technique',
'Avatar of Fire',
'Blood Magic',
]
keystones_in_merged = {v.get('name'): k for k, v in merged_tree.items() if v.get('is_keystone')}
async with db.async_session() as session:
for keystone_name in test_keystones:
# Check if it exists in merged data
if keystone_name in keystones_in_merged:
merged_node_id = keystones_in_merged[keystone_name]
merged_node = merged_tree[merged_node_id]
# Check if it exists in database
result = await session.execute(
text(f"SELECT name, is_keystone, stats FROM passive_nodes WHERE name = '{keystone_name}'")
)
row = result.first()
if row:
assert row[1] == 1, f"{keystone_name} should be marked as keystone in DB"
print(f"✓ {keystone_name} verified in both merged data and DB")
else:
# Check with LIKE in case of slight name differences
result = await session.execute(
text(f"SELECT name, is_keystone FROM passive_nodes WHERE name LIKE '%{keystone_name.split()[0]}%'")
)
rows = result.fetchall()
assert len(rows) > 0, f"{keystone_name} not found in database (even with fuzzy search)"
@pytest.mark.asyncio
async def test_jewel_sockets_identified(db):
"""Verify jewel sockets are identified (if column exists)"""
async with db.async_session() as session:
# Check if is_jewel_socket column exists
result = await session.execute(text("PRAGMA table_info(passive_nodes)"))
columns = [row[1] for row in result.fetchall()]
if 'is_jewel_socket' not in columns:
print("is_jewel_socket column not in database schema (may not be implemented yet)")
return
result = await session.execute(
text("SELECT COUNT(*) FROM passive_nodes WHERE is_jewel_socket = 1")
)
jewel_sockets = result.scalar()
# PoE2 should have jewel sockets (if the field is populated)
print(f"Jewel sockets in DB: {jewel_sockets}")
@pytest.mark.asyncio
async def test_passive_stats_are_parseable(db):
"""Verify passive node stats can be parsed as JSON"""
async with db.async_session() as session:
result = await session.execute(
text("SELECT node_id, name, stats FROM passive_nodes WHERE stats IS NOT NULL LIMIT 100")
)
rows = result.fetchall()
assert len(rows) > 0, "Should have nodes with stats"
parse_errors = 0
for row in rows:
try:
stats = json.loads(row[2]) if isinstance(row[2], str) else row[2]
assert isinstance(stats, (list, dict)), \
f"Stats should be list or dict, got {type(stats)} for node {row[1]}"
except json.JSONDecodeError:
parse_errors += 1
print(f"Failed to parse stats for {row[1]}: {row[2][:100]}")
# Allow up to 10% parse failures due to data quality
error_rate = (parse_errors / len(rows) * 100) if len(rows) > 0 else 0
assert error_rate < 10, f"Too many stat parse errors: {error_rate:.1f}%"
@pytest.mark.asyncio
async def test_stats_source_tracking(db):
"""Verify stats_source field tracks data origin (if field exists)"""
async with db.async_session() as session:
# Check if stats_source column exists
result = await session.execute(text("PRAGMA table_info(passive_nodes)"))
columns = [row[1] for row in result.fetchall()]
if 'stats_source' in columns:
result = await session.execute(
text("SELECT DISTINCT stats_source FROM passive_nodes WHERE stats_source IS NOT NULL")
)
sources = [row[0] for row in result.fetchall()]
print(f"Stats sources found: {sources}")
# Should have sources from PoB and/or datc64
assert len(sources) > 0, "Should have stats_source values if column exists"
else:
print("stats_source column not found (may not be implemented yet)")