Skip to main content
Glama

Video RAG MCP Server

by FiloHany
main.py•4.08 kB
import os import time import logging from pathlib import Path from dotenv import load_dotenv from ragie import Ragie from moviepy import VideoFileClip load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # initialize ragie client ragie = Ragie( auth=os.getenv('RAGIE_API_KEY'), ) # Remove previous docs from index def clear_index(): while True: try: # List all documents response = ragie.documents.list() documents = response.result.documents # Process each document for document in documents: try: ragie.documents.delete( document_id=document.id ) logger.info(f"Deleted document {document.id}") except Exception as e: logger.error(f"Failed to delete document {document.id}: {str(e)}") raise # Check if there are more documents if not response.result.pagination.next_cursor: logger.warning("No more documents\n") break except Exception as e: logger.error(f"Failed to retrieve or process documents: {str(e)}") raise # Ingest data from a directory into the Ragie index def ingest_data(directory): # Get list of files in directory directory_path = Path(directory) files = os.listdir(directory_path) for file in files: try: file_path = directory_path / file # Read file content with open(file_path, mode='rb') as f: file_content = f.read() # Create document in Ragie response = ragie.documents.create(request={ "file": { "file_name": file, "content": file_content, }, "mode": { "video": "audio_video", "audio": True } }) # Wait for document to be ready while True: res = ragie.documents.get(document_id=response.id) if res.status == "ready": break time.sleep(2) logger.info(f"Successfully uploaded {file}") except Exception as e: logger.error(f"Failed to process file {file}: {str(e)}") continue # Retrieve data from the Ragie index def retrieve_data(query): try: logger.info(f"Retrieving data for query: {query}") retrieval_response = ragie.retrievals.retrieve(request={ "query": query }) content = [ { **chunk.document_metadata, "text": chunk.text, "document_name": chunk.document_name, "start_time": chunk.metadata.get("start_time"), "end_time": chunk.metadata.get("end_time") } for chunk in retrieval_response.scored_chunks ] logger.info(f"Successfully retrieved {len(content)} chunks") return content except Exception as e: logger.error(f"Failed to retrieve data: {str(e)}") raise def chunk_video(document_name, start_time, end_time, directory="videos"): # Create output filename output_dir = Path("video_chunks") output_dir.mkdir(parents=True, exist_ok=True) chunk_filename = f"video_chunk_{start_time:.1f}_{end_time:.1f}.mp4" output_path = output_dir / chunk_filename with VideoFileClip(directory + "/" + document_name) as video: video_duration = video.duration actual_end_time = min(end_time, video_duration) if end_time is not None else video_duration video_chunk = video.subclipped(start_time, actual_end_time) video_chunk.write_videofile(str(output_path)) return output_path if __name__ == "__main__": clear_index() ingest_data("video") print(retrieve_data("What is the main topic of the video?"))

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FiloHany/Video_RAG_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server