"""Utilities for parsing TL combat log exports into structured events."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterator, List, Optional, Tuple
SUPPORTED_SUFFIXES = {".txt", ".log"}
@dataclass(slots=True)
class LogEvent:
"""Single combat log entry for downstream analytics."""
timestamp: datetime
event_type: str
skill: str
internal_id: int
damage: int
crit: bool
heavy: bool
hit_type: str
source: str
target: str
raw: str
def parse_timestamp(raw_value: str) -> datetime:
"""Parse TL timestamps encoded as YYYYMMDD-HH:MM:SS:ms."""
date_portion, milli_portion = raw_value.rsplit(":", 1)
base_dt = datetime.strptime(date_portion, "%Y%m%d-%H:%M:%S")
if not milli_portion.isdigit():
raise ValueError(f"Invalid millisecond value: {raw_value}")
# TL uses millisecond precision; expand to microseconds for datetime.
digits = milli_portion[:6].ljust(6, "0")
microseconds = int(digits)
return base_dt.replace(microsecond=microseconds)
def parse_line(line: str) -> Optional[LogEvent]:
"""Parse a single log line into a LogEvent, or return None for metadata rows."""
row = line.strip()
if not row or row.startswith("CombatLogVersion"):
return None
columns = [part.strip() for part in row.split(",")]
if len(columns) != 10:
return None
timestamp = parse_timestamp(columns[0])
event_type = columns[1]
skill = columns[2]
internal_id = int(columns[3])
damage = int(columns[4])
crit = columns[5] == "1"
heavy = columns[6] == "1"
hit_type = columns[7]
source = columns[8]
target = columns[9]
return LogEvent(
timestamp=timestamp,
event_type=event_type,
skill=skill,
internal_id=internal_id,
damage=damage,
crit=crit,
heavy=heavy,
hit_type=hit_type,
source=source,
target=target,
raw=row,
)
def load_log_file(path: Path) -> List[LogEvent]:
"""Load a single log file into memory, filtering out metadata rows."""
events: List[LogEvent] = []
with path.open("r", encoding="utf-8") as handle:
for line in handle:
event = parse_line(line)
if event is not None:
events.append(event)
return events
def _is_supported_file(path: Path) -> bool:
return path.is_file() and path.suffix.lower() in SUPPORTED_SUFFIXES
def _describe_path(path: Path) -> str:
resolved = path.expanduser().resolve()
exists_flag = resolved.exists()
return f"{resolved} (exists={exists_flag})"
def select_log_files(input_path: Path | str, limit_runs: Optional[int] = None) -> List[Path]:
"""Return the newest log files that should be parsed before reading any contents."""
path = Path(input_path)
if not path.exists():
raise FileNotFoundError(f"No log source found at {_describe_path(path)}")
if path.is_file():
if not _is_supported_file(path):
raise ValueError(f"Unsupported log extension for {_describe_path(path)}")
return [path]
candidates = [candidate for candidate in path.iterdir() if _is_supported_file(candidate)]
candidates.sort(key=lambda candidate: candidate.stat().st_mtime, reverse=True)
if limit_runs:
return candidates[:limit_runs]
return candidates
def load_runs(input_path: Path | str, limit_runs: Optional[int] = None) -> Iterator[Tuple[str, List[LogEvent]]]:
"""Yield (run_id, events) pairs for single files or *.txt/*.log batches."""
for candidate in select_log_files(input_path, limit_runs=limit_runs):
events = load_log_file(candidate)
if events:
yield candidate.name, events
__all__ = [
"LogEvent",
"parse_line",
"parse_timestamp",
"load_log_file",
"select_log_files",
"load_runs",
]