"""DuckDB-backed event store used for ad-hoc DPS queries."""
from __future__ import annotations

import sys
from dataclasses import dataclass
from datetime import datetime
from time import perf_counter
from typing import Any, Dict, Iterable, Iterator, List, Sequence, Tuple

import duckdb
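# Column order below must stay in sync with _dict_to_tuple and build_event_rows.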
SCHEMA_DEFINITION: Tuple[Tuple[str, str], ...] = (
("run_id", "VARCHAR"),
("ts", "TIMESTAMP"),
("event_type", "VARCHAR"),
("skill_name", "VARCHAR"),
("damage", "BIGINT"),
("is_crit", "BOOLEAN"),
("is_heavy", "BOOLEAN"),
("source_name", "VARCHAR"),
("target_name", "VARCHAR"),
("hit_type", "VARCHAR"),
)
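# Matched as case-insensitive substrings in _enforce_select_only; a coarse guard
# (it also blocks names such as read_csv) rather than a full SQL parser.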
BANNED_KEYWORDS = (
"ATTACH",
"COPY",
"DELETE",
"DROP",
"INSERT",
"MERGE",
"PRAGMA",
"READ",
"UPDATE",
"WRITE",
"ALTER",
"CREATE",
)
MAX_EVENT_ROWS = 1_000_000
INSERT_CHUNK_SIZE = 10_000
PROGRESS_INTERVAL_ROWS = 25_000
class EventStoreError(RuntimeError):
"""Raised when the event store cannot service a query."""
@dataclass(frozen=True)
class QueryResult:
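    """Column names and row values returned by DuckDBEventStore.query()."""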
columns: List[str]
rows: List[List[Any]]
def to_payload(self) -> Dict[str, Any]:
return {"columns": self.columns, "rows": self.rows}
class DuckDBEventStore:
"""In-memory DuckDB table that refreshes on every analysis run."""
def __init__(
self,
table_name: str = "events",
row_limit: int = 200,
event_limit: int = MAX_EVENT_ROWS,
insert_chunk_size: int = INSERT_CHUNK_SIZE,
) -> None:
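        """Open an in-memory DuckDB connection; row_limit caps rows returned per query."""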
self._conn = duckdb.connect(database=":memory:")
self._table_name = table_name
self._row_limit = row_limit
self._event_limit = event_limit
self._insert_chunk_size = max(1, insert_chunk_size)
self._table_ready = False
@property
def schema(self) -> Tuple[Tuple[str, str], ...]:
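        """The (column name, DuckDB type) pairs backing the events table."""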
return SCHEMA_DEFINITION
    def refresh(self, rows: Sequence[Dict[str, Any]]) -> None:
        """Rebuild the events table from scratch with the given rows."""
        total_rows = len(rows)
        if total_rows > self._event_limit:
            raise EventStoreError(
                f"Too many events ({total_rows:,}). Limit is {self._event_limit:,}."
            )
        # Validate before dropping anything so an oversized batch leaves the
        # previous table intact; mark the store not ready until the reload finishes.
        self._table_ready = False
        columns_sql = ", ".join(f"{name} {dtype}" for name, dtype in self.schema)
        load_start = perf_counter()
        self._conn.execute(f"DROP TABLE IF EXISTS {self._table_name}")
        self._conn.execute(f"CREATE TABLE {self._table_name} ({columns_sql})")
if rows:
self._bulk_insert_rows(rows, total_rows, load_start)
load_elapsed = perf_counter() - load_start
print(
f"Loaded into DuckDB in {load_elapsed:.3f}s",
file=sys.stderr,
flush=True,
)
self._table_ready = True
def get_schema_payload(self) -> Dict[str, Any]:
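        """Return the table name, column definitions, and row limit as a plain dict."""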
return {
"table": self._table_name,
"columns": [{"name": name, "type": dtype} for name, dtype in self.schema],
"row_limit": self._row_limit,
}
def query(self, sql: str) -> QueryResult:
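        """Run a read-only query against the events table, capped at row_limit rows."""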
if not self._table_ready:
raise EventStoreError("Event store is empty. Run an analysis first.")
normalized = self._normalize_sql(sql)
self._enforce_select_only(normalized)
wrapped = f"SELECT * FROM ({normalized}) AS sub LIMIT {self._row_limit}"
cursor = self._conn.execute(wrapped)
description = cursor.description or []
columns = [column[0] for column in description]
raw_rows = cursor.fetchall()
rows = [self._serialize_row(row) for row in raw_rows]
return QueryResult(columns=columns, rows=rows)
def _bulk_insert_rows(
self,
rows: Sequence[Dict[str, Any]],
total_rows: int,
load_start: float,
) -> None:
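        """Insert rows in chunks, logging progress to stderr for large loads."""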
        ordered_columns = [name for name, _ in self.schema]
        placeholders = ", ".join(["?"] * len(ordered_columns))
        insert_sql = (
            f"INSERT INTO {self._table_name} ({', '.join(ordered_columns)}) "
            f"VALUES ({placeholders})"
        )
inserted = 0
for chunk in self._chunk_rows(rows, self._insert_chunk_size):
prepared = [self._dict_to_tuple(row) for row in chunk]
self._conn.executemany(insert_sql, prepared)
inserted += len(prepared)
if self._should_log_progress(inserted, total_rows):
elapsed = perf_counter() - load_start
print(
f"Inserted {inserted:,}/{total_rows:,} rows ({elapsed:.2f}s)",
file=sys.stderr,
flush=True,
)
def _dict_to_tuple(self, row: Dict[str, Any]) -> Tuple[Any, ...]:
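        """Map a row dict onto a tuple in SCHEMA_DEFINITION column order."""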
return (
row.get("run_id"),
row.get("ts"),
row.get("event_type"),
row.get("skill_name"),
int(row.get("damage", 0) or 0),
bool(row.get("is_crit")),
bool(row.get("is_heavy")),
row.get("source_name"),
row.get("target_name"),
row.get("hit_type"),
)
def _chunk_rows(
self, rows: Sequence[Dict[str, Any]], chunk_size: int
) -> Iterator[Sequence[Dict[str, Any]]]:
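        """Yield consecutive slices of rows, each at most chunk_size long."""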
total = len(rows)
for start in range(0, total, chunk_size):
yield rows[start : start + chunk_size]
@staticmethod
def _should_log_progress(inserted: int, total_rows: int) -> bool:
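        """Return True at PROGRESS_INTERVAL_ROWS boundaries and when the final row lands."""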
if inserted == total_rows:
return True
if total_rows >= PROGRESS_INTERVAL_ROWS and inserted % PROGRESS_INTERVAL_ROWS == 0:
return True
return False
@staticmethod
def _serialize_row(row: Tuple[Any, ...]) -> List[Any]:
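        """Convert a result row to JSON-friendly values; datetimes become ISO strings."""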
serialized: List[Any] = []
for value in row:
if isinstance(value, datetime):
serialized.append(value.isoformat())
else:
serialized.append(value)
return serialized
@staticmethod
    def _normalize_sql(sql: str) -> str:
        """Strip surrounding whitespace and any trailing semicolons from the query."""
        if not sql:
            raise EventStoreError("SQL query cannot be empty.")
        normalized = sql.strip().rstrip("; \t\r\n")
        if not normalized:
            raise EventStoreError("SQL query cannot be empty.")
        return normalized
@staticmethod
def _enforce_select_only(sql: str) -> None:
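        """Reject anything other than a SELECT/WITH query free of banned keywords."""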
upper_sql = sql.upper()
if not (upper_sql.startswith("SELECT") or upper_sql.startswith("WITH")):
raise EventStoreError("Only SELECT queries are supported.")
for keyword in BANNED_KEYWORDS:
if keyword in upper_sql:
raise EventStoreError(f"Keyword '{keyword}' is not allowed in queries.")
if ".." in sql:
raise EventStoreError("Relative path access is not allowed in queries.")
def build_event_rows(run_id: str, events: Iterable[Any]) -> List[Dict[str, Any]]:
"""Convert parser events into serializable rows for DuckDB."""
rows: List[Dict[str, Any]] = []
for event in events:
rows.append(
{
"run_id": run_id,
"ts": event.timestamp,
"event_type": event.event_type,
"skill_name": event.skill,
"damage": event.damage,
"is_crit": bool(event.crit),
"is_heavy": bool(event.heavy),
"source_name": event.source,
"target_name": event.target,
"hit_type": event.hit_type,
}
)
return rows
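# Illustrative usage sketch; "parsed_events" stands in for any iterable of parser
# events exposing the attributes read by build_event_rows (timestamp, skill, damage, ...):
#
#   store = DuckDBEventStore()
#   store.refresh(build_event_rows("run-1", parsed_events))
#   result = store.query(
#       "SELECT skill_name, SUM(damage) AS total_damage "
#       "FROM events GROUP BY skill_name ORDER BY total_damage DESC"
#   )
#   print(result.to_payload())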
__all__ = ["DuckDBEventStore", "EventStoreError", "build_event_rows"]