ingest_data_tool
Loads video data from a specified directory into the RAG index for natural language search and interaction with video content.
Instructions
Loads data from a directory into the Ragie index. Wait until the data is fully ingested before continuing.
Args:
directory (str): The directory to load data from.
Returns:
str: A message indicating that the data was loaded successfully.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| directory | Yes |
Implementation Reference
- server.py:6-22 (handler)The handler function for 'ingest_data_tool', decorated with @mcp.tool() for registration. It clears the existing index, ingests data from the specified directory using helper functions, and returns a success or error message. The function signature and docstring define the input schema.@mcp.tool() def ingest_data_tool(directory: str) -> None: """ Loads data from a directory into the Ragie index. Wait until the data is fully ingested before continuing. Args: directory (str): The directory to load data from. Returns: str: A message indicating that the data was loaded successfully. """ try: clear_index() ingest_data(directory) return "Data loaded successfully" except Exception as e: return f"Failed to load data: {str(e)}"
- main.py:49-84 (helper)Helper function called by the tool handler to ingest files from the directory into the Ragie index. Processes each file, uploads as document with video/audio mode, and waits for readiness.def ingest_data(directory): # Get list of files in directory directory_path = Path(directory) files = os.listdir(directory_path) for file in files: try: file_path = directory_path / file # Read file content with open(file_path, mode='rb') as f: file_content = f.read() # Create document in Ragie response = ragie.documents.create(request={ "file": { "file_name": file, "content": file_content, }, "mode": { "video": "audio_video", "audio": True } }) # Wait for document to be ready while True: res = ragie.documents.get(document_id=response.id) if res.status == "ready": break time.sleep(2) logger.info(f"Successfully uploaded {file}") except Exception as e: logger.error(f"Failed to process file {file}: {str(e)}") continue
- main.py:21-47 (helper)Helper function called by the tool handler to clear all existing documents from the Ragie index before ingesting new data.def clear_index(): while True: try: # List all documents response = ragie.documents.list() documents = response.result.documents # Process each document for document in documents: try: ragie.documents.delete( document_id=document.id ) logger.info(f"Deleted document {document.id}") except Exception as e: logger.error(f"Failed to delete document {document.id}: {str(e)}") raise # Check if there are more documents if not response.result.pagination.next_cursor: logger.warning("No more documents\n") break except Exception as e: logger.error(f"Failed to retrieve or process documents: {str(e)}") raise