transcript_parser.py
import os
import re
from datetime import datetime, timedelta, timezone

from pydantic import BaseModel


class Speaker(BaseModel):
    index: int
    name: str
    role: str


class ParsedMessage(BaseModel):
    speaker_index: int
    speaker_name: str
    role: str
    relative_timestamp: str
    actual_timestamp: datetime
    content: str


def parse_timestamp(timestamp: str) -> timedelta:
    """Convert a relative timestamp such as '2m 30s' or '45s' into a timedelta."""
    if 'm' in timestamp:
        match = re.match(r'(\d+)m(?:\s*(\d+)s)?', timestamp)
        if match:
            minutes = int(match.group(1))
            seconds = int(match.group(2)) if match.group(2) else 0
            return timedelta(minutes=minutes, seconds=seconds)
    elif 's' in timestamp:
        match = re.match(r'(\d+)s', timestamp)
        if match:
            seconds = int(match.group(1))
            return timedelta(seconds=seconds)
    return timedelta()  # Return 0 duration if parsing fails


def parse_conversation_file(file_path: str, speakers: list[Speaker]) -> list[ParsedMessage]:
    """Parse a transcript file into ParsedMessage objects with absolute timestamps."""
    with open(file_path) as file:
        content = file.read()

    # Messages are separated by blank lines; each begins with a
    # "speaker_index (relative_timestamp): text" header line.
    messages = content.split('\n\n')
    speaker_dict = {speaker.index: speaker for speaker in speakers}
    parsed_messages: list[ParsedMessage] = []

    # Find the last timestamp to determine podcast duration
    last_timestamp = timedelta()
    for message in reversed(messages):
        lines = message.strip().split('\n')
        if lines:
            first_line = lines[0]
            parts = first_line.split(':', 1)
            if len(parts) == 2:
                header = parts[0]
                header_parts = header.split()
                if len(header_parts) >= 2:
                    timestamp = header_parts[1].strip('()')
                    last_timestamp = parse_timestamp(timestamp)
                    break

    # Calculate the start time by working backwards from "now"
    now = datetime.now(timezone.utc)
    podcast_start_time = now - last_timestamp

    for message in messages:
        lines = message.strip().split('\n')
        if lines:
            first_line = lines[0]
            parts = first_line.split(':', 1)
            if len(parts) == 2:
                header, content = parts
                header_parts = header.split()
                if len(header_parts) >= 2:
                    speaker_index = int(header_parts[0])
                    timestamp = header_parts[1].strip('()')
                    # Re-attach any continuation lines that belong to this message
                    if len(lines) > 1:
                        content += '\n' + '\n'.join(lines[1:])
                    delta = parse_timestamp(timestamp)
                    actual_time = podcast_start_time + delta
                    speaker = speaker_dict.get(speaker_index)
                    if speaker:
                        speaker_name = speaker.name
                        role = speaker.role
                    else:
                        speaker_name = f'Unknown Speaker {speaker_index}'
                        role = 'Unknown'
                    parsed_messages.append(
                        ParsedMessage(
                            speaker_index=speaker_index,
                            speaker_name=speaker_name,
                            role=role,
                            relative_timestamp=timestamp,
                            actual_timestamp=actual_time,
                            content=content.strip(),
                        )
                    )

    return parsed_messages


def parse_podcast_messages():
    """Parse the bundled podcast transcript using a hard-coded speaker map."""
    file_path = 'podcast_transcript.txt'
    script_dir = os.path.dirname(__file__)
    relative_path = os.path.join(script_dir, file_path)

    speakers = [
        Speaker(index=0, name='Stephen DUBNER', role='Host'),
        Speaker(index=1, name='Tania Tetlow', role='Guest'),
        Speaker(index=4, name='Narrator', role='Narrator'),
        Speaker(index=5, name='Kamala Harris', role='Quoted'),
        Speaker(index=6, name='Unknown Speaker', role='Unknown'),
        Speaker(index=7, name='Unknown Speaker', role='Unknown'),
        Speaker(index=8, name='Unknown Speaker', role='Unknown'),
        Speaker(index=10, name='Unknown Speaker', role='Unknown'),
    ]

    parsed_conversation = parse_conversation_file(relative_path, speakers)
    print(f'Number of messages: {len(parsed_conversation)}')
    return parsed_conversation
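

# A minimal usage sketch (not part of the original module). It assumes the
# transcript follows the "index (timestamp): text" layout that
# parse_conversation_file expects, e.g.
#
#   0 (2s): Welcome back to the show.
#
#   1 (1m 15s): Thanks for having me.
#
# with blank lines between messages. The slice of msg.content is only for
# compact console output.
if __name__ == '__main__':
    for msg in parse_podcast_messages():
        print(f'[{msg.relative_timestamp}] {msg.speaker_name} ({msg.role}): {msg.content[:60]}')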
