#!/usr/bin/env python3
"""
Preprocessing script for Lenny podcast transcripts.

Uses GPT-OSS 20B via LM Studio (OpenAI-compatible API) to extract
hierarchical structure from transcripts.

Usage:
    python scripts/preprocess.py                     # Process all unprocessed
    python scripts/preprocess.py --file "Guest.txt"  # Process specific file
    python scripts/preprocess.py --reprocess         # Reprocess all files
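
Requires the httpx package and a running LM Studio server (OpenAI-compatible
API, default http://localhost:1234) with a GPT-OSS 20B model loaded.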
"""

import argparse
import json
import sys
import time
from pathlib import Path

import httpx

# LM Studio settings
LM_STUDIO_URL = "http://localhost:1234/v1/chat/completions"

# Project paths
PROJECT_ROOT = Path(__file__).parent.parent
TRANSCRIPTS_DIR = PROJECT_ROOT / "transcripts"
PREPROCESSED_DIR = PROJECT_ROOT / "preprocessed"
PROMPTS_DIR = PROJECT_ROOT / "prompts"
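# Expected layout: raw transcripts in transcripts/*.txt, extracted JSON written
# to preprocessed/<transcript stem>.json, and the extraction prompt template at
# prompts/extraction.md.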


def load_extraction_prompt() -> str:
    """Load the extraction prompt template."""
    prompt_path = PROMPTS_DIR / "extraction.md"
    if not prompt_path.exists():
        raise FileNotFoundError(f"Extraction prompt not found at {prompt_path}")
    return prompt_path.read_text()


def add_line_numbers(text: str) -> str:
    """Add line numbers to transcript text for reference."""
    lines = text.split("\n")
    numbered_lines = [f"{i + 1}: {line}" for i, line in enumerate(lines)]
    return "\n".join(numbered_lines)


def process_transcript(transcript_path: Path, model: str = "openai/gpt-oss-20b") -> dict:
    """
    Process a single transcript and extract structured data.

    Args:
        transcript_path: Path to the transcript file
        model: Model identifier sent in the API request (defaults to openai/gpt-oss-20b)

    Returns:
        Extracted JSON structure
    """
    # Read transcript
    transcript_text = transcript_path.read_text(encoding="utf-8")

    # Add line numbers for reference
    numbered_transcript = add_line_numbers(transcript_text)

    # Load prompt and combine
    prompt_template = load_extraction_prompt()
    full_prompt = prompt_template + "\n\n" + numbered_transcript

    # Call LM Studio API
    print(" Sending to LM Studio...")
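    # Chat-completions request: the system message asks the model to skip its
    # reasoning/"thinking" output, the low temperature keeps the extracted
    # structure stable across runs, and max_tokens leaves room for the full JSON
    # on long transcripts.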
    payload = {
        "model": model,
        "messages": [
            {
                "role": "system",
                "content": "Reasoning: disabled. Do not output thinking or reasoning. Output only the final JSON response."
            },
            {"role": "user", "content": full_prompt},
        ],
        "temperature": 0.1,
        "max_tokens": 32768,
        "top_p": 0.9,
        "stream": False,
    }
    # Use longer timeout for large transcripts
    with httpx.Client(timeout=600.0) as client:
        response = client.post(LM_STUDIO_URL, json=payload)
        response.raise_for_status()
        data = response.json()

    message = data["choices"][0]["message"]
    print(f" Finish reason: {data['choices'][0].get('finish_reason', 'unknown')}")
    print(f" Usage: {data.get('usage', {})}")
    # Try "content" first; some responses come back with it empty and the model's
    # text in a non-standard "reasoning" field instead, so fall back to that.
    response_text = message.get("content", "")
    if not response_text and message.get("reasoning"):
        print(" Content empty, checking reasoning field...")
        reasoning = message.get("reasoning", "")
        print(f" Reasoning length: {len(reasoning)} chars")
        print(f" Reasoning end: ...{reasoning[-1000:]}")
        # Try to find JSON in reasoning
        if "{" in reasoning:
            response_text = reasoning

    try:
        result = json.loads(response_text)
    except json.JSONDecodeError:
        print(" Warning: Failed to parse JSON, attempting repair...")
        print(f" Response length: {len(response_text)} chars")
        print(f" Response start: {response_text[:1000]}")
        print(f" Response end: {response_text[-500:]}")
        result = attempt_json_repair(response_text)

    return result


def attempt_json_repair(text: str) -> dict:
    """Attempt to repair malformed JSON output."""
    # Try to find JSON object boundaries
    start = text.find("{")
    end = text.rfind("}") + 1
    if start == -1 or end == 0:
        raise ValueError("No JSON object found in response")
    json_text = text[start:end]
    try:
        return json.loads(json_text)
    except json.JSONDecodeError:
        raise ValueError(f"Could not repair JSON: {json_text[:200]}...")


def validate_output(data: dict, transcript_name: str) -> list[str]:
    """Validate the extracted data structure. Returns list of warnings."""
    warnings = []

    required_keys = ["episode", "topics", "insights", "examples"]
    for key in required_keys:
        if key not in data:
            warnings.append(f"Missing required key: {key}")

    if "episode" in data:
        ep = data["episode"]
        if not ep.get("guest"):
            warnings.append("Episode missing guest name")
        if not ep.get("summary"):
            warnings.append("Episode missing summary")

    if "topics" in data:
        if len(data["topics"]) < 3:
            warnings.append(f"Only {len(data['topics'])} topics found (expected 6-15)")
        elif len(data["topics"]) > 20:
            warnings.append(f"Too many topics: {len(data['topics'])} (expected 6-15)")

    if "insights" in data:
        if len(data["insights"]) < 5:
            warnings.append(f"Only {len(data['insights'])} insights found")

    if "examples" in data:
        if len(data["examples"]) < 3:
            warnings.append(f"Only {len(data['examples'])} examples found")

    return warnings


def get_unprocessed_transcripts() -> list[Path]:
    """Get list of transcripts that haven't been processed yet."""
    all_transcripts = list(TRANSCRIPTS_DIR.glob("*.txt"))
    unprocessed = []
    for transcript in all_transcripts:
        output_path = PREPROCESSED_DIR / f"{transcript.stem}.json"
        if not output_path.exists():
            unprocessed.append(transcript)
    return sorted(unprocessed)


def main():
    global LM_STUDIO_URL

    parser = argparse.ArgumentParser(description="Preprocess Lenny podcast transcripts")
    parser.add_argument(
        "--file", "-f",
        type=str,
        help="Process a specific transcript file (by filename)"
    )
    parser.add_argument(
        "--reprocess",
        action="store_true",
        help="Reprocess all files, overwriting existing"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="List files to process without processing"
    )
    parser.add_argument(
        "--url",
        type=str,
        default=LM_STUDIO_URL,
        help=f"LM Studio API URL (default: {LM_STUDIO_URL})"
    )
    args = parser.parse_args()

    # Update URL if provided
    LM_STUDIO_URL = args.url
    # Ensure output directory exists
    PREPROCESSED_DIR.mkdir(exist_ok=True)

    # Determine which files to process
    if args.file:
        # Process specific file
        transcript_path = TRANSCRIPTS_DIR / args.file
        if not transcript_path.exists():
            # Try with .txt extension
            transcript_path = TRANSCRIPTS_DIR / f"{args.file}.txt"
        if not transcript_path.exists():
            print(f"Error: Transcript not found: {args.file}")
            sys.exit(1)
        transcripts_to_process = [transcript_path]
    elif args.reprocess:
        # Process all files
        transcripts_to_process = sorted(TRANSCRIPTS_DIR.glob("*.txt"))
    else:
        # Process only unprocessed files
        transcripts_to_process = get_unprocessed_transcripts()

    if not transcripts_to_process:
        print("All transcripts have been processed. Use --reprocess to reprocess.")
        return

    print(f"Found {len(transcripts_to_process)} transcript(s) to process")
    print(f"Using LM Studio at: {LM_STUDIO_URL}")

    if args.dry_run:
        print("\nFiles to process:")
        for t in transcripts_to_process:
            print(f" - {t.name}")
        return
    # Process each transcript
    total = len(transcripts_to_process)
    successful = 0
    failed = []

    for i, transcript_path in enumerate(transcripts_to_process, 1):
        print(f"\n[{i}/{total}] Processing: {transcript_path.name}")
        output_path = PREPROCESSED_DIR / f"{transcript_path.stem}.json"
        start_time = time.time()

        try:
            result = process_transcript(transcript_path)

            # Validate output
            warnings = validate_output(result, transcript_path.name)
            if warnings:
                print(" Warnings:")
                for w in warnings:
                    print(f" - {w}")

            # Save result
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)

            elapsed = time.time() - start_time
            print(f" Saved to: {output_path.name} ({elapsed:.1f}s)")
            successful += 1
        except Exception as e:
            elapsed = time.time() - start_time
            print(f" ERROR: {e} ({elapsed:.1f}s)")
            failed.append((transcript_path.name, str(e)))
    # Summary
    print(f"\n{'='*50}")
    print("Processing complete!")
    print(f" Successful: {successful}/{total}")
    print(f" Failed: {len(failed)}/{total}")
    if failed:
        print("\nFailed transcripts:")
        for name, error in failed:
            print(f" - {name}: {error}")


if __name__ == "__main__":
    main()