Skip to main content
Glama
video_client.py (20.6 kB)
#!/usr/bin/env python3
"""Standalone client for video analysis using Ollama."""
import asyncio
import click
import sys
from pathlib import Path
from typing import Optional
import json

from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.panel import Panel
from rich.markdown import Markdown

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.storage.manager import StorageManager
from src.processors.video import VideoProcessor
from src.llm.ollama_client import OllamaClient
from src.utils.config import get_config

console = Console()


class VideoAnalysisClient:
    """Standalone client for video analysis.

    Wraps storage, video processing and the Ollama LLM client behind an
    async context manager so resources are opened/closed together.
    """

    def __init__(self):
        self.storage = StorageManager()
        self.llm_client = None
        self.processor = None  # Will be initialized with LLM client
        self.config = get_config()

    async def __aenter__(self):
        """Async context manager entry: create the LLM client and processor."""
        self.llm_client = OllamaClient()
        self.processor = VideoProcessor(self.storage, llm_client=self.llm_client)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the LLM client and clean up."""
        if self.llm_client:
            await self.llm_client.close()
        if self.processor:
            self.processor.cleanup()

    async def process_video(self, video_path: str,
                            location: Optional[str] = None,
                            timestamp: Optional[str] = None,
                            show_progress: bool = True) -> str:
        """Store and process a video file.

        Args:
            video_path: Path to the video file on disk.
            location: Optional location label (e.g. "shed").
            timestamp: Optional recording time, ISO format or natural language.
            show_progress: When False, the progress spinner is suppressed.

        Returns:
            The video_id assigned by the storage manager.

        Raises:
            FileNotFoundError: If video_path does not exist.
        """
        video_path = Path(video_path).absolute()
        if not video_path.exists():
            raise FileNotFoundError(f"Video not found: {video_path}")

        # Parse timestamp if provided
        from datetime import datetime
        from src.utils.date_parser import DateParser

        parsed_timestamp = None
        if timestamp:
            try:
                parsed_timestamp = datetime.fromisoformat(timestamp)
            except ValueError:
                # Not ISO format -- try natural language parsing
                start_time, _ = DateParser.parse_date_query(timestamp)
                parsed_timestamp = start_time

        # `disable` honors show_progress (previously the flag was ignored).
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
            disable=not show_progress
        ) as progress:
            # Store video
            task = progress.add_task("Storing video...", total=None)
            metadata = await self.storage.store_video(
                str(video_path),
                location=location,
                recording_timestamp=parsed_timestamp
            )
            progress.update(task, description=f"Video ID: {metadata.video_id}")

            # Process video (frame extraction + analysis)
            progress.update(task, description="Extracting frames...")
            result = await self.processor.process_video(metadata.video_id)
            progress.update(task, description="✅ Processing complete!")

        return metadata.video_id

    async def ask_question(self, video_id: str, question: str) -> str:
        """Ask a question about a processed video using the LLM.

        Raises:
            ValueError: If the video is unknown or not yet processed.
        """
        if not self.llm_client:
            self.llm_client = OllamaClient()

        # Get video data
        result = self.storage.get_processing_result(video_id)
        if not result:
            raise ValueError(f"Video {video_id} not found or not processed")

        # Build context from frame descriptions (capped at 30) and transcript
        # (capped at 1500 chars) to bound the prompt size.
        context_parts = []
        if result.timeline:
            context_parts.append("Visual timeline:")
            for frame in result.timeline[:30]:
                context_parts.append(f"[{frame.timestamp:.1f}s] {frame.description}")
        if result.transcript:
            context_parts.append("\nAudio transcript:")
            context_parts.append(result.transcript[:1500])
        context = "\n".join(context_parts)

        # Get answer
        answer = await self.llm_client.answer_video_question(question, context)
        return answer or "Unable to generate answer"

    async def search_videos(self, query: str, limit: int = 10):
        """Search for videos by content via the storage layer."""
        results = self.storage.search_videos(query, limit)
        return results

    async def list_videos(self):
        """List all videos as dicts, newest first."""
        # NOTE(review): reaches into the storage manager's private DB handle;
        # a public accessor on StorageManager would be cleaner.
        with self.storage._get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT video_id, filename, duration, status, created_at
                FROM videos
                ORDER BY created_at DESC
            """)
            videos = []
            for row in cursor.fetchall():
                videos.append({
                    'video_id': row['video_id'],
                    'filename': row['filename'],
                    'duration': row['duration'],
                    'status': row['status'],
                    'created_at': row['created_at']
                })
            return videos

    async def get_summary(self, video_id: str) -> str:
        """Generate an LLM summary of a processed video.

        Raises:
            ValueError: If the video is unknown.
        """
        if not self.llm_client:
            self.llm_client = OllamaClient()

        result = self.storage.get_processing_result(video_id)
        if not result:
            raise ValueError(f"Video {video_id} not found")

        # Get frame descriptions
        descriptions = [f.description for f in result.timeline if f.description]

        # Generate summary from at most 20 descriptions plus the transcript
        summary = await self.llm_client.generate_video_summary(
            descriptions[:20],
            result.transcript
        )
        return summary or "Unable to generate summary"

    def _format_time_ago(self, timestamp):
        """Format a datetime as a human-friendly "time ago" string."""
        from datetime import datetime
        now = datetime.now()
        diff = now - timestamp
        if diff.days == 0:
            if diff.seconds < 3600:
                return f"{diff.seconds // 60} minutes ago"
            else:
                return f"{diff.seconds // 3600} hours ago"
        elif diff.days == 1:
            return "yesterday"
        elif diff.days < 7:
            return f"{diff.days} days ago"
        else:
            return timestamp.strftime("%B %d, %Y")


# CLI Commands

@click.group()
def cli():
    """Video Analysis CLI - Analyze videos using local AI models."""
    pass


@cli.command()
@click.argument('video_path', type=click.Path(exists=True))
@click.option('--location', help='Location name (e.g., shed, driveway)')
@click.option('--timestamp', help='Recording timestamp (ISO format or natural language)')
@click.option('--no-progress', is_flag=True, help='Disable progress display')
def process(video_path, location, timestamp, no_progress):
    """Process a video file for analysis."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                video_id = await client.process_video(
                    video_path,
                    location=location,
                    timestamp=timestamp,
                    show_progress=not no_progress
                )
                console.print(Panel(
                    f"✅ Video processed successfully!\n\n"
                    f"Video ID: [bold cyan]{video_id}[/bold cyan]\n\n"
                    f"You can now ask questions about this video using:\n"
                    f"[dim]video_client ask {video_id} \"Your question here\"[/dim]",
                    title="Success",
                    border_style="green"
                ))
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


@cli.command()
@click.argument('video_id')
@click.argument('question')
def ask(video_id, question):
    """Ask a question about a processed video."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                with console.status("Thinking..."):
                    answer = await client.ask_question(video_id, question)
                console.print(Panel(
                    Markdown(f"**Question:** {question}\n\n**Answer:** {answer}"),
                    title=f"Video {video_id}",
                    border_style="blue"
                ))
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


@cli.command()
@click.argument('query')
@click.option('--limit', default=10, help='Maximum results')
def search(query, limit):
    """Search for videos by content."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                results = await client.search_videos(query, limit)
                if not results:
                    console.print(f"No videos found matching '{query}'")
                    return

                table = Table(title=f"Search Results for '{query}'")
                table.add_column("Video ID", style="cyan")
                table.add_column("Filename", style="green")
                table.add_column("Matches", style="yellow")
                for video_id, filename, matches in results:
                    table.add_row(video_id, filename, str(matches))
                console.print(table)
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


# Named 'list' on the CLI; the function is list_cmd so it does not shadow
# the builtin `list`.
@cli.command(name='list')
def list_cmd():
    """List all videos in the system."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                videos = await client.list_videos()
                if not videos:
                    console.print("No videos found")
                    return

                table = Table(title="Videos")
                table.add_column("Video ID", style="cyan")
                table.add_column("Filename", style="green")
                table.add_column("Duration", style="yellow")
                table.add_column("Status", style="magenta")
                table.add_column("Created", style="blue")

                for video in videos:
                    duration = f"{video['duration']:.1f}s" if video['duration'] else "N/A"
                    status_color = {
                        'completed': 'green',
                        'processing': 'yellow',
                        'failed': 'red',
                        'pending': 'dim'
                    }.get(video['status'], 'white')
                    table.add_row(
                        video['video_id'],
                        video['filename'],
                        duration,
                        f"[{status_color}]{video['status']}[/{status_color}]",
                        video['created_at'][:19]  # Trim microseconds
                    )
                console.print(table)
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


@cli.command()
@click.argument('video_id')
def summary(video_id):
    """Get a summary of a video."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                with console.status("Generating summary..."):
                    summary_text = await client.get_summary(video_id)
                console.print(Panel(
                    Markdown(f"**Video Summary**\n\n{summary_text}"),
                    title=f"Video {video_id}",
                    border_style="green"
                ))
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


@cli.command()
@click.option('--location', help='Location to query (e.g., shed, driveway)')
@click.option('--time', 'time_query', help='Time query (e.g., today, yesterday, last week)')
@click.option('--content', help='Search for specific content (e.g., car, person)')
@click.option('--limit', default=20, help='Maximum results')
def query(location, time_query, content, limit):
    """Query videos by location and time.

    Examples:
        video_client query --location shed --time today
        video_client query --location driveway --time "last week" --content car
        video_client query --time yesterday  # All locations
    """
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                from src.utils.date_parser import DateParser

                # Build query description for display
                query_parts = []
                if location:
                    query_parts.append(f"at [yellow]{location}[/yellow]")
                if time_query:
                    query_parts.append(f"[cyan]{time_query}[/cyan]")
                if content:
                    query_parts.append(f"containing [green]{content}[/green]")
                query_desc = " ".join(query_parts) if query_parts else "all videos"

                with console.status(f"Searching for videos {query_desc}..."):
                    # Query using storage directly
                    start_time = None
                    end_time = None
                    if time_query:
                        start_time, end_time = DateParser.parse_date_query(time_query)

                    videos = client.storage.query_videos_by_location_and_time(
                        location=location,
                        start_time=start_time,
                        end_time=end_time,
                        content_query=content,
                        limit=limit
                    )

                if not videos:
                    console.print(f"No videos found {query_desc}")
                    return

                # Create table
                table = Table(title=f"Videos {query_desc}")
                table.add_column("Time", style="cyan")
                table.add_column("Location", style="yellow")
                table.add_column("Video ID", style="dim")
                table.add_column("Duration", style="green")
                table.add_column("Summary", style="white", max_width=50)

                for video in videos:
                    # Get a short summary from the first two frame descriptions
                    result = client.storage.get_processing_result(video.video_id)
                    summary = "Not analyzed"
                    if result and result.timeline:
                        summaries = []
                        for frame in result.timeline[:2]:
                            if frame.description:
                                summaries.append(frame.description)
                        if summaries:
                            summary = "; ".join(summaries)[:50] + "..."

                    # Format time
                    time_str = video.recording_timestamp.strftime("%m/%d %I:%M %p")
                    time_ago = client._format_time_ago(video.recording_timestamp)

                    table.add_row(
                        f"{time_str}\n[dim]{time_ago}[/dim]",
                        video.location,
                        video.video_id[:8] + "...",
                        f"{video.duration:.1f}s" if video.duration else "N/A",
                        summary
                    )

                console.print(table)
                console.print(f"\n[dim]Found {len(videos)} videos[/dim]")
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


@cli.command()
@click.option('--chat-llm', default='llama2:latest', help='Ollama model for chat interactions')
@click.option('--server', default='http://localhost:8000', help='MCP server URL for HTTP mode')
@click.option('--stdio', is_flag=True, help='Use stdio mode instead of HTTP (legacy)')
def chat(chat_llm, server, stdio):
    """Start interactive chat interface."""
    console.print("[bold blue]Starting MCP Video Analysis Chat...[/bold blue]")
    console.print(f"[dim]Chat LLM: {chat_llm}[/dim]")

    # The chat UI lives in a separate script; we launch it as a subprocess.
    import subprocess

    if stdio:
        # Use legacy stdio client
        console.print(f"[dim]Mode: stdio (legacy)[/dim]\n")
        chat_script = Path(__file__).parent / "mcp_chat_client.py"
        subprocess.run([sys.executable, str(chat_script), '--chat-llm', chat_llm])
    else:
        # Use HTTP client (default)
        console.print(f"[dim]Server: {server}[/dim]")
        console.print(f"[dim]Mode: HTTP (recommended)[/dim]\n")

        # Check if server is reachable before launching the client
        try:
            import httpx

            async def check_server():
                try:
                    async with httpx.AsyncClient(timeout=5.0) as client:
                        response = await client.get(f"{server}/health")
                        return response.status_code == 200
                except Exception:
                    # Any connection failure just means "not available"
                    return False

            server_available = asyncio.run(check_server())
            if not server_available:
                console.print(f"[yellow]Warning: Server at {server} is not responding[/yellow]")
                console.print(f"[yellow]Start the server with: python server.py --http --host 0.0.0.0 --port 8000[/yellow]")
                console.print(f"[yellow]Or use --stdio flag for legacy mode[/yellow]")
                return
        except ImportError:
            console.print("[red]httpx not available for HTTP mode[/red]")
            return

        # Use HTTP client
        chat_script = Path(__file__).parent / "mcp_http_client.py"
        subprocess.run([
            sys.executable, str(chat_script),
            '--server', server,
            '--chat-llm', chat_llm
        ])


@cli.command()
def status():
    """Check system status."""
    async def run():
        async with VideoAnalysisClient() as client:
            try:
                # Check Ollama
                ollama_status = "✅ Running" if await client.llm_client.is_available() else "❌ Not running"

                # Check models
                models = await client.llm_client.list_models()
                model_names = [m["name"] for m in models]
                required_models = ["llava:latest", "llama2:latest"]
                model_status = {}
                for model in required_models:
                    model_status[model] = "✅" if model in model_names else "❌"

                # Get storage stats
                stats = client.storage.get_storage_stats()

                console.print(Panel(
                    f"**Ollama Status:** {ollama_status}\n\n"
                    f"**Required Models:**\n"
                    f" • llava:latest: {model_status.get('llava:latest', '❌')}\n"
                    f" • llama2:latest: {model_status.get('llama2:latest', '❌')}\n\n"
                    f"**Storage:**\n"
                    f" • Total videos: {stats['total_videos']}\n"
                    f" • Processed: {stats['processed_videos']}\n"
                    f" • Storage used: {stats['total_size_gb']:.2f} GB\n"
                    f" • Base path: {client.storage.base_path}",
                    title="System Status",
                    border_style="blue"
                ))

                if "❌" in model_status.values():
                    console.print("\n[yellow]Missing models! Install with:[/yellow]")
                    for model, status in model_status.items():
                        if status == "❌":
                            console.print(f"  ollama pull {model}")
            except Exception as e:
                console.print(f"[red]Error: {e}[/red]")
                sys.exit(1)

    asyncio.run(run())


if __name__ == "__main__":
    cli()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/michaelbaker-dev/mcpVideoParser'

If you have feedback or need assistance with the MCP directory API, please join our Discord server