delete_document
Removes a document from the Chroma vector database by its unique ID. The handler validates the ID, confirms the document exists, deletes it with retries and exponential backoff, and verifies the removal.
Instructions
Delete a document from the Chroma vector database by its ID
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| document_id | Yes | | |
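
The table above corresponds to a small JSON Schema object with one required string property. As a rough illustration, the sketch below checks an arguments dictionary against that shape by hand. The helper name `validate_delete_arguments` and the sample ID `doc-123` are hypothetical and are not part of the server's code.

```python
# Hypothetical helper: checks arguments against the delete_document input schema.
DELETE_DOCUMENT_SCHEMA = {
    "type": "object",
    "properties": {"document_id": {"type": "string"}},
    "required": ["document_id"],
}


def validate_delete_arguments(arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the arguments fit the schema."""
    if not isinstance(arguments, dict):
        return ["arguments must be an object"]
    problems = []
    # Every required property must be present.
    for name in DELETE_DOCUMENT_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required property: {name}")
    # Properties that are present must have the declared type (only "string" here).
    for name, rule in DELETE_DOCUMENT_SCHEMA["properties"].items():
        if name in arguments and rule["type"] == "string":
            if not isinstance(arguments[name], str):
                problems.append(f"{name} must be a string")
    return problems


if __name__ == "__main__":
    print(validate_delete_arguments({"document_id": "doc-123"}))
    print(validate_delete_arguments({}))
```

A hand-written check like this only covers the single property in this schema; a general JSON Schema validator would be the more robust choice for anything larger.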
Implementation Reference
- src/chroma/server.py:496-573 (handler): Primary execution handler for the delete_document tool. It validates document_id, checks existence, deletes via ChromaDB with retry and backoff, verifies removal, and returns success/error text content.

  ```python
  @retry_operation("delete_document")
  async def handle_delete_document(arguments: dict) -> list[types.TextContent]:
      """Handle document deletion with retry logic and network interruption handling"""
      doc_id = arguments.get("document_id")

      if not doc_id:
          raise DocumentOperationError("Missing document_id")

      logger.info(f"Attempting to delete document: {doc_id}")

      # First verify the document exists to avoid network retries for non-existent documents
      try:
          logger.info(f"Verifying document existence: {doc_id}")
          existing = collection.get(ids=[doc_id])
          if not existing or not existing.get('ids') or len(existing['ids']) == 0:
              raise DocumentOperationError(f"Document not found [id={doc_id}]")
          logger.info(f"Document found, proceeding with deletion: {doc_id}")
      except Exception as e:
          if "not found" in str(e).lower():
              raise DocumentOperationError(f"Document not found [id={doc_id}]")
          raise DocumentOperationError(str(e))

      # Attempt deletion with exponential backoff
      max_attempts = MAX_RETRIES
      current_attempt = 0
      last_error = None
      delay = RETRY_DELAY

      while current_attempt < max_attempts:
          try:
              logger.info(f"Delete attempt {current_attempt + 1}/{max_attempts} for document: {doc_id}")
              collection.delete(ids=[doc_id])

              # Verify deletion was successful
              try:
                  check = collection.get(ids=[doc_id])
                  if not check or not check.get('ids') or len(check['ids']) == 0:
                      logger.info(f"Successfully deleted document: {doc_id}")
                      return [
                          types.TextContent(
                              type="text",
                              text=f"Deleted document '{doc_id}' successfully"
                          )
                      ]
                  else:
                      raise Exception("Document still exists after deletion")
              except Exception as e:
                  if "not found" in str(e).lower():
                      # This is good - means deletion was successful
                      logger.info(f"Successfully deleted document: {doc_id}")
                      return [
                          types.TextContent(
                              type="text",
                              text=f"Deleted document '{doc_id}' successfully"
                          )
                      ]
                  raise

          except Exception as e:
              last_error = e
              current_attempt += 1
              if current_attempt < max_attempts:
                  logger.warning(
                      f"Delete attempt {current_attempt} failed for document {doc_id}. "
                      f"Retrying in {delay} seconds. Error: {str(e)}"
                  )
                  await asyncio.sleep(delay)
                  delay *= BACKOFF_FACTOR
              else:
                  logger.error(
                      f"All delete attempts failed for document {doc_id}. "
                      f"Final error: {str(e)}",
                      exc_info=True
                  )
                  raise DocumentOperationError(str(e))

      # This shouldn't be reached, but just in case
      raise DocumentOperationError("Operation failed")
  ```
- src/chroma/server.py:284-294 (registration): Tool registration in @server.list_tools(). It defines the name, description, and JSON schema for input validation (requires a document_id string).

  ```python
  types.Tool(
      name="delete_document",
      description="Delete a document from the Chroma vector database by its ID",
      inputSchema={
          "type": "object",
          "properties": {
              "document_id": {"type": "string"}
          },
          "required": ["document_id"]
      }
  ),
  ```
- src/chroma/server.py:337-338 (registration): Dispatch routing in the @server.call_tool() handler. It matches the tool name and invokes the specific delete handler.

  ```python
  elif name == "delete_document":
      return await handle_delete_document(arguments)
  ```
- src/chroma/server.py:165-171 (schema): Input schema definition in the server.command_options dictionary (matches the tool schema).

  ```python
  "delete_document": {
      "type": "object",
      "properties": {
          "document_id": {"type": "string"}
      },
      "required": ["document_id"]
  },
  ```
- src/chroma/server.py:41-84 (helper): Retry decorator applied to the delete_document handler (@retry_operation("delete_document")). It handles retries, error mapping, and exponential backoff for robust operation.

  ```python
  def retry_operation(operation_name: str):
      """Decorator to retry document operations with exponential backoff"""
      def decorator(func):
          @functools.wraps(func)
          async def wrapper(*args, **kwargs):
              max_retries = 3
              for attempt in range(max_retries):
                  try:
                      return await func(*args, **kwargs)
                  except DocumentOperationError as e:
                      if attempt == max_retries - 1:
                          raise e
                      await asyncio.sleep(2 ** attempt)
                  except Exception as e:
                      if attempt == max_retries - 1:
                          # Clean up error message
                          msg = str(e)
                          if msg.lower().startswith(operation_name.lower()):
                              msg = msg[len(operation_name):].lstrip(': ')
                          if msg.lower().startswith('failed'):
                              msg = msg[7:].lstrip(': ')
                          if msg.lower().startswith('search failed'):
                              msg = msg[13:].lstrip(': ')

                          # Map error patterns to friendly messages
                          error_msg = msg.lower()
                          doc_id = kwargs.get('arguments', {}).get('document_id')

                          if "not found" in error_msg:
                              error = f"Document not found{f' [id={doc_id}]' if doc_id else ''}"
                          elif "already exists" in error_msg:
                              error = f"Document already exists{f' [id={doc_id}]' if doc_id else ''}"
                          elif "invalid" in error_msg:
                              error = "Invalid input"
                          elif "filter" in error_msg:
                              error = "Invalid filter"
                          else:
                              error = "Operation failed"

                          raise DocumentOperationError(error)
                      await asyncio.sleep(2 ** attempt)
              return None
          return wrapper
      return decorator
  ```
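
The handler's core pattern (check existence, delete, verify, and retry with a growing delay) can be tried in isolation. The sketch below is a simplified stand-in and not the server's code: `InMemoryCollection` and `delete_with_backoff` are hypothetical names, the fake collection only mimics the `get(ids=...)` and `delete(ids=...)` calls used by the handler, and the default values for `max_attempts`, `delay`, and `backoff_factor` are assumptions, since the listing does not show the values of `MAX_RETRIES`, `RETRY_DELAY`, or `BACKOFF_FACTOR`.

```python
import asyncio


class InMemoryCollection:
    """Hypothetical stand-in for a Chroma collection, used only for this sketch."""

    def __init__(self, ids, failures_before_success=0):
        self._ids = set(ids)
        self._failures_left = failures_before_success

    def get(self, ids):
        # Return only the requested IDs that are actually stored.
        return {"ids": [i for i in ids if i in self._ids]}

    def delete(self, ids):
        # Simulate transient network failures before the delete goes through.
        if self._failures_left > 0:
            self._failures_left -= 1
            raise ConnectionError("simulated network interruption")
        self._ids.difference_update(ids)


async def delete_with_backoff(collection, doc_id, max_attempts=3, delay=0.01, backoff_factor=2):
    """Delete doc_id, verify it is gone, and retry with a growing delay on failure.

    Returns the list of delays slept between attempts (assumed defaults, see lead-in).
    """
    # Fail fast when the document does not exist, so no retries are spent on it.
    if not collection.get(ids=[doc_id])["ids"]:
        raise LookupError(f"Document not found [id={doc_id}]")

    delays_used = []
    for attempt in range(1, max_attempts + 1):
        try:
            collection.delete(ids=[doc_id])
            # Verify the deletion by reading the ID back.
            if collection.get(ids=[doc_id])["ids"]:
                raise RuntimeError("Document still exists after deletion")
            return delays_used
        except Exception:
            if attempt == max_attempts:
                raise
            delays_used.append(delay)
            await asyncio.sleep(delay)
            delay *= backoff_factor  # exponential backoff


if __name__ == "__main__":
    demo = InMemoryCollection(["doc-1", "doc-2"], failures_before_success=2)
    print(asyncio.run(delete_with_backoff(demo, "doc-1")))
```

In the listing above, the handler's own retry loop runs inside the `retry_operation` decorator, which makes up to three attempts of its own, so a persistently failing delete may be attempted more times than `MAX_RETRIES` alone suggests.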