Skip to main content
Glama
privetin

Chroma MCP Server

by privetin

delete_document

Remove a document from Chroma vector database using its unique ID. This tool ensures efficient document management and cleanup in the MCP server environment.

Instructions

Delete a document from the Chroma vector database by its ID

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
document_idYes

Implementation Reference

  • Primary execution handler for delete_document tool: validates document_id, checks existence, deletes via ChromaDB with retry and backoff, verifies removal, returns success/error text content.
    @retry_operation("delete_document") async def handle_delete_document(arguments: dict) -> list[types.TextContent]: """Handle document deletion with retry logic and network interruption handling""" doc_id = arguments.get("document_id") if not doc_id: raise DocumentOperationError("Missing document_id") logger.info(f"Attempting to delete document: {doc_id}") # First verify the document exists to avoid network retries for non-existent documents try: logger.info(f"Verifying document existence: {doc_id}") existing = collection.get(ids=[doc_id]) if not existing or not existing.get('ids') or len(existing['ids']) == 0: raise DocumentOperationError(f"Document not found [id={doc_id}]") logger.info(f"Document found, proceeding with deletion: {doc_id}") except Exception as e: if "not found" in str(e).lower(): raise DocumentOperationError(f"Document not found [id={doc_id}]") raise DocumentOperationError(str(e)) # Attempt deletion with exponential backoff max_attempts = MAX_RETRIES current_attempt = 0 last_error = None delay = RETRY_DELAY while current_attempt < max_attempts: try: logger.info(f"Delete attempt {current_attempt + 1}/{max_attempts} for document: {doc_id}") collection.delete(ids=[doc_id]) # Verify deletion was successful try: check = collection.get(ids=[doc_id]) if not check or not check.get('ids') or len(check['ids']) == 0: logger.info(f"Successfully deleted document: {doc_id}") return [ types.TextContent( type="text", text=f"Deleted document '{doc_id}' successfully" ) ] else: raise Exception("Document still exists after deletion") except Exception as e: if "not found" in str(e).lower(): # This is good - means deletion was successful logger.info(f"Successfully deleted document: {doc_id}") return [ types.TextContent( type="text", text=f"Deleted document '{doc_id}' successfully" ) ] raise except Exception as e: last_error = e current_attempt += 1 if current_attempt < max_attempts: logger.warning( f"Delete attempt {current_attempt} failed for document {doc_id}. " f"Retrying in {delay} seconds. Error: {str(e)}" ) await asyncio.sleep(delay) delay *= BACKOFF_FACTOR else: logger.error( f"All delete attempts failed for document {doc_id}. " f"Final error: {str(e)}", exc_info=True ) raise DocumentOperationError(str(e)) # This shouldn't be reached, but just in case raise DocumentOperationError("Operation failed")
  • Tool registration in @server.list_tools(): defines name, description, and JSON schema for input validation (requires document_id string).
    types.Tool( name="delete_document", description="Delete a document from the Chroma vector database by its ID", inputSchema={ "type": "object", "properties": { "document_id": {"type": "string"} }, "required": ["document_id"] } ),
  • Dispatch routing in @server.call_tool() handler: matches tool name and invokes the specific delete handler.
    elif name == "delete_document": return await handle_delete_document(arguments)
  • Input schema definition in server.command_options dictionary (matches tool schema).
    "delete_document": { "type": "object", "properties": { "document_id": {"type": "string"} }, "required": ["document_id"] },
  • Retry decorator applied to delete_document handler (@retry_operation("delete_document")), handles retries, error mapping, and exponential backoff for robust operation.
    def retry_operation(operation_name: str): """Decorator to retry document operations with exponential backoff""" def decorator(func): @functools.wraps(func) async def wrapper(*args, **kwargs): max_retries = 3 for attempt in range(max_retries): try: return await func(*args, **kwargs) except DocumentOperationError as e: if attempt == max_retries - 1: raise e await asyncio.sleep(2 ** attempt) except Exception as e: if attempt == max_retries - 1: # Clean up error message msg = str(e) if msg.lower().startswith(operation_name.lower()): msg = msg[len(operation_name):].lstrip(': ') if msg.lower().startswith('failed'): msg = msg[7:].lstrip(': ') if msg.lower().startswith('search failed'): msg = msg[13:].lstrip(': ') # Map error patterns to friendly messages error_msg = msg.lower() doc_id = kwargs.get('arguments', {}).get('document_id') if "not found" in error_msg: error = f"Document not found{f' [id={doc_id}]' if doc_id else ''}" elif "already exists" in error_msg: error = f"Document already exists{f' [id={doc_id}]' if doc_id else ''}" elif "invalid" in error_msg: error = "Invalid input" elif "filter" in error_msg: error = "Invalid filter" else: error = "Operation failed" raise DocumentOperationError(error) await asyncio.sleep(2 ** attempt) return None return wrapper return decorator
Install Server

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/privetin/chroma'

If you have feedback or need assistance with the MCP directory API, please join our Discord server