Skip to main content
Glama

Voice Mode

by mbailey
exchanges.py•16.8 kB
""" Exchanges command group for voice-mode CLI. """ import sys import json from datetime import datetime, timedelta from pathlib import Path from typing import Optional import click from voice_mode.exchanges import ( ExchangeReader, ExchangeFormatter, ExchangeFilter, ConversationGrouper, ExchangeStats ) @click.group() @click.help_option('-h', '--help', help='Show this message and exit') def exchanges(): """Manage and view conversation exchange logs.""" pass @exchanges.command() @click.help_option('-h', '--help') @click.option('-f', '--format', type=click.Choice(['simple', 'pretty', 'json', 'raw']), default='simple', help='Output format') @click.option('--stt', is_flag=True, help='Show only STT entries') @click.option('--tts', is_flag=True, help='Show only TTS entries') @click.option('-F', '--full', is_flag=True, help='Show full metadata') @click.option('--no-color', is_flag=True, help='Disable colored output') @click.option('-d', '--date', type=click.DateTime(formats=['%Y-%m-%d']), help='Tail specific date (default: today)') @click.option('--transport', type=click.Choice(['local', 'livekit', 'speak-only', 'all']), help='Filter by transport type') @click.option('--provider', help='Filter by provider') def tail(format, stt, tts, full, no_color, date, transport, provider): """Real-time following of exchange logs.""" reader = ExchangeReader() formatter = ExchangeFormatter() filter_obj = ExchangeFilter() # Apply filters if stt: filter_obj.by_type('stt') elif tts: filter_obj.by_type('tts') if transport and transport != 'all': filter_obj.by_transport(transport) if provider: filter_obj.by_provider(provider) # Handle color use_color = not no_color and sys.stdout.isatty() try: # Tail the logs for exchange in filter_obj.apply(reader.tail(follow=True)): if format == 'simple': output = formatter.simple(exchange, color=use_color, show_timing=not full) elif format == 'pretty': output = formatter.pretty(exchange, show_metadata=full) elif format == 'json': output = formatter.json(exchange) elif format == 'raw': output = exchange.to_jsonl() print(output) if format == 'pretty': print() # Extra line between pretty entries sys.stdout.flush() except KeyboardInterrupt: # Clean exit on Ctrl+C pass @exchanges.command() @click.help_option('-h', '--help') @click.option('-n', '--lines', type=int, default=20, help='Number of exchanges to show') @click.option('-c', '--conversation', help='Show specific conversation') @click.option('--today', is_flag=True, help="Show today's exchanges") @click.option('--yesterday', is_flag=True, help="Show yesterday's exchanges") @click.option('-d', '--date', type=click.DateTime(formats=['%Y-%m-%d']), help='Show specific date') @click.option('-f', '--format', type=click.Choice(['simple', 'pretty', 'json']), default='simple', help='Output format') @click.option('--reverse', is_flag=True, help='Show oldest first') @click.option('--no-color', is_flag=True, help='Disable colored output') def view(lines, conversation, today, yesterday, date, format, reverse, no_color): """View recent exchanges without tailing.""" reader = ExchangeReader() formatter = ExchangeFormatter() # Determine which exchanges to show if conversation: exchanges = reader.read_conversation(conversation) elif today: exchanges = list(reader.read_date(datetime.now().date())) elif yesterday: yesterday_date = datetime.now().date() - timedelta(days=1) exchanges = list(reader.read_date(yesterday_date)) elif date: exchanges = list(reader.read_date(date.date())) else: exchanges = reader.get_latest_exchanges(lines) # Apply reverse if requested if not reverse: exchanges = list(reversed(exchanges)) # Handle color use_color = not no_color and sys.stdout.isatty() # Format and output for exchange in exchanges[-lines:] if not conversation else exchanges: if format == 'simple': output = formatter.simple(exchange, color=use_color) elif format == 'pretty': output = formatter.pretty(exchange) elif format == 'json': output = formatter.json(exchange) print(output) if format == 'pretty': print() # Extra line between pretty entries @exchanges.command() @click.help_option('-h', '--help') @click.argument('query') @click.option('-n', '--max-results', type=int, default=50, help='Maximum results to show') @click.option('-d', '--days', type=int, default=7, help='Number of days to search') @click.option('--type', 'exchange_type', type=click.Choice(['stt', 'tts', 'all']), default='all', help='Search specific type') @click.option('--regex', is_flag=True, help='Use regex search') @click.option('-i', '--ignore-case', is_flag=True, default=True, help='Case insensitive search') @click.option('--conversation', is_flag=True, help='Show full conversations') @click.option('-f', '--format', type=click.Choice(['simple', 'json']), default='simple', help='Output format') @click.option('--no-color', is_flag=True, help='Disable colored output') def search(query, max_results, days, exchange_type, regex, ignore_case, conversation, format, no_color): """Search through exchange logs.""" reader = ExchangeReader() formatter = ExchangeFormatter() filter_obj = ExchangeFilter() grouper = ConversationGrouper() # Set up filters filter_obj.by_text(query, regex=regex, ignore_case=ignore_case) if exchange_type != 'all': filter_obj.by_type(exchange_type) # Read exchanges from recent days exchanges = list(filter_obj.apply(reader.read_recent(days))) if conversation: # Group by conversation and show full conversations conversations = grouper.group_exchanges(exchanges) shown_conversations = set() results_count = 0 for exchange in exchanges: if exchange.conversation_id not in shown_conversations: conv = conversations[exchange.conversation_id] # Show conversation header print(f"\n=== Conversation {conv.id} ===") print(f"Duration: {conv.duration} | Exchanges: {conv.exchange_count}") print() # Show all exchanges in conversation for conv_exchange in conv.exchanges: if format == 'simple': output = formatter.simple(conv_exchange, color=not no_color and sys.stdout.isatty()) else: output = formatter.json(conv_exchange) print(output) shown_conversations.add(exchange.conversation_id) results_count += 1 if results_count >= max_results: break else: # Show individual matching exchanges for i, exchange in enumerate(exchanges[:max_results]): if format == 'simple': output = formatter.simple(exchange, color=not no_color and sys.stdout.isatty()) else: output = formatter.json(exchange) print(output) # Summary total_found = len(exchanges) shown = min(total_found, max_results) print(f"\n{shown} of {total_found} results shown", file=sys.stderr) @exchanges.command() @click.help_option('-h', '--help') @click.option('-d', '--days', type=int, help='Stats for last N days') @click.option('--by-hour', is_flag=True, help='Group by hour') @click.option('--by-provider', is_flag=True, help='Group by provider') @click.option('--by-transport', is_flag=True, help='Group by transport') @click.option('--timing', is_flag=True, help='Show timing statistics') @click.option('--conversations', is_flag=True, help='Show conversation stats') @click.option('--errors', is_flag=True, help='Show error statistics') @click.option('--silence', is_flag=True, help='Show silence detection stats') @click.option('--all', 'show_all', is_flag=True, help='Show all statistics') def stats(days, by_hour, by_provider, by_transport, timing, conversations, errors, silence, show_all): """Show statistics about exchanges.""" reader = ExchangeReader() # Read exchanges if days: exchanges = list(reader.read_recent(days)) else: # Default to last 7 days exchanges = list(reader.read_recent(7)) if not exchanges: click.echo("No exchanges found in the specified period.", err=True) return stats_obj = ExchangeStats(exchanges) # If no specific stats requested, show summary if not any([by_hour, by_provider, by_transport, timing, conversations, errors, silence]) or show_all: print(stats_obj.get_summary_report()) return # Show specific stats if by_hour or show_all: print("\nHourly Distribution:") print("-" * 30) hourly = stats_obj.hourly_distribution() for hour, count in sorted(hourly.items()): bar = 'ā–ˆ' * (count // 5) if count > 0 else '' print(f"{hour:02d}:00 {count:4d} {bar}") if by_provider or show_all: print("\nProvider Breakdown:") print("-" * 30) for provider, count in stats_obj.provider_breakdown().items(): print(f"{provider:20s} {count:6d}") if by_transport or show_all: print("\nTransport Breakdown:") print("-" * 30) for transport, count in stats_obj.transport_breakdown().items(): print(f"{transport:20s} {count:6d}") if timing or show_all: print("\nTiming Statistics:") print("-" * 30) timing_stats = stats_obj.timing_stats() if 'overall' in timing_stats and timing_stats['overall']: print("Overall:") if 'avg_turnaround' in timing_stats['overall']: print(f" Avg Turnaround: {timing_stats['overall']['avg_turnaround']:.2f}s") if 'tts' in timing_stats and timing_stats['tts']: print("\nTTS:") for metric, values in timing_stats['tts'].items(): if isinstance(values, dict) and 'avg' in values: print(f" {metric}: avg={values['avg']:.2f}s, " f"min={values['min']:.2f}s, max={values['max']:.2f}s") if 'stt' in timing_stats and timing_stats['stt']: print("\nSTT:") for metric, values in timing_stats['stt'].items(): if isinstance(values, dict) and 'avg' in values: print(f" {metric}: avg={values['avg']:.2f}s, " f"min={values['min']:.2f}s, max={values['max']:.2f}s") if conversations or show_all: print("\nConversation Statistics:") print("-" * 30) conv_stats = stats_obj.conversation_stats() print(f"Total Conversations: {conv_stats['total_conversations']}") print(f"Avg Exchanges/Conv: {conv_stats['exchanges_per_conversation']['avg']:.1f}") print(f"Avg Duration: {conv_stats['duration_seconds']['avg']:.1f}s") print(f"Avg Word Count: {conv_stats['word_count']['avg']:.0f}") if errors or show_all: print("\nError Statistics:") print("-" * 30) error_stats = stats_obj.error_stats() print(f"Total Errors: {error_stats['total_errors']}") print(f"Error Rate: {error_stats['error_rate']:.1%}") if error_stats['error_types']: print("Error Types:") for error_type, count in error_stats['error_types'].items(): print(f" {error_type}: {count}") if silence or show_all: print("\nSilence Detection Statistics:") print("-" * 30) vad_stats = stats_obj.silence_detection_stats() print(f"VAD Enabled: {vad_stats['vad_enabled_count']}") print(f"VAD Disabled: {vad_stats['vad_disabled_count']}") print(f"VAD Usage Rate: {vad_stats['vad_usage_rate']:.1%}") if 'avg_record_time_with_vad' in vad_stats: print(f"Avg Record Time (VAD): {vad_stats['avg_record_time_with_vad']:.1f}s") if 'avg_record_time_without_vad' in vad_stats: print(f"Avg Record Time (No VAD): {vad_stats['avg_record_time_without_vad']:.1f}s") @exchanges.command() @click.help_option('-h', '--help') @click.option('-c', '--conversation', help='Export specific conversation') @click.option('-d', '--date', type=click.DateTime(formats=['%Y-%m-%d']), help='Export date range') @click.option('--days', type=int, help='Export last N days') @click.option('--format', type=click.Choice(['json', 'csv', 'markdown', 'html']), default='json', help='Export format') @click.option('--include-audio', is_flag=True, help='Include audio files (creates zip)') @click.option('--output', '-o', type=click.Path(), help='Output file/directory') def export(conversation, date, days, format, include_audio, output): """Export conversations in various formats.""" reader = ExchangeReader() formatter = ExchangeFormatter() grouper = ConversationGrouper() # Determine what to export if conversation: exchanges = reader.read_conversation(conversation) elif date: exchanges = list(reader.read_date(date.date())) elif days: exchanges = list(reader.read_recent(days)) else: # Default to today exchanges = list(reader.read_date(datetime.now().date())) if not exchanges: click.echo("No exchanges found to export.", err=True) return # Group into conversations conversations = grouper.group_exchanges(exchanges) # Determine output file if not output: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if format == 'csv': output = f"exchanges_{timestamp}.csv" elif format == 'markdown': output = f"conversations_{timestamp}.md" elif format == 'html': output = f"conversations_{timestamp}.html" else: output = f"exchanges_{timestamp}.json" # Export based on format if format == 'json': data = [] for conv in conversations.values(): data.append(conv.to_dict()) with open(output, 'w') as f: json.dump(data, f, indent=2, default=str) elif format == 'csv': with open(output, 'w') as f: # Write header f.write(formatter.csv_header() + '\n') # Write exchanges for exchange in exchanges: f.write(formatter.csv(exchange) + '\n') elif format == 'markdown': with open(output, 'w') as f: for conv in conversations.values(): f.write(formatter.markdown(conv, include_metadata=True)) f.write('\n\n---\n\n') elif format == 'html': # If multiple conversations, create an index if len(conversations) > 1: # Create directory for multiple files output_dir = Path(output).with_suffix('') output_dir.mkdir(exist_ok=True) # Write individual conversation files for conv in conversations.values(): conv_file = output_dir / f"{conv.id}.html" with open(conv_file, 'w') as f: f.write(formatter.html(conv)) click.echo(f"Exported {len(conversations)} conversations to {output_dir}/") else: # Single conversation conv = list(conversations.values())[0] with open(output, 'w') as f: f.write(formatter.html(conv)) if include_audio: click.echo("Audio export not yet implemented", err=True) click.echo(f"Exported to {output}") if __name__ == '__main__': exchanges()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mbailey/voicemode'

If you have feedback or need assistance with the MCP directory API, please join our Discord server