#!/usr/bin/env python3
"""
PokerNow CSV log parser.
Reads a PokerNow CSV file and outputs structured JSON to stdout.
Called by the TypeScript MCP server.
Usage: python parse_pokernow.py <csv_file_path>
"""
import csv
import json
import re
import sys
from datetime import datetime, timezone
from typing import Optional
# ── Card normalization ───────────────────────────────────────────────
# PokerNow uses unicode suit symbols OR ASCII letters
SUIT_MAP = {
"♠": "s", "♥": "h", "♦": "d", "♣": "c",
"s": "s", "h": "h", "d": "d", "c": "c",
"S": "s", "H": "h", "D": "d", "C": "c",
"\u2660": "s", "\u2665": "h", "\u2666": "d", "\u2663": "c",
}
RANK_MAP = {
"A": "A", "K": "K", "Q": "Q", "J": "J", "T": "T", "10": "T",
"9": "9", "8": "8", "7": "7", "6": "6", "5": "5", "4": "4",
"3": "3", "2": "2",
}
def normalize_card(card_str: str) -> str:
"""Convert any card representation to standard format like 'Ah', 'Ks', etc."""
card_str = card_str.strip()
if not card_str:
return card_str
# Try to extract rank and suit
# Format could be: "Ah", "A♥", "10h", "10♥", etc.
rank = ""
suit = ""
for i, ch in enumerate(card_str):
if ch in SUIT_MAP:
suit = SUIT_MAP[ch]
rank = card_str[:i]
break
if not suit and len(card_str) >= 2:
# Try last char as suit
if card_str[-1] in SUIT_MAP:
suit = SUIT_MAP[card_str[-1]]
rank = card_str[:-1]
rank = RANK_MAP.get(rank.strip(), rank.strip())
return f"{rank}{suit}"
def parse_cards_from_brackets(text: str) -> list[str]:
"""Extract cards from bracket notation like [Ah, Kd, 7c]."""
match = re.search(r'\[([^\]]+)\]', text)
if not match:
return []
cards_str = match.group(1)
raw_cards = [c.strip() for c in cards_str.split(",")]
return [normalize_card(c) for c in raw_cards if c.strip()]
def parse_shown_cards(text: str) -> list[str]:
"""Extract cards from 'shows a Ah, Kh.' or 'shows a Ah, Kh'."""
match = re.search(r'shows a (.+?)\.?\s*$', text)
if not match:
return []
cards_str = match.group(1)
raw_cards = [c.strip().rstrip('.') for c in cards_str.split(",")]
return [normalize_card(c) for c in raw_cards if c.strip()]
def parse_own_hand(text: str) -> list[str]:
"""Extract cards from 'Your hand is Ah, Kh'."""
match = re.search(r'Your hand is (.+?)\.?\s*$', text)
if not match:
return []
cards_str = match.group(1)
raw_cards = [c.strip().rstrip('.') for c in cards_str.split(",")]
return [normalize_card(c) for c in raw_cards if c.strip()]
# ── Player name parsing ─────────────────────────────────────────────
def extract_players_from_quotes(text: str) -> list[tuple[str, str]]:
"""Extract all 'name @ id' pairs from quoted strings in an entry."""
matches = re.findall(r'"([^"]*)"', text)
results = []
for m in matches:
parts = m.split(" @ ")
if len(parts) == 2:
results.append((parts[0].strip(), parts[1].strip()))
else:
results.append((m.strip(), ""))
return results
def extract_amount(text: str) -> Optional[int]:
"""Extract a numeric amount from an entry line."""
# Match patterns like "bets 500", "calls 200", "raises to 600", "of 100", "collected 1500"
patterns = [
r'raises to (\d+)',
r'bets (\d+)',
r'calls (\d+)',
r'of (\d+)',
r'collected (\d+)',
r'(\d+) from pot',
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return int(match.group(1))
return None
# ── Entry classification ─────────────────────────────────────────────
def classify_entry(text: str) -> str:
"""Classify a PokerNow log entry into a category."""
checks = [
("starting hand", "hand_start"),
("ending hand", "hand_end"),
("Player stacks:", "stacks"),
("posts a small blind", "small_blind"),
("posts a big blind", "big_blind"),
("posts a straddle", "straddle"),
("posts an ante", "ante"),
(" folds", "fold"),
(" calls ", "call"),
(" bets ", "bet"),
(" raises ", "raise"),
(" checks", "check"),
("Flop:", "flop"),
("Turn:", "turn"),
("River:", "river"),
(" collected ", "collect"),
(" shows a ", "show"),
("Your hand is", "own_hand"),
("Uncalled bet", "uncalled"),
("Undealt cards", "rabbit"),
("run it twice", "run_it_twice"),
("all in", "all_in_note"),
]
for keyword, category in checks:
if keyword in text:
return category
return "other"
# ── Stack parsing ────────────────────────────────────────────────────
def parse_stacks(text: str) -> list[dict]:
"""Parse 'Player stacks: "P1 @ id1" (5000) | "P2 @ id2" (10000)'."""
stacks = []
# Split by |
parts = text.split("|")
for part in parts:
players = extract_players_from_quotes(part)
amount_match = re.search(r'\((\d+(?:\.\d+)?)\)', part)
if players and amount_match:
name, pid = players[0]
stacks.append({
"pokernow_id": pid,
"name": name,
"stack": float(amount_match.group(1)),
})
return stacks
# ── Hand start parsing ───────────────────────────────────────────────
def parse_hand_start(text: str) -> dict:
"""Parse 'starting hand #X (dealer: "Player @ id")'."""
hand_num_match = re.search(r'#(\S+)', text)
hand_number = hand_num_match.group(1) if hand_num_match else ""
dealer_players = extract_players_from_quotes(text)
dealer_name = dealer_players[0][0] if dealer_players else ""
dealer_id = dealer_players[0][1] if dealer_players else ""
return {
"hand_number": hand_number,
"dealer_name": dealer_name,
"dealer_id": dealer_id,
}
# ── Main parser ──────────────────────────────────────────────────────
def parse_pokernow_csv(filepath: str) -> dict:
"""Parse a PokerNow CSV log file into structured hand data."""
# Read CSV
rows = []
with open(filepath, encoding="utf-8") as f:
reader = csv.reader(f)
header = None
for row in reader:
if header is None:
header = row
continue
rows.append(row)
# Rows are in reverse chronological order; reverse them
rows.reverse()
# Determine column indices
# PokerNow CSV columns: entry, order, at
# But some versions may vary
entry_idx = 0
time_idx = 2 if len(rows[0]) > 2 else None
# Group entries into hands
hands_raw: list[list[dict]] = []
current_hand: list[dict] = []
players_seen: dict[str, str] = {} # id -> name
for row in rows:
entry_text = row[entry_idx] if row else ""
timestamp = row[time_idx] if time_idx is not None and len(row) > time_idx else ""
entry_type = classify_entry(entry_text)
entry = {
"text": entry_text,
"type": entry_type,
"timestamp": timestamp,
}
# Track all players seen
for name, pid in extract_players_from_quotes(entry_text):
if pid:
players_seen[pid] = name
if entry_type == "hand_start":
if current_hand:
hands_raw.append(current_hand)
current_hand = [entry]
else:
current_hand.append(entry)
if current_hand:
hands_raw.append(current_hand)
# Parse each hand
parsed_hands = []
for hand_entries in hands_raw:
hand = parse_hand(hand_entries)
if hand:
parsed_hands.append(hand)
return {
"hands": parsed_hands,
"session": {
"filename": filepath,
"total_hands": len(parsed_hands),
"players_seen": [
{"pokernow_id": pid, "name": name}
for pid, name in players_seen.items()
],
},
}
def parse_hand(entries: list[dict]) -> Optional[dict]:
"""Parse a single hand from a list of entry dicts."""
if not entries:
return None
# Find the hand start entry
start_entry = None
for e in entries:
if e["type"] == "hand_start":
start_entry = e
break
if not start_entry:
return None
hand_info = parse_hand_start(start_entry["text"])
# Parse the rest
players: list[dict] = []
actions: list[dict] = []
community_cards = {"flop": [], "turn": [], "river": []}
results: list[dict] = []
pot = 0
small_blind = 0
big_blind = 0
ante = 0
hero_cards: list[str] = []
shown_cards: dict[str, list[str]] = {}
rabbit_cards: list[str] = []
timestamp = start_entry.get("timestamp", "")
current_street = "preflop"
action_seq = 0
for entry in entries:
text = entry["text"]
etype = entry["type"]
if etype == "stacks":
players = parse_stacks(text)
elif etype == "small_blind":
amount = extract_amount(text)
if amount is not None:
small_blind = amount
plist = extract_players_from_quotes(text)
if plist:
actions.append({
"street": "preflop",
"player_name": plist[0][0],
"player_id": plist[0][1],
"action": "small_blind",
"amount": amount or 0,
"is_all_in": "all in" in text.lower(),
"sequence": action_seq,
})
action_seq += 1
elif etype == "big_blind":
amount = extract_amount(text)
if amount is not None:
big_blind = amount
plist = extract_players_from_quotes(text)
if plist:
actions.append({
"street": "preflop",
"player_name": plist[0][0],
"player_id": plist[0][1],
"action": "big_blind",
"amount": amount or 0,
"is_all_in": "all in" in text.lower(),
"sequence": action_seq,
})
action_seq += 1
elif etype == "straddle":
amount = extract_amount(text)
plist = extract_players_from_quotes(text)
if plist:
actions.append({
"street": "preflop",
"player_name": plist[0][0],
"player_id": plist[0][1],
"action": "straddle",
"amount": amount or 0,
"is_all_in": "all in" in text.lower(),
"sequence": action_seq,
})
action_seq += 1
elif etype == "ante":
amount = extract_amount(text)
if amount is not None:
ante = amount
plist = extract_players_from_quotes(text)
if plist:
actions.append({
"street": "preflop",
"player_name": plist[0][0],
"player_id": plist[0][1],
"action": "ante",
"amount": amount or 0,
"is_all_in": False,
"sequence": action_seq,
})
action_seq += 1
elif etype in ("fold", "call", "bet", "raise", "check"):
amount = extract_amount(text) or 0
plist = extract_players_from_quotes(text)
if plist:
actions.append({
"street": current_street,
"player_name": plist[0][0],
"player_id": plist[0][1],
"action": etype,
"amount": amount,
"is_all_in": "all in" in text.lower(),
"sequence": action_seq,
})
action_seq += 1
elif etype == "flop":
current_street = "flop"
community_cards["flop"] = parse_cards_from_brackets(text)
elif etype == "turn":
current_street = "turn"
community_cards["turn"] = parse_cards_from_brackets(text)
elif etype == "river":
current_street = "river"
community_cards["river"] = parse_cards_from_brackets(text)
elif etype == "collect":
amount = extract_amount(text) or 0
pot += amount
plist = extract_players_from_quotes(text)
if plist:
# Check if this player already has a result
existing = next((r for r in results if r["player_id"] == plist[0][1]), None)
if existing:
existing["won"] += amount
else:
results.append({
"player_name": plist[0][0],
"player_id": plist[0][1],
"won": amount,
"showed_cards": [],
})
elif etype == "show":
plist = extract_players_from_quotes(text)
cards = parse_shown_cards(text)
if plist and cards:
shown_cards[plist[0][1]] = cards
elif etype == "own_hand":
hero_cards = parse_own_hand(text)
elif etype == "uncalled":
# Track uncalled bets (returned to player)
pass
elif etype == "rabbit":
rabbit_cards = parse_cards_from_brackets(text)
# Attach shown cards to results
for result in results:
if result["player_id"] in shown_cards:
result["showed_cards"] = shown_cards[result["player_id"]]
# Determine table size
table_size = len(players)
# Compute positions based on dealer and player order
# PokerNow stacks are listed in seat order; dealer is identified in hand_start
positions = assign_positions(players, hand_info.get("dealer_id", ""), table_size)
# Enrich players with positions
for i, p in enumerate(players):
p["position"] = positions[i] if i < len(positions) else "unknown"
return {
"hand_number": hand_info["hand_number"],
"timestamp": timestamp,
"table_size": table_size,
"dealer_id": hand_info["dealer_id"],
"dealer_name": hand_info["dealer_name"],
"small_blind": small_blind,
"big_blind": big_blind,
"ante": ante,
"players": players,
"hero_cards": hero_cards,
"community_cards": community_cards,
"actions": actions,
"results": results,
"pot": pot,
"rabbit_cards": rabbit_cards,
}
def assign_positions(players: list[dict], dealer_id: str, table_size: int) -> list[str]:
"""Assign position labels based on dealer and table size."""
if table_size == 0:
return []
# Find dealer index
dealer_idx = 0
for i, p in enumerate(players):
if p["pokernow_id"] == dealer_id:
dealer_idx = i
break
# Position labels by table size
if table_size == 2:
labels = ["BTN", "BB"] # In HU, BTN is also SB
elif table_size == 3:
labels = ["BTN", "SB", "BB"]
elif table_size == 4:
labels = ["BTN", "SB", "BB", "CO"]
elif table_size == 5:
labels = ["BTN", "SB", "BB", "UTG", "CO"]
elif table_size == 6:
labels = ["BTN", "SB", "BB", "UTG", "HJ", "CO"]
elif table_size == 7:
labels = ["BTN", "SB", "BB", "UTG", "UTG1", "HJ", "CO"]
elif table_size == 8:
labels = ["BTN", "SB", "BB", "UTG", "UTG1", "MP", "HJ", "CO"]
elif table_size == 9:
labels = ["BTN", "SB", "BB", "UTG", "UTG1", "UTG2", "MP", "HJ", "CO"]
elif table_size >= 10:
labels = ["BTN", "SB", "BB", "UTG", "UTG1", "UTG2", "MP", "MP1", "HJ", "CO"]
else:
labels = [f"Seat{i}" for i in range(table_size)]
# Trim to actual table size
labels = labels[:table_size]
# Assign starting from dealer position
positions = [""] * table_size
for i in range(table_size):
seat_idx = (dealer_idx + i) % table_size
positions[seat_idx] = labels[i]
return positions
# ── Entry point ──────────────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print(json.dumps({"error": "Usage: python parse_pokernow.py <csv_file_path>"}))
sys.exit(1)
filepath = sys.argv[1]
try:
result = parse_pokernow_csv(filepath)
print(json.dumps(result, indent=2))
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
if __name__ == "__main__":
main()