#!/usr/bin/env python3
"""
Preprocessing script using Claude CLI with Haiku model.
Usage:
python scripts/preprocess_haiku.py # Process all unprocessed
python scripts/preprocess_haiku.py --file "Guest.txt" # Process specific file
python scripts/preprocess_haiku.py --limit 20 # Process first N unprocessed
"""
import argparse
import json
import subprocess
import sys
import time
from pathlib import Path
# Project paths
PROJECT_ROOT = Path(__file__).parent.parent
TRANSCRIPTS_DIR = PROJECT_ROOT / "transcripts"
PREPROCESSED_DIR = PROJECT_ROOT / "preprocessed"
PROMPTS_DIR = PROJECT_ROOT / "prompts"
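
# Layout relative to the repo root:
#   transcripts/*.txt     -> raw input transcripts
#   prompts/extraction.md -> prompt template prepended to each transcript
#   preprocessed/*.json   -> extracted output, one file per transcript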
def load_extraction_prompt() -> str:
"""Load the extraction prompt template."""
prompt_path = PROMPTS_DIR / "extraction.md"
if not prompt_path.exists():
raise FileNotFoundError(f"Extraction prompt not found at {prompt_path}")
return prompt_path.read_text()
def add_line_numbers(text: str) -> str:
"""Add line numbers to transcript text for reference."""
lines = text.split("\n")
    numbered_lines = [f"{i}: {line}" for i, line in enumerate(lines, start=1)]
return "\n".join(numbered_lines)
def process_transcript(transcript_path: Path) -> dict:
    """
    Process a single transcript using the Claude CLI with Haiku.
    Returns the extracted data parsed from the model's JSON response.
    """
# Read transcript
transcript_text = transcript_path.read_text(encoding="utf-8")
# Add line numbers for reference
numbered_transcript = add_line_numbers(transcript_text)
# Load prompt and combine
prompt_template = load_extraction_prompt()
full_prompt = prompt_template + "\n\n" + numbered_transcript
# Call Claude CLI
print(f" Sending to Claude Haiku...")
    result = subprocess.run(
        [
            "claude", "--model", "haiku",
            "-p", "Process this transcript and return ONLY the JSON, no markdown code blocks",
        ],
        input=full_prompt,
        capture_output=True,
        text=True,
        timeout=300,  # 5 minute timeout
    )
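    # A hang past the timeout raises subprocess.TimeoutExpired, which the
    # generic except in main() reports as a failed transcript.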
if result.returncode != 0:
raise RuntimeError(f"Claude CLI failed: {result.stderr}")
response_text = result.stdout.strip()
# Remove markdown code blocks if present
if "```json" in response_text:
response_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
response_text = response_text.split("```")[1].split("```")[0].strip()
    try:
        return json.loads(response_text)
    except json.JSONDecodeError as e:
        print(" Warning: JSON parse error, attempting repair...")
        # Fall back to the outermost {...} span in the response
        start = response_text.find("{")
        end = response_text.rfind("}") + 1
        if start != -1 and end > start:
            try:
                return json.loads(response_text[start:end])
            except json.JSONDecodeError:
                pass  # repair failed; fall through to the ValueError
        raise ValueError(f"Could not parse JSON: {e}")
def validate_output(data: dict) -> list[str]:
"""Validate the extracted data structure. Returns list of warnings."""
warnings = []
required_keys = ["episode", "topics", "insights", "examples"]
for key in required_keys:
if key not in data:
warnings.append(f"Missing required key: {key}")
if "topics" in data:
count = len(data["topics"])
if count < 10:
warnings.append(f"Only {count} topics (expected 10-20)")
if "insights" in data:
count = len(data["insights"])
if count < 15:
warnings.append(f"Only {count} insights (expected 15-30)")
if "examples" in data:
count = len(data["examples"])
if count < 10:
warnings.append(f"Only {count} examples (expected 10-25)")
return warnings
def get_unprocessed_transcripts() -> list[Path]:
"""Get list of transcripts that haven't been processed yet."""
all_transcripts = list(TRANSCRIPTS_DIR.glob("*.txt"))
unprocessed = []
for transcript in all_transcripts:
output_path = PREPROCESSED_DIR / f"{transcript.stem}.json"
if not output_path.exists():
unprocessed.append(transcript)
return sorted(unprocessed)
def main():
parser = argparse.ArgumentParser(description="Preprocess transcripts with Claude Haiku")
parser.add_argument(
"--file", "-f",
type=str,
help="Process a specific transcript file"
)
parser.add_argument(
"--limit", "-n",
type=int,
help="Limit number of transcripts to process"
)
parser.add_argument(
"--reprocess",
action="store_true",
help="Reprocess all files"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="List files without processing"
)
parser.add_argument(
"--offset", "-o",
type=int,
default=0,
help="Skip first N unprocessed transcripts (for parallel processing)"
)
args = parser.parse_args()
# Ensure output directory exists
    PREPROCESSED_DIR.mkdir(parents=True, exist_ok=True)
# Determine which files to process
if args.file:
transcript_path = TRANSCRIPTS_DIR / args.file
if not transcript_path.exists():
transcript_path = TRANSCRIPTS_DIR / f"{args.file}.txt"
if not transcript_path.exists():
print(f"Error: Transcript not found: {args.file}")
sys.exit(1)
transcripts_to_process = [transcript_path]
elif args.reprocess:
transcripts_to_process = sorted(TRANSCRIPTS_DIR.glob("*.txt"))
else:
transcripts_to_process = get_unprocessed_transcripts()
# Apply offset (for parallel processing)
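    # (e.g. worker A runs "--offset 0 --limit 10" while worker B runs
    # "--offset 10 --limit 10" to split a backlog without overlap)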
if args.offset > 0:
transcripts_to_process = transcripts_to_process[args.offset:]
# Apply limit
if args.limit and len(transcripts_to_process) > args.limit:
transcripts_to_process = transcripts_to_process[:args.limit]
if not transcripts_to_process:
print("All transcripts have been processed. Use --reprocess to reprocess.")
return
print(f"Processing {len(transcripts_to_process)} transcript(s) with Claude Haiku")
if args.dry_run:
print("\nFiles to process:")
for t in transcripts_to_process:
print(f" - {t.name}")
return
# Process each transcript
total = len(transcripts_to_process)
successful = 0
failed = []
total_stats = {"topics": 0, "insights": 0, "examples": 0}
for i, transcript_path in enumerate(transcripts_to_process, 1):
print(f"\n[{i}/{total}] Processing: {transcript_path.name}")
output_path = PREPROCESSED_DIR / f"{transcript_path.stem}.json"
start_time = time.time()
try:
result = process_transcript(transcript_path)
# Validate output
warnings = validate_output(result)
# Count stats
t_count = len(result.get("topics", []))
i_count = len(result.get("insights", []))
e_count = len(result.get("examples", []))
total_stats["topics"] += t_count
total_stats["insights"] += i_count
total_stats["examples"] += e_count
if warnings:
print(f" Warnings: {', '.join(warnings)}")
# Save result
with open(output_path, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
elapsed = time.time() - start_time
print(f" Saved: {t_count} topics, {i_count} insights, {e_count} examples ({elapsed:.1f}s)")
successful += 1
except Exception as e:
elapsed = time.time() - start_time
print(f" ERROR: {e} ({elapsed:.1f}s)")
failed.append((transcript_path.name, str(e)))
# Summary
print(f"\n{'='*60}")
print(f"Processing complete!")
print(f" Successful: {successful}/{total}")
print(f" Failed: {len(failed)}/{total}")
print(f"\n Total extracted:")
print(f" Topics: {total_stats['topics']} (avg {total_stats['topics']/max(successful,1):.1f}/episode)")
print(f" Insights: {total_stats['insights']} (avg {total_stats['insights']/max(successful,1):.1f}/episode)")
print(f" Examples: {total_stats['examples']} (avg {total_stats['examples']/max(successful,1):.1f}/episode)")
if failed:
print(f"\nFailed transcripts:")
for name, error in failed:
print(f" - {name}: {error[:50]}...")
if __name__ == "__main__":
main()