#!/usr/bin/env python3
"""
Passive Tree Resolver - Resolves poe.ninja node IDs to full passive data.
Provides high-level API for:
- Resolving node IDs to names, stats, and metadata
- Pathfinding between nodes
- Finding nearest notables from a build
- Analyzing build connectivity
Uses data from:
- PSG binary file (authoritative node IDs and connections)
- PoB2 tree.json (names, stats, icons)
"""
import json
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set, Tuple
from collections import deque
logger = logging.getLogger(__name__)
@dataclass
class ResolvedNode:
"""A fully resolved passive tree node."""
node_id: int
name: str
stats: List[str] = field(default_factory=list)
is_notable: bool = False
is_keystone: bool = False
is_jewel_socket: bool = False
x: float = 0.0
y: float = 0.0
connections: List[int] = field(default_factory=list)
icon: str = ""
@property
def node_type(self) -> str:
"""Return the node type as a string."""
if self.is_keystone:
return "keystone"
elif self.is_notable:
return "notable"
elif self.is_jewel_socket:
return "jewel_socket"
else:
return "small"
@dataclass
class PathResult:
"""Result of a pathfinding operation."""
start: int
end: int
path: List[int]
distance: int
nodes: List[ResolvedNode] = field(default_factory=list)
@dataclass
class BuildAnalysis:
"""Analysis of allocated passive nodes."""
total_nodes: int
keystones: List[ResolvedNode]
notables: List[ResolvedNode]
small_nodes: List[ResolvedNode]
jewel_sockets: List[ResolvedNode]
is_connected: bool
nearest_notables: List[Tuple[ResolvedNode, int]] # (node, distance)
class_start: Optional[str] = None
unresolved_nodes: List[int] = field(default_factory=list) # Node IDs not in database (e.g., ascendancy)
tree_region: Optional[str] = None # Estimated tree region based on coordinate analysis
connectivity_note: Optional[str] = None # Explanation of connectivity status
class PassiveTreeResolver:
"""
Resolves poe.ninja passive node IDs to full node data.
Usage:
resolver = PassiveTreeResolver()
# Resolve single node
node = resolver.resolve(6178)
print(f"{node.name}: {node.stats}")
# Resolve character's passives
analysis = resolver.analyze_build([6178, 41210, ...])
print(f"Notables: {[n.name for n in analysis.notables]}")
# Find path to notable
path = resolver.find_path(50986, 6178)
print(f"Distance: {path.distance} nodes")
"""
# Class starting node IDs (PoE2)
# PoE2 has 8 playable classes but only 6 starting positions (hexagon pattern)
# Classes with the same attribute(s) share a starting position.
# Position naming uses PSG/PoE1 internal names for the node IDs.
#
# Hexagon layout (Y negative = top, Y positive = bottom):
# SORCERESS/WITCH (54447) - INT top
# / \
# DRUID (61525) MONK (44683)
# STR/INT top-left DEX/INT top-right
# | |
# WARRIOR (47175) RANGER/HUNTRESS (50459)
# STR bottom-left DEX bottom-right
# \ /
# MERCENARY (50986) - STR/DEX bottom
CLASS_STARTS = {
47175: "WARRIOR", # MARAUDER position - Pure STR (bottom-left)
50459: "RANGER", # RANGER position - Pure DEX (bottom-right) - shared by Ranger & Huntress
54447: "SORCERESS", # WITCH position - Pure INT (top) - shared by Sorceress & Witch
50986: "MERCENARY", # DUELIST position - STR/DEX hybrid (bottom-center)
61525: "DRUID", # TEMPLAR position - STR/INT hybrid (top-left)
44683: "MONK", # SHADOW position - DEX/INT hybrid (top-right)
}
# Reverse mapping: class name to starting node ID
# Multiple classes can map to the same node (shared positions)
CLASS_TO_START_NODE = {
"WARRIOR": 47175,
"RANGER": 50459,
"HUNTRESS": 50459, # Huntress (DEX) shares Ranger's starting position
"SORCERESS": 54447,
"WITCH": 54447, # Witch (INT) shares Sorceress's starting position
"MERCENARY": 50986,
"DRUID": 61525,
"MONK": 44683,
}
# PoE2 Ascendancies mapped to base class
ASCENDANCY_TO_CLASS = {
# Warrior ascendancies (STR)
"Titan": "WARRIOR",
"Warbringer": "WARRIOR",
"Smith of Kitava": "WARRIOR",
# Ranger ascendancies (DEX)
"Deadeye": "RANGER",
"Pathfinder": "RANGER",
# Huntress ascendancies (DEX) - shares tree position with Ranger
"Amazon": "HUNTRESS",
"Ritualist": "HUNTRESS",
# Sorceress ascendancies (INT)
"Stormweaver": "SORCERESS",
"Chronomancer": "SORCERESS",
"Disciple of Varashta": "SORCERESS",
# Witch ascendancies (INT) - shares tree position with Sorceress
"Infernalist": "WITCH",
"Blood Mage": "WITCH",
"Bloodmage": "WITCH", # Alternate spelling
"Lich": "WITCH",
"Abyssal Lich": "WITCH",
# Mercenary ascendancies (STR/DEX)
"Witchhunter": "MERCENARY",
"Gemling Legionnaire": "MERCENARY",
"Tactician": "MERCENARY",
# Druid ascendancies (STR/INT)
"Oracle": "DRUID",
"Shaman": "DRUID",
# Monk ascendancies (DEX/INT)
"Invoker": "MONK",
"Acolyte of Chayula": "MONK",
}
# Tree region boundaries (computed from Voronoi clustering of 4,958 non-ascendancy nodes)
# Each node is assigned to its nearest class starting position.
# Positive Y = bottom of tree, Negative Y = top
# Positive X = right, Negative X = left
# Boundaries represent the actual min/max coordinates of assigned nodes.
#
# NOTE: Multiple classes share the same region when they share the same starting position.
# E.g., Ranger and Huntress both use the RANGER region, Sorceress and Witch both use SORCERESS.
TREE_REGIONS = {
"MONK": {
"x_range": (1270.4, 17657.2),
"y_range": (-10429.8, -82.2),
"centroid": (7948.5, -4321.4),
"node_count": 739,
"description": "Top-right, DEX/INT hybrid, elemental martial arts"
},
"MERCENARY": {
"x_range": (-8095.0, 4874.5),
"y_range": (1469.8, 20053.8),
"centroid": (-327.4, 9336.1),
"node_count": 816,
"description": "Bottom-center, STR/DEX hybrid, crossbow and grenades"
},
"RANGER": {
"x_range": (1274.7, 21814.3),
"y_range": (-2.4, 9944.3),
"centroid": (8069.1, 4128.0),
"node_count": 840,
"description": "Bottom-right, pure DEX, bows and evasion (also Huntress)"
},
"SORCERESS": {
"x_range": (-6298.3, 6071.3),
"y_range": (-18720.7, -1404.7),
"centroid": (-185.8, -9093.6),
"node_count": 862,
"description": "Top-center, pure INT, elemental spells (also Witch)"
},
"WARRIOR": {
"x_range": (-22167.2, -1271.2),
"y_range": (0.5, 9579.9),
"centroid": (-8128.1, 4300.7),
"node_count": 726,
"description": "Bottom-left, pure STR, melee and armor"
},
"DRUID": {
"x_range": (-22597.4, -1245.2),
"y_range": (-10437.4, -155.3),
"centroid": (-8159.0, -4008.8),
"node_count": 975,
"description": "Top-left, STR/INT hybrid, shapeshifting and nature magic"
},
}
# Alias regions for classes that share positions
# These map the alternate class name to the canonical region
REGION_ALIASES = {
"HUNTRESS": "RANGER", # Huntress (DEX) shares Ranger's region
"WITCH": "SORCERESS", # Witch (INT) shares Sorceress's region
}
# Class starting positions (authoritative coordinates from PSG data)
# Note: Multiple classes share the same starting coordinates
CLASS_START_COORDS = {
"RANGER": {"x": 1274.675, "y": 735.845}, # Also Huntress
"HUNTRESS": {"x": 1274.675, "y": 735.845}, # Same as Ranger
"WARRIOR": {"x": -1271.185, "y": 733.095},
"MERCENARY": {"x": 1.945, "y": 1469.815}, # Bottom center (was DUELIST)
"DRUID": {"x": -1245.175, "y": -728.895}, # Top-left (was TEMPLAR)
"SORCERESS": {"x": 0.005, "y": -1490.585}, # Also Witch
"WITCH": {"x": 0.005, "y": -1490.585}, # Same as Sorceress
"MONK": {"x": 1270.425, "y": -728.835}, # Top-right (was SHADOW)
}
def __init__(self, data_dir: Optional[Path] = None):
"""
Initialize the resolver.
Args:
data_dir: Path to data directory containing psg_passive_nodes.json
"""
if data_dir is None:
# Default to project data directory
data_dir = Path(__file__).parent.parent.parent / "data"
self.data_dir = Path(data_dir)
self._nodes: Dict[int, dict] = {}
self._adjacency: Dict[int, Set[int]] = {}
self._node_regions: Dict[int, str] = {} # Cached node -> region mapping
self._loaded = False
self._regions_loaded = False
def _ensure_loaded(self):
"""Load node database if not already loaded."""
if self._loaded:
return
db_path = self.data_dir / "psg_passive_nodes.json"
if not db_path.exists():
logger.warning(f"PSG database not found at {db_path}")
self._loaded = True
return
try:
with open(db_path, 'r', encoding='utf-8') as f:
raw_nodes = json.load(f)
# Convert string keys to int and build adjacency
for str_id, node_data in raw_nodes.items():
node_id = int(str_id)
self._nodes[node_id] = node_data
# Build adjacency graph
connections = node_data.get('connections', [])
if node_id not in self._adjacency:
self._adjacency[node_id] = set()
for conn in connections:
self._adjacency[node_id].add(conn)
if conn not in self._adjacency:
self._adjacency[conn] = set()
self._adjacency[conn].add(node_id)
logger.info(f"Loaded {len(self._nodes)} passive nodes")
self._loaded = True
except Exception as e:
logger.error(f"Failed to load PSG database: {e}")
self._loaded = True
def _ensure_regions_loaded(self):
"""Load precomputed node region mappings if available."""
if self._regions_loaded:
return
regions_path = self.data_dir / "passive_tree_regions.json"
if not regions_path.exists():
logger.info("Region mapping file not found - will compute regions on-the-fly")
self._regions_loaded = True
return
try:
with open(regions_path, 'r', encoding='utf-8') as f:
regions_data = json.load(f)
# Load precomputed node -> region mapping
node_regions = regions_data.get('node_regions', {})
for str_id, region in node_regions.items():
self._node_regions[int(str_id)] = region
logger.info(f"Loaded {len(self._node_regions)} node region mappings")
self._regions_loaded = True
except Exception as e:
logger.error(f"Failed to load region mappings: {e}")
self._regions_loaded = True
def _compute_node_region(self, x: float, y: float) -> str:
"""
Compute the region for a node based on its coordinates.
Uses Voronoi-style nearest-neighbor assignment to the closest
class starting position.
Args:
x: Node X coordinate
y: Node Y coordinate
Returns:
Region name (class name)
"""
import math
best_region = "UNKNOWN"
best_distance = float('inf')
for region_name, coords in self.CLASS_START_COORDS.items():
distance = math.sqrt((x - coords["x"]) ** 2 + (y - coords["y"]) ** 2)
if distance < best_distance:
best_distance = distance
best_region = region_name
return best_region
def get_node_region(self, node_id: int) -> Optional[str]:
"""
Get the tree region for a specific node.
Uses precomputed region mappings if available, otherwise computes
the region based on distance to class starting positions.
Args:
node_id: The passive node ID
Returns:
Region name (e.g., "RANGER", "WARRIOR") or None if node not found
"""
self._ensure_loaded()
self._ensure_regions_loaded()
# Check precomputed mapping first
if node_id in self._node_regions:
return self._node_regions[node_id]
# Fall back to on-the-fly computation
node_data = self._nodes.get(node_id)
if not node_data:
return None
x = node_data.get('x', 0.0)
y = node_data.get('y', 0.0)
# Check if this is an ascendancy node (they have different coordinate systems)
if node_data.get('is_ascendancy', False):
# Ascendancy nodes don't belong to main tree regions
return None
region = self._compute_node_region(x, y)
# Cache the result
self._node_regions[node_id] = region
return region
def get_nodes_in_region(self, region: str, notable_only: bool = False) -> List[int]:
"""
Get all node IDs that belong to a specific region.
Args:
region: Region name (e.g., "RANGER", "WARRIOR")
notable_only: If True, only return notable nodes
Returns:
List of node IDs in the region
"""
self._ensure_loaded()
self._ensure_regions_loaded()
result = []
for node_id in self._nodes:
node_region = self.get_node_region(node_id)
if node_region == region:
if notable_only:
if self._nodes[node_id].get('is_notable', False):
result.append(node_id)
else:
result.append(node_id)
return result
def resolve(self, node_id: int) -> Optional[ResolvedNode]:
"""
Resolve a single node ID to full data.
Args:
node_id: The poe.ninja/PSG numeric node ID
Returns:
ResolvedNode with full data, or None if not found
"""
self._ensure_loaded()
node_data = self._nodes.get(node_id)
if not node_data:
return None
return ResolvedNode(
node_id=node_id,
name=node_data.get('name', f'Unknown-{node_id}'),
stats=node_data.get('stats', []),
is_notable=node_data.get('is_notable', False),
is_keystone=node_data.get('is_keystone', False),
is_jewel_socket='Jewel Socket' in node_data.get('name', ''),
x=node_data.get('x', 0.0),
y=node_data.get('y', 0.0),
connections=node_data.get('connections', []),
icon=node_data.get('icon', '')
)
def resolve_many(self, node_ids: List[int]) -> List[ResolvedNode]:
"""
Resolve multiple node IDs.
Args:
node_ids: List of node IDs
Returns:
List of ResolvedNode objects (only includes found nodes)
"""
nodes = []
for nid in node_ids:
node = self.resolve(nid)
if node:
nodes.append(node)
return nodes
def find_path(self, start: int, end: int) -> Optional[PathResult]:
"""
Find shortest path between two nodes using BFS.
Args:
start: Starting node ID
end: Target node ID
Returns:
PathResult with path details, or None if no path exists
"""
self._ensure_loaded()
if start not in self._adjacency or end not in self._adjacency:
return None
if start == end:
node = self.resolve(start)
return PathResult(
start=start,
end=end,
path=[start],
distance=0,
nodes=[node] if node else []
)
visited = {start}
queue = deque([(start, [start])])
while queue:
current, path = queue.popleft()
for neighbor in self._adjacency.get(current, []):
if neighbor == end:
full_path = path + [neighbor]
return PathResult(
start=start,
end=end,
path=full_path,
distance=len(full_path) - 1,
nodes=self.resolve_many(full_path)
)
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
def find_nearest_notables(self, from_nodes: List[int], limit: int = 5,
exclude: Optional[Set[int]] = None) -> List[Tuple[ResolvedNode, int]]:
"""
Find nearest notable nodes from a set of nodes.
Args:
from_nodes: List of node IDs to search from
limit: Maximum number of notables to return
exclude: Set of node IDs to exclude (e.g., already allocated)
Returns:
List of (ResolvedNode, distance) tuples sorted by distance
"""
self._ensure_loaded()
if exclude is None:
exclude = set()
exclude = set(exclude) | set(from_nodes)
# BFS from all starting nodes simultaneously
visited = set(from_nodes)
queue = deque([(nid, 0) for nid in from_nodes if nid in self._adjacency])
found = []
while queue and len(found) < limit:
current, dist = queue.popleft()
for neighbor in self._adjacency.get(current, []):
if neighbor in visited:
continue
visited.add(neighbor)
node = self.resolve(neighbor)
if node and node.is_notable and neighbor not in exclude:
found.append((node, dist + 1))
if len(found) >= limit:
break
queue.append((neighbor, dist + 1))
return sorted(found, key=lambda x: x[1])
def analyze_build(self, node_ids: List[int], find_recommendations: bool = True) -> BuildAnalysis:
"""
Analyze a character's allocated passive nodes.
Args:
node_ids: List of allocated node IDs from poe.ninja
find_recommendations: Whether to find nearest unallocated notables
Returns:
BuildAnalysis with categorized nodes and recommendations
"""
self._ensure_loaded()
keystones = []
notables = []
small_nodes = []
jewel_sockets = []
unresolved = []
node_set = set(node_ids)
for nid in node_ids:
node = self.resolve(nid)
if not node:
unresolved.append(nid)
continue
if node.is_keystone:
keystones.append(node)
elif node.is_notable:
notables.append(node)
elif node.is_jewel_socket:
jewel_sockets.append(node)
else:
small_nodes.append(node)
# Check connectivity (only among resolved nodes)
is_connected = self._check_connectivity(node_ids)
# Find nearest unallocated notables
nearest_notables = []
if find_recommendations:
nearest_notables = self.find_nearest_notables(
node_ids,
limit=5,
exclude=node_set
)
# Determine likely class start
class_start = None
for class_id, class_name in self.CLASS_STARTS.items():
path = self.find_path(class_id, node_ids[0] if node_ids else 0)
if path and all(nid in node_set or nid == class_id for nid in path.path[:3]):
class_start = class_name
break
# Estimate tree region based on node coordinates
all_resolved = keystones + notables + small_nodes + jewel_sockets
tree_region = self.estimate_tree_region(all_resolved)
# Generate connectivity note
connectivity_note = None
if not is_connected:
if unresolved:
connectivity_note = f"Note: {len(unresolved)} nodes not in database (likely ascendancy/special nodes). Connectivity check only considers known nodes."
else:
connectivity_note = "Some allocated nodes appear disconnected. This may indicate a tree pathing issue."
elif unresolved:
connectivity_note = f"Note: {len(unresolved)} nodes are ascendancy or special nodes not in the main tree database."
return BuildAnalysis(
total_nodes=len(node_ids),
keystones=keystones,
notables=notables,
small_nodes=small_nodes,
jewel_sockets=jewel_sockets,
is_connected=is_connected,
nearest_notables=nearest_notables,
class_start=class_start,
unresolved_nodes=unresolved,
tree_region=tree_region,
connectivity_note=connectivity_note
)
def _check_connectivity(self, node_ids: List[int]) -> bool:
"""
Check if resolved nodes in the list are connected.
Only checks connectivity among nodes that exist in our database.
Nodes missing from database (e.g., ascendancy nodes) are ignored.
"""
if not node_ids:
return True
# Filter to only nodes we have in our adjacency graph
resolved_nodes = [nid for nid in node_ids if nid in self._adjacency]
if not resolved_nodes:
# No nodes in our database - can't determine connectivity
return True
node_set = set(resolved_nodes)
start = resolved_nodes[0]
visited = {start}
queue = deque([start])
while queue:
current = queue.popleft()
for neighbor in self._adjacency.get(current, []):
if neighbor in node_set and neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
return visited == node_set
def get_node_count(self) -> int:
"""Return total number of nodes in database."""
self._ensure_loaded()
return len(self._nodes)
def get_all_notables(self) -> List[ResolvedNode]:
"""Return all notable nodes."""
self._ensure_loaded()
return [
self.resolve(nid) for nid in self._nodes
if self._nodes[nid].get('is_notable', False)
]
def get_all_keystones(self) -> List[ResolvedNode]:
"""Return all keystone nodes."""
self._ensure_loaded()
return [
self.resolve(nid) for nid in self._nodes
if self._nodes[nid].get('is_keystone', False)
]
def estimate_tree_region(self, nodes: List[ResolvedNode]) -> Optional[str]:
"""
Estimate which tree region a build is primarily located in.
Uses the average coordinates of allocated nodes to determine
which class region the build is centered in.
Returns:
Region name or None if cannot be determined
"""
if not nodes:
return None
# Calculate centroid of allocated nodes
total_x = sum(n.x for n in nodes if n.x != 0)
total_y = sum(n.y for n in nodes if n.y != 0)
count = sum(1 for n in nodes if n.x != 0 or n.y != 0)
if count == 0:
return None
avg_x = total_x / count
avg_y = total_y / count
# Find which region the centroid is closest to
best_region = None
best_score = float('inf')
for region_name, bounds in self.TREE_REGIONS.items():
x_min, x_max = bounds["x_range"]
y_min, y_max = bounds["y_range"]
# Calculate distance to region center
region_center_x = (x_min + x_max) / 2
region_center_y = (y_min + y_max) / 2
distance = ((avg_x - region_center_x) ** 2 + (avg_y - region_center_y) ** 2) ** 0.5
if distance < best_score:
best_score = distance
best_region = region_name
return best_region
def get_region_notables(self, region: str, limit: int = 10) -> List[ResolvedNode]:
"""
Get notable nodes that are within a specific tree region.
Args:
region: Region name (e.g., "WARRIOR", "RANGER", "HUNTRESS")
limit: Maximum notables to return
Returns:
List of notable nodes in that region
"""
self._ensure_loaded()
# Handle region aliases (e.g., HUNTRESS -> RANGER, WITCH -> SORCERESS)
canonical_region = self.REGION_ALIASES.get(region, region)
if canonical_region not in self.TREE_REGIONS:
return []
bounds = self.TREE_REGIONS[canonical_region]
x_min, x_max = bounds["x_range"]
y_min, y_max = bounds["y_range"]
region_notables = []
for nid, node_data in self._nodes.items():
if not node_data.get('is_notable', False):
continue
x = node_data.get('x', 0)
y = node_data.get('y', 0)
if x_min <= x <= x_max and y_min <= y <= y_max:
node = self.resolve(nid)
if node:
region_notables.append(node)
if len(region_notables) >= limit:
break
return region_notables
def get_class_for_ascendancy(self, ascendancy: str) -> Optional[str]:
"""
Get the base class for a given ascendancy.
Args:
ascendancy: Ascendancy name (e.g., "Deadeye", "Infernalist")
Returns:
Base class name or None if unknown
"""
return self.ASCENDANCY_TO_CLASS.get(ascendancy)
# Singleton instance for convenience
_resolver: Optional[PassiveTreeResolver] = None
def get_resolver() -> PassiveTreeResolver:
"""Get the singleton PassiveTreeResolver instance."""
global _resolver
if _resolver is None:
_resolver = PassiveTreeResolver()
return _resolver
if __name__ == '__main__':
# Demo usage
resolver = PassiveTreeResolver()
# Test character nodes
char_nodes = [2455, 2847, 3717, 6178, 7062, 8092, 13081, 19998, 21755, 28556,
30082, 31763, 33415, 41210, 43155, 43578, 44430, 44605, 48588,
48635, 49657, 52125, 53683, 54127, 54282, 55802, 59028, 59881,
59915, 63526]
print("=== PASSIVE TREE RESOLVER DEMO ===\n")
# Resolve single node
power_shots = resolver.resolve(6178)
if power_shots:
print(f"Single node: {power_shots.name}")
print(f" Stats: {power_shots.stats}")
print(f" Type: {power_shots.node_type}")
# Analyze full build
print("\n--- Build Analysis ---")
analysis = resolver.analyze_build(char_nodes)
print(f"Total: {analysis.total_nodes} nodes")
print(f"Keystones: {len(analysis.keystones)}")
print(f"Notables ({len(analysis.notables)}): {[n.name for n in analysis.notables]}")
print(f"Connected: {analysis.is_connected}")
print(f"Class: {analysis.class_start}")
if analysis.nearest_notables:
print(f"\nNearest unallocated notables:")
for node, dist in analysis.nearest_notables:
print(f" - {node.name} ({dist} nodes away)")
# Pathfinding
print("\n--- Pathfinding ---")
path = resolver.find_path(50986, 6178) # DUELIST to Power Shots
if path:
print(f"DUELIST -> Power Shots: {path.distance} nodes")
for i, node in enumerate(path.nodes[:5]):
print(f" {i+1}. {node.name} ({node.node_type})")
if len(path.nodes) > 5:
print(f" ... and {len(path.nodes) - 5} more")