Skip to main content
Glama
ollama_client.py11.8 kB
"""Ollama integration for LLM interactions.""" import base64 import json from pathlib import Path from typing import List, Dict, Any, Optional, Union import httpx import asyncio from PIL import Image import io from ..utils.config import get_config from ..utils.logging import get_logger from ..storage.schemas import FrameAnalysis class OllamaClient: """Client for interacting with Ollama LLM.""" def __init__(self, host: Optional[str] = None): self.config = get_config() self.host = host or self.config.llm.ollama_host self.logger = get_logger(__name__) self.timeout = self.config.llm.timeout_seconds # Create async client self.client = httpx.AsyncClient( base_url=self.host, timeout=httpx.Timeout(self.timeout, connect=10.0) ) async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.close() async def close(self): """Close the HTTP client.""" await self.client.aclose() async def is_available(self) -> bool: """Check if Ollama is available and running.""" try: response = await self.client.get("/api/tags") return response.status_code == 200 except Exception as e: self.logger.error(f"Ollama not available: {e}") return False async def list_models(self) -> List[Dict[str, Any]]: """List available models in Ollama.""" try: response = await self.client.get("/api/tags") response.raise_for_status() data = response.json() return data.get("models", []) except Exception as e: self.logger.error(f"Error listing models: {e}") return [] async def pull_model(self, model_name: str) -> bool: """Pull a model if not already available.""" try: # Check if model exists models = await self.list_models() if any(m["name"] == model_name for m in models): self.logger.info(f"Model {model_name} already available") return True # Pull model self.logger.info(f"Pulling model {model_name}...") response = await self.client.post( "/api/pull", json={"name": model_name} ) response.raise_for_status() # Wait for pull to complete # In real implementation, stream the response to show progress return True except Exception as e: self.logger.error(f"Error pulling model {model_name}: {e}") return False async def analyze_image( self, image_path: Union[str, Path], prompt: str, model: Optional[str] = None ) -> Optional[str]: """Analyze an image using vision model.""" model = model or self.config.llm.vision_model try: # Read and encode image image_path = Path(image_path) if not image_path.exists(): raise FileNotFoundError(f"Image not found: {image_path}") # Encode image to base64 with open(image_path, "rb") as f: image_data = f.read() image_base64 = base64.b64encode(image_data).decode() # Prepare request request_data = { "model": model, "prompt": prompt, "images": [image_base64], "stream": False } # Send request response = await self.client.post( "/api/generate", json=request_data ) response.raise_for_status() # Parse response result = response.json() return result.get("response", "").strip() except Exception as e: self.logger.error(f"Error analyzing image {image_path}: {e}") return None async def analyze_frame(self, frame_path: Path, frame_number: int, timestamp: float) -> FrameAnalysis: """Analyze a single video frame.""" prompt = ( "Describe what you see in this video frame. " "Be concise but include important details like: " "- Main subjects or people\n" "- Actions or activities\n" "- Objects and their positions\n" "- Scene setting or location\n" "Limit your response to 2-3 sentences." ) description = await self.analyze_image(frame_path, prompt) if description: # Extract objects from description (simplified) objects = self._extract_objects_from_description(description) return FrameAnalysis( frame_number=frame_number, timestamp=timestamp, frame_path=str(frame_path), description=description, objects_detected=objects, confidence=0.8 # Default confidence ) else: return FrameAnalysis( frame_number=frame_number, timestamp=timestamp, frame_path=str(frame_path), description="Failed to analyze frame", objects_detected=[], confidence=0.0 ) def _extract_objects_from_description(self, description: str) -> List[str]: """Extract object names from description (simplified).""" # In a real implementation, use NLP or a separate model # For now, look for common object words common_objects = [ "person", "people", "man", "woman", "child", "car", "vehicle", "truck", "bus", "bicycle", "building", "house", "tree", "road", "street", "table", "chair", "computer", "phone", "book", "dog", "cat", "animal", "bird" ] description_lower = description.lower() found_objects = [] for obj in common_objects: if obj in description_lower: found_objects.append(obj) return found_objects async def generate_text( self, prompt: str, model: Optional[str] = None, system_prompt: Optional[str] = None, temperature: Optional[float] = None ) -> Optional[str]: """Generate text using text model.""" model = model or self.config.llm.text_model temperature = temperature or self.config.llm.temperature try: request_data = { "model": model, "prompt": prompt, "temperature": temperature, "stream": False } if system_prompt: request_data["system"] = system_prompt response = await self.client.post( "/api/generate", json=request_data ) response.raise_for_status() result = response.json() return result.get("response", "").strip() except Exception as e: self.logger.error(f"Error generating text: {e}") return None async def answer_video_question( self, question: str, context: str, model: Optional[str] = None ) -> Optional[str]: """Answer a question about video content.""" system_prompt = ( "You are analyzing video content based on frame descriptions and transcripts. " "Answer questions based solely on the provided context. " "Be concise and accurate. If the information is not in the context, " "say so rather than making assumptions." ) prompt = f"""Video Context: {context} Question: {question} Answer:""" return await self.generate_text( prompt=prompt, model=model, system_prompt=system_prompt ) async def generate_video_summary( self, frame_descriptions: List[str], transcript: Optional[str] = None, model: Optional[str] = None ) -> Optional[str]: """Generate a summary of video content.""" # Prepare context context_parts = [] if frame_descriptions: context_parts.append("Visual content:") for i, desc in enumerate(frame_descriptions[:20]): # Limit to 20 frames context_parts.append(f"- {desc}") if transcript: context_parts.append("\nAudio transcript:") context_parts.append(transcript[:1000]) # Limit transcript length context = "\n".join(context_parts) prompt = f"""Based on the following video content, provide a brief summary (3-5 sentences): {context} Summary:""" return await self.generate_text(prompt=prompt, model=model) async def batch_analyze_frames( self, frame_paths: List[Path], batch_size: int = 5 ) -> List[FrameAnalysis]: """Analyze multiple frames in batches.""" analyses = [] # Process in batches to avoid overwhelming the system for i in range(0, len(frame_paths), batch_size): batch = frame_paths[i:i + batch_size] # Create tasks for parallel processing tasks = [] for j, frame_path in enumerate(batch): frame_number = i + j # Extract timestamp from filename if available timestamp = self._extract_timestamp_from_path(frame_path) task = self.analyze_frame(frame_path, frame_number, timestamp) tasks.append(task) # Wait for batch to complete batch_results = await asyncio.gather(*tasks) analyses.extend(batch_results) # Small delay between batches if i + batch_size < len(frame_paths): await asyncio.sleep(0.5) return analyses def _extract_timestamp_from_path(self, frame_path: Path) -> float: """Extract timestamp from frame filename.""" # Expected format: frame_00001_t1.5s.jpg try: if '_t' in frame_path.stem and 's' in frame_path.stem: timestamp_str = frame_path.stem.split('_t')[1].rstrip('s') return float(timestamp_str) except: pass return 0.0 async def check_model_capabilities(self, model_name: str) -> Dict[str, bool]: """Check what capabilities a model has.""" capabilities = { "vision": False, "text": True, # Most models support text "available": False } try: models = await self.list_models() for model in models: if model["name"] == model_name: capabilities["available"] = True # Check if it's a vision model if "vision" in model_name.lower() or "llava" in model_name.lower(): capabilities["vision"] = True break return capabilities except Exception as e: self.logger.error(f"Error checking model capabilities: {e}") return capabilities

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/michaelbaker-dev/mcpVideoParser'

If you have feedback or need assistance with the MCP directory API, please join our Discord server