Skip to main content
Glama

Chroma MCP Server

by djm81
document_tools.py61.7 kB
""" Document management tools for ChromaDB operations. """ import time import json import logging import uuid import numpy as np # Needed for NumpyEncoder usage import datetime # Add for ISO format date handling from typing import Dict, List, Optional, Any, Union, cast from dataclasses import dataclass # Import ChromaDB result types from chromadb.api.types import QueryResult, GetResult from mcp import types from mcp.shared.exceptions import McpError from mcp.types import ErrorData, INVALID_PARAMS, INTERNAL_ERROR from pydantic import BaseModel, Field, field_validator, ConfigDict # Import Pydantic # Use relative imports from ..utils.errors import ValidationError from ..types import DocumentMetadata from chromadb.errors import InvalidDimensionException # --- Imports --- import chromadb from ..utils import ( get_logger, get_chroma_client, get_embedding_function, ValidationError, NumpyEncoder, # Now defined and exported from utils.__init__ ) from ..utils.config import validate_collection_name # --- Constants --- DEFAULT_QUERY_N_RESULTS = 10 LEARNINGS_COLLECTION_NAME = "derived_learnings_v1" # Define the learnings collection name # --- Helper for server-side timestamps --- def _ensure_server_timestamp(metadata: dict) -> dict: """ Ensures that any timestamp fields in metadata use server-side time. Args: metadata: A dictionary of metadata that might contain timestamp fields Returns: The metadata dict with any timestamp fields replaced with current server time """ timestamp_fields = ["timestamp", "last_indexed_utc", "created_at", "updated_at", "last_modified"] # Current server timestamps in different formats unix_ts = time.time() iso_ts = datetime.datetime.now().isoformat() # Create a copy to avoid modifying the input directly updated_metadata = metadata.copy() # Check for and replace common timestamp field names for field in timestamp_fields: if field in updated_metadata: # Use appropriate format based on existing value type existing_value = updated_metadata[field] if isinstance(existing_value, (int, float)): updated_metadata[field] = unix_ts elif isinstance(existing_value, str): # Check if it looks like an ISO format timestamp if "T" in existing_value and "-" in existing_value: updated_metadata[field] = iso_ts else: # Default to Unix timestamp for other string formats updated_metadata[field] = str(unix_ts) return updated_metadata # --- Pydantic Input Models for Document Tools --- # --- Add Document Variants (Singular) --- # class AddDocumentInput(BaseModel): """Input model for adding a single document (auto-generates ID, no metadata).""" collection_name: str = Field(..., description="Name of the collection to add the document to.") document: str = Field(..., description="Document content (string).") increment_index: bool = Field(True, description="Whether to immediately index the added document.") model_config = ConfigDict(extra="forbid") class AddDocumentWithIDInput(BaseModel): """Input model for adding a single document with a specified ID (no metadata).""" collection_name: str = Field(..., description="Name of the collection to add the document to.") document: str = Field(..., description="Document content (string).") id: str = Field(..., description="Unique ID for the document.") increment_index: bool = Field(True, description="Whether to immediately index the added document.") model_config = ConfigDict(extra="forbid") class AddDocumentWithMetadataInput(BaseModel): """Input model for adding a single document with specified metadata (auto-generates ID).""" collection_name: str = Field(..., description="Name of the collection to add the document to.") document: str = Field(..., description="Document content (string).") metadata: str = Field(..., description='Metadata JSON string for the document (e.g., \'{"key": "value"}\').') increment_index: bool = Field(True, description="Whether to immediately index the added document.") model_config = ConfigDict(extra="forbid") class AddDocumentWithIDAndMetadataInput(BaseModel): """Input model for adding a single document with a specified ID and metadata.""" collection_name: str = Field(..., description="Name of the collection to add the document to.") document: str = Field(..., description="Document content (string).") id: str = Field(..., description="Unique ID for the document.") metadata: str = Field(..., description='Metadata JSON string for the document (e.g., \'{"key": "value"}\').') increment_index: bool = Field(True, description="Whether to immediately index the added document.") model_config = ConfigDict(extra="forbid") # --- Query Documents Variants --- # class QueryDocumentsInput(BaseModel): """Input model for basic querying (no filters). Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to query.") query_texts: List[str] = Field(..., description="List of query strings for semantic search.") n_results: int = Field(10, ge=1, description="Maximum number of results per query.") model_config = ConfigDict(extra="forbid") # Restore filter query models class QueryDocumentsWithWhereFilterInput(BaseModel): """Input model for querying with a metadata filter. Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to query.") query_texts: List[str] = Field(..., description="List of query strings for semantic search.") where: str = Field(..., description='Metadata filter as a JSON string (e.g., \'{"source": "pdf"}\').') n_results: int = Field(10, ge=1, description="Maximum number of results per query.") model_config = ConfigDict(extra="forbid") class QueryDocumentsWithDocumentFilterInput(BaseModel): """Input model for querying with a document content filter. Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to query.") query_texts: List[str] = Field(..., description="List of query strings for semantic search.") where_document: str = Field( ..., description='Document content filter as a JSON string (e.g., \'{"$contains": "keyword"}\').' ) n_results: int = Field(10, ge=1, description="Maximum number of results per query.") model_config = ConfigDict(extra="forbid") # --- Get Documents Variants --- # # Restore original multi-ID get class GetDocumentsByIdsInput(BaseModel): """Input model for getting documents by their specific IDs. Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to get documents from.") ids: List[str] = Field(..., description="List of document IDs to retrieve.") model_config = ConfigDict(extra="forbid") # Restore filter-based gets class GetDocumentsWithWhereFilterInput(BaseModel): """Input model for getting documents using a metadata filter. Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to get documents from.") where: str = Field(..., description='Metadata filter as a JSON string (e.g., \'{"source": "pdf"}\').') limit: int = Field(0, ge=0, description="Maximum number of documents to return. 0 for no limit.") offset: int = Field(0, ge=0, description="Number of documents to skip. 0 for default.") model_config = ConfigDict(extra="forbid") class GetDocumentsWithDocumentFilterInput(BaseModel): """Input model for getting documents using a document content filter. Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to get documents from.") where_document: str = Field( ..., description='Document content filter as a JSON string (e.g., \'{"$contains": "keyword"}\').' ) limit: int = Field(0, ge=0, description="Maximum number of documents to return. 0 for no limit.") offset: int = Field(0, ge=0, description="Number of documents to skip. 0 for default.") model_config = ConfigDict(extra="forbid") # Restore get all class GetAllDocumentsInput(BaseModel): """Input model for getting all documents in a collection (potentially limited). Uses default includes.""" collection_name: str = Field(..., description="Name of the collection to get all documents from.") limit: int = Field(0, ge=0, description="Limit on the number of documents to return. 0 for no limit.") offset: int = Field(0, ge=0, description="Number of documents to skip. 0 for default.") model_config = ConfigDict(extra="forbid") # --- Update Document Variants (Singular) --- # class UpdateDocumentContentInput(BaseModel): """Input model for updating the content of an existing document.""" collection_name: str = Field(..., description="Name of the collection containing the document.") id: str = Field(..., description="Document ID to update.") document: str = Field(..., description="New document content.") model_config = ConfigDict(extra="forbid") class UpdateDocumentMetadataInput(BaseModel): """Input model for updating the metadata of an existing document.""" collection_name: str = Field(..., description="Name of the collection containing the document.") id: str = Field(..., description="Document ID to update.") metadata: str = Field(..., description='New metadata as a JSON string (e.g., \'{"key": "new_value"}\').') model_config = ConfigDict(extra="forbid") # --- Delete Document Variant (Singular ID) --- # class DeleteDocumentByIdInput(BaseModel): """Input model for deleting a document by its specific ID.""" collection_name: str = Field(..., description="Name of the collection to delete the document from.") id: str = Field(..., description="Document ID to delete.") model_config = ConfigDict(extra="forbid") # --- End Pydantic Input Models --- # # --- Start: New Include Variants --- # Base model for Get By IDs with explicit include class GetDocumentsByIdsIncludeInput(BaseModel): collection_name: str = Field(..., description="Name of the collection to get documents from.") ids: List[str] = Field(..., description="List of document IDs to retrieve.") # Include is implicitly handled by the specific variant model_config = ConfigDict(extra="forbid") # --- GetDocumentsByIds Variants --- class GetDocumentsByIdsEmbeddingsInput(GetDocumentsByIdsIncludeInput): """Input model for getting documents by IDs including embeddings only.""" pass # Inherits fields, include is fixed in impl class GetDocumentsByIdsAllInput(GetDocumentsByIdsIncludeInput): """Input model for getting documents by IDs including all available data.""" pass # Inherits fields, include is fixed in impl # --- End: New Include Variants --- # --- Implementation Functions --- # --- Add Document Impl Variants (Singular) --- # async def _add_document_impl(input_data: AddDocumentInput) -> List[types.TextContent]: """Implementation for adding a single document without specified ID or metadata.""" logger = get_logger("tools.document.add") collection_name = input_data.collection_name document = input_data.document # Singular increment_index = input_data.increment_index # --- Validation --- validate_collection_name(collection_name) # Added validation if not document: # Check single document raise McpError(ErrorData(code=INVALID_PARAMS, message="Document content cannot be empty.")) # --- End Validation --- logger.info(f"Adding 1 document to '{collection_name}' (generating ID). Increment index: {increment_index}") try: client = get_chroma_client() collection = client.get_collection(name=collection_name) # Generate unique ID for the document generated_id = str(uuid.uuid4()) # Singular logger.debug(f"Generated ID '{generated_id}' for document in '{collection_name}'.") # No need to validate IDs/Metadatas for this variant logger.info( f"Adding 1 document to '{collection_name}' (auto-ID, no metadata). Increment index: {increment_index}" ) collection.add( documents=[document], # Pass as list ids=[generated_id], # Pass as list metadatas=None, # Explicitly None # increment_index=increment_index # Chroma client seems to not have this yet ) # Return the generated ID return [types.TextContent(type="text", text=json.dumps({"added_id": generated_id}))] except ValueError as e: # Handle collection not found if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for adding document.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error getting documents by ID from '{collection_name}': {e}", exc_info=True) raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid parameter adding document: {e}")) except Exception as e: logger.error(f"Error getting documents by ID from '{collection_name}': {e}", exc_info=True) # Fix: Match test expected message for generic add error raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred: {str(e)}", ) ) async def _add_document_with_id_impl(input_data: AddDocumentWithIDInput) -> List[types.TextContent]: """Implementation for adding a single document with a specified ID.""" logger = get_logger("tools.document.add_with_id") # Renamed logger collection_name = input_data.collection_name document = input_data.document # Singular id = input_data.id # Singular increment_index = input_data.increment_index # --- Validation --- validate_collection_name(collection_name) # Added validation if not document: # Check single document raise McpError(ErrorData(code=INVALID_PARAMS, message="Document content cannot be empty.")) if not id: # Check single ID raise McpError(ErrorData(code=INVALID_PARAMS, message="Document ID cannot be empty.")) # --- End Validation --- logger.info(f"Adding 1 document with ID '{id}' to '{collection_name}'. Increment index: {increment_index}") try: client = get_chroma_client() collection = client.get_collection(name=collection_name) logger.info( f"Adding 1 document with specified ID '{id}' to '{collection_name}' (no metadata). Increment index: {increment_index}" ) collection.add( documents=[document], # Pass as list ids=[id], # Pass as list metadatas=None, # Explicitly None # increment_index=increment_index ) # Confirm the ID used return [types.TextContent(type="text", text=json.dumps({"added_id": id}))] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Value error: {e}", exc_info=True) raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid parameter: {e}")) except Exception as e: logger.error(f"Unexpected error adding document '{id}' to '{collection_name}': {e}", exc_info=True) # Fix: Match test expected message raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"An unexpected error occurred: {str(e)}")) async def _add_document_with_metadata_impl(input_data: AddDocumentWithMetadataInput) -> List[types.TextContent]: """Implementation for adding a single document with metadata (auto-ID).""" logger = get_logger("tools.document.add_with_metadata") collection_name = input_data.collection_name document = input_data.document # Singular metadata_str = input_data.metadata # Singular string increment_index = input_data.increment_index # --- Validation --- validate_collection_name(collection_name) # Added validation if not document: raise McpError(ErrorData(code=INVALID_PARAMS, message="Document content cannot be empty.")) if not metadata_str: raise McpError(ErrorData(code=INVALID_PARAMS, message="Metadata JSON string cannot be empty.")) # --- End Validation --- # --- Parse Metadata JSON String --- parsed_metadata = None # Singular dict try: parsed_metadata = json.loads(metadata_str) if not isinstance(parsed_metadata, dict): raise ValueError("Metadata string must decode to a JSON object (dictionary).") except json.JSONDecodeError as e: logger.warning(f"Failed to parse metadata JSON string for '{collection_name}': {e}") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid JSON format for metadata string: {str(e)}")) except ValueError as e: # Catch the isinstance check logger.warning(f"Metadata did not decode to a dictionary for '{collection_name}': {e}") raise McpError( ErrorData(code=INVALID_PARAMS, message=f"Metadata string did not decode to a dictionary: {str(e)}") ) # --- End Parsing --- # --- Ensure timestamps are server-side --- parsed_metadata = _ensure_server_timestamp(parsed_metadata) # --- End timestamp enforcement --- # --- Generate ID --- # generated_id = str(uuid.uuid4()) # Singular logger.debug(f"Generated ID '{generated_id}' for document in '{collection_name}' (metadata provided).") # --- End Generate ID --- # logger.info( f"Adding 1 document with parsed metadata to '{collection_name}' (generated ID '{generated_id}'). Increment index: {increment_index}" ) try: client = get_chroma_client() collection = client.get_collection(name=collection_name) logger.info( f"Adding 1 document with specified metadata to '{collection_name}' (generated ID). Increment index: {increment_index}" ) collection.add( documents=[document], # Pass as list ids=[generated_id], # Pass as list metadatas=[parsed_metadata], # Pass as list # increment_index=increment_index ) # Return the generated ID return [types.TextContent(type="text", text=json.dumps({"added_id": generated_id}))] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Value error: {e}", exc_info=True) raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid parameter: {e}")) except Exception as e: logger.error(f"Error adding document with metadata to '{collection_name}': {e}", exc_info=True) # Correction based on test_generic_chroma_error_handling (update part) failure: raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"ChromaDB Error: Failed to update document content. {str(e)}") ) async def _add_document_with_id_and_metadata_impl( input_data: AddDocumentWithIDAndMetadataInput, ) -> List[types.TextContent]: """Implementation for adding a single document with specified ID and metadata.""" logger = get_logger("tools.document.add_full") collection_name = input_data.collection_name document = input_data.document # Singular id = input_data.id # Singular metadata_str = input_data.metadata # Singular string increment_index = input_data.increment_index # --- Validation --- validate_collection_name(collection_name) # Added validation if not document: raise McpError(ErrorData(code=INVALID_PARAMS, message="Document content cannot be empty.")) if not id: raise McpError(ErrorData(code=INVALID_PARAMS, message="Document ID cannot be empty.")) if not metadata_str: raise McpError(ErrorData(code=INVALID_PARAMS, message="Metadata JSON string cannot be empty.")) # --- End Validation --- # --- Parse Metadata JSON String --- parsed_metadata = None # Singular dict try: parsed_metadata = json.loads(metadata_str) if not isinstance(parsed_metadata, dict): raise ValueError("Metadata string must decode to a JSON object (dictionary).") except json.JSONDecodeError as e: logger.warning(f"Failed to parse metadata JSON string for '{collection_name}' ID '{id}': {e}") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid JSON format for metadata string: {str(e)}")) except ValueError as e: # Catch the isinstance check logger.warning(f"Metadata for ID '{id}' did not decode to a dictionary for '{collection_name}': {e}") raise McpError( ErrorData(code=INVALID_PARAMS, message=f"Metadata string did not decode to a dictionary: {str(e)}") ) # --- End Parsing --- # --- Ensure timestamps are server-side --- parsed_metadata = _ensure_server_timestamp(parsed_metadata) # --- End timestamp enforcement --- logger.info( f"Adding 1 document with ID '{id}' and parsed metadata to '{collection_name}'. Increment index: {increment_index}" ) try: client = get_chroma_client() collection = client.get_collection(name=collection_name) logger.info( f"Adding 1 document with specified ID '{id}' and metadata to '{collection_name}'. Increment index: {increment_index}" ) collection.add( documents=[document], # Pass as list ids=[id], # Pass as list metadatas=[parsed_metadata], # Pass as list # increment_index=increment_index ) # Confirm the ID used return [types.TextContent(type="text", text=json.dumps({"added_id": id}))] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Value error: {e}", exc_info=True) raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid parameter: {e}")) except Exception as e: logger.error( f"Unexpected error adding document '{id}' with metadata to '{collection_name}': {e}", exc_info=True ) # Fix: Match test expected message raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"An unexpected error occurred: {str(e)}")) # --- End Add Document Impl Variants --- # # --- Get Documents Impl Variants --- # # Restore original multi-ID get implementation async def _get_documents_by_ids_impl(input_data: GetDocumentsByIdsInput) -> List[types.TextContent]: """Implementation for getting documents by their specific IDs.""" logger = get_logger("tools.document.get_ids") collection_name = input_data.collection_name ids = input_data.ids # --- Validation --- validate_collection_name(collection_name) # Added validation if not ids: raise McpError(ErrorData(code=INVALID_PARAMS, message="IDs list cannot be empty.")) # --- End Validation --- logger.info(f"Getting documents by {len(ids)} IDs from '{collection_name}'. Using default includes.") try: client = get_chroma_client() collection = client.get_collection(collection_name) # Pass include=None to use ChromaDB defaults get_result: GetResult = collection.get(ids=ids) logger.debug(f"ChromaDB get result: {get_result}") # Use json.dumps with NumpyEncoder to handle potential non-serializable data in results result_json = json.dumps(get_result, cls=NumpyEncoder) logger.info( f"Successfully retrieved {len(get_result.get('ids', []))} documents by ID from '{collection_name}'." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for get by ID.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error getting documents by ID from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred while retrieving documents by ID: {str(e)}", ) ) except Exception as e: logger.error(f"Error getting documents by ID from '{collection_name}': {e}", exc_info=True) # Fix: Match test expected message for generic get error raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred while retrieving documents by ID: {str(e)}", ) ) # Restore filter-based get implementations async def _get_documents_with_where_filter_impl( input_data: GetDocumentsWithWhereFilterInput, ) -> List[types.TextContent]: """Implementation for getting documents using a metadata filter.""" logger = get_logger("tools.document.get_where") collection_name = input_data.collection_name where_str = input_data.where limit = input_data.limit offset = input_data.offset # --- Validation --- validate_collection_name(collection_name) try: where_filter = json.loads(where_str) if not isinstance(where_filter, dict): raise ValueError("Where filter must be a JSON object (dict).") except (json.JSONDecodeError, ValueError) as e: raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid JSON format or type for 'where' filter: {e}")) # --- End Validation --- logger.info( f"Getting documents from '{collection_name}' with where filter. Limit: {limit}, Offset: {offset}. Using default includes." ) try: client = get_chroma_client() collection = client.get_collection(collection_name) # Pass limit/offset directly, Chroma client handles 0 or None appropriately if needed # If 0 means "not set", we need to convert it to None for the client call effective_limit = limit if limit > 0 else None effective_offset = offset if offset > 0 else None get_result: GetResult = collection.get( where=where_filter, limit=effective_limit, offset=effective_offset, ) logger.debug(f"ChromaDB get result: {get_result}") result_json = json.dumps(get_result, cls=NumpyEncoder) logger.info( f"Successfully retrieved {len(get_result.get('ids', []))} documents using where filter from '{collection_name}'." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for get with where filter.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error getting documents with where filter from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) except Exception as e: logger.error( f"Error getting documents with where filter from '{collection_name}': {e}", exc_info=True, ) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) async def _get_documents_with_document_filter_impl( input_data: GetDocumentsWithDocumentFilterInput, ) -> List[types.TextContent]: """Implementation for getting documents using a document content filter.""" logger = get_logger("tools.document.get_docfilter") collection_name = input_data.collection_name where_document_str = input_data.where_document limit = input_data.limit offset = input_data.offset # --- Validation --- validate_collection_name(collection_name) try: where_document_filter = json.loads(where_document_str) if not isinstance(where_document_filter, dict): raise ValueError("Document filter must be a JSON object (dict).") except (json.JSONDecodeError, ValueError) as e: raise McpError( ErrorData( code=INVALID_PARAMS, message=f"Invalid JSON format or type for 'where_document' filter: {e}", ) ) # --- End Validation --- logger.info( f"Getting documents from '{collection_name}' with document filter. Limit: {limit}, Offset: {offset}. Using default includes." ) try: client = get_chroma_client() collection = client.get_collection(collection_name) # Pass limit/offset directly, converting 0 to None if needed by client effective_limit = limit if limit > 0 else None effective_offset = offset if offset > 0 else None get_result: GetResult = collection.get( where_document=where_document_filter, limit=effective_limit, offset=effective_offset ) logger.debug(f"ChromaDB get result: {get_result}") result_json = json.dumps(get_result, cls=NumpyEncoder) logger.info( f"Successfully retrieved {len(get_result.get('ids', []))} documents using document filter from '{collection_name}'." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for get with document filter.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error getting documents with document filter from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) except Exception as e: logger.error( f"Error getting documents with document filter from '{collection_name}': {e}", exc_info=True, ) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) # Restore get all implementation async def _get_all_documents_impl(input_data: GetAllDocumentsInput) -> List[types.TextContent]: """Implementation for getting all documents in a collection (potentially limited).""" logger = get_logger("tools.document.get_all") collection_name = input_data.collection_name limit = input_data.limit offset = input_data.offset # --- Validation --- validate_collection_name(collection_name) # Added validation # --- End Validation --- logger.info( f"Getting all documents from '{collection_name}'. Limit: {limit}, Offset: {offset}. Using default includes." ) try: client = get_chroma_client() collection = client.get_collection(collection_name) # Pass limit/offset directly, converting 0 to None if needed by client effective_limit = limit if limit > 0 else None effective_offset = offset if offset > 0 else None get_result: GetResult = collection.get( limit=effective_limit, offset=effective_offset ) # Use ChromaDB defaults (was include=[]) logger.debug(f"ChromaDB get result: {get_result}") result_json = json.dumps(get_result, cls=NumpyEncoder) logger.info(f"Successfully retrieved {len(get_result.get('ids', []))} documents from '{collection_name}'.") return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for get all.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error getting all documents from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) except Exception as e: logger.error(f"Error getting all documents from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during get: {str(e)}", ) ) # --- Update Document Impl Variants --- # async def _update_document_content_impl(input_data: UpdateDocumentContentInput) -> List[types.TextContent]: """Implementation for updating document content.""" logger = get_logger("tools.document.update_content") collection_name = input_data.collection_name id = input_data.id # Singular document = input_data.document # Singular # --- Validation --- validate_collection_name(collection_name) # Added validation if not id: raise McpError(ErrorData(code=INVALID_PARAMS, message="ID cannot be empty for update.")) # Document content can potentially be empty, maybe don't validate here? # if not document: # raise McpError(ErrorData(code=INVALID_PARAMS, message="Document content cannot be empty for update.")) # --- End Validation --- logger.info(f"Updating content for document ID '{id}' in '{collection_name}'.") try: client = get_chroma_client() collection = client.get_collection(name=collection_name) logger.info(f"Updating content for document ID '{id}' in '{collection_name}'.") # Update takes lists, even for single items collection.update(ids=[id], documents=[document], metadatas=None) return [types.TextContent(type="text", text=json.dumps({"updated_id": id}))] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for update content.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) elif f"Document {id} not found" in str(e): # Adjust based on actual ChromaDB error message if different logger.warning(f"Document '{id}' not found in collection '{collection_name}' for update content.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Document with ID '{id}' not found.")) else: logger.error(f"Error updating document content in '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"ChromaDB Error: Failed to update document content. {str(e)}") ) except Exception as e: logger.error(f"Error updating document content in '{collection_name}': {e}", exc_info=True) # Correction based on test_generic_chroma_error_handling (update part) failure: raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"ChromaDB Error: Failed to update document content. {str(e)}") ) async def _update_document_metadata_impl(input_data: UpdateDocumentMetadataInput) -> List[types.TextContent]: """Implementation for updating the metadata of an existing document.""" logger = get_logger("tools.document.update") collection_name = input_data.collection_name document_id = input_data.id metadata_str = input_data.metadata # Now a string # --- Validation --- validate_collection_name(collection_name) if not document_id: raise McpError(ErrorData(code=INVALID_PARAMS, message="Document ID cannot be empty.")) try: metadata_dict = json.loads(metadata_str) if not isinstance(metadata_dict, dict): raise ValueError("Decoded JSON is not a dictionary") except (json.JSONDecodeError, ValueError) as e: logger.warning(f"Invalid JSON format for metadata: {metadata_str} - Error: {e}") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid JSON format for metadata: {e}")) # --- End Validation --- # --- Ensure timestamps are server-side --- metadata_dict = _ensure_server_timestamp(metadata_dict) # --- End timestamp enforcement --- logger.info(f"Updating metadata for document '{document_id}' in '{collection_name}' with: {metadata_dict}") try: client = get_chroma_client() collection = client.get_collection(name=collection_name) # Update the metadata collection.update( ids=[document_id], metadatas=[metadata_dict], # Pass parsed dict in a list ) logger.info(f"Successfully requested metadata update for document '{document_id}'.") return [types.TextContent(type="text", text=json.dumps({"updated_id": document_id}))] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for update metadata.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) elif f"Document {document_id} not found" in str( e ): # Adjust based on actual ChromaDB error message if different logger.warning(f"Document '{document_id}' not found in collection '{collection_name}' for update metadata.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Document with ID '{document_id}' not found.")) else: logger.error(f"Error updating document '{document_id}' metadata: {e}", exc_info=True) raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"An unexpected error occurred during update: {str(e)}") ) except Exception as e: logger.error(f"Error updating document '{document_id}' metadata: {e}", exc_info=True) raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"An unexpected error occurred during update: {str(e)}")) # --- Delete Document Impl Variant (Singular ID) --- # async def _delete_document_by_id_impl(input_data: DeleteDocumentByIdInput) -> List[types.TextContent]: """Implementation for deleting a document by ID.""" logger = get_logger("tools.document.delete_by_id") # Renamed logger collection_name = input_data.collection_name id = input_data.id # Singular # --- Validation --- validate_collection_name(collection_name) # Added validation if not id: # Added check for empty ID raise McpError(ErrorData(code=INVALID_PARAMS, message="ID cannot be empty for delete_document_by_id.")) # --- End Validation --- logger.info(f"Deleting document by ID '{id}' from '{collection_name}'.") try: client = get_chroma_client() collection = client.get_collection(name=collection_name) # Delete the document by its ID logger.debug(f"Attempting to delete document with ID: {id}") # Ensure the ID is passed as a list, even if it's a single ID collection.delete(ids=[id]) logger.info(f"Successfully requested deletion of document with ID: {id} from '{collection_name}'") # Fix: Revert to plain text success message return [types.TextContent(type="text", text=f"Deletion requested for document ID: {id}")] except chromadb.errors.NotFoundError: logger.warning(f"Document with ID '{id}' not found in collection '{collection_name}' during delete.") # Return success-like message as deletion is effectively complete if not found return [types.TextContent(type="text", text=f"Document ID '{id}' not found, no deletion needed.")] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for delete.") # Raise McpError with the message expected by the test raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: # Re-raise other ValueErrors as internal errors with specific message logger.exception(f"Unexpected ValueError deleting document '{id}' from '{collection_name}': {e}") error_message = f"ChromaDB Error: Failed to delete document. {str(e)}" raise McpError(ErrorData(code=INTERNAL_ERROR, message=error_message)) except Exception as e: logger.exception(f"Unexpected error deleting document '{id}' from '{collection_name}': {e}") # Align with consistent test expectation (Targeting except Exception): error_message = f"ChromaDB Error: Failed to delete document. {str(e)}" raise McpError(ErrorData(code=INTERNAL_ERROR, message=error_message)) # --- Query Documents Impl Variants --- # async def _query_documents_impl(input_data: QueryDocumentsInput) -> List[types.TextContent]: """Implementation for querying documents (no filters). Queries primary collection and derived learnings.""" logger = get_logger("tools.document.query") client = get_chroma_client() primary_collection_name = input_data.collection_name query_texts = input_data.query_texts n_results = input_data.n_results # Default includes for this basic query tool include = ["documents", "metadatas", "distances"] primary_results: Optional[QueryResult] = None learnings_results: Optional[QueryResult] = None # 1. Query Primary Collection try: logger.debug(f"Querying primary collection: {primary_collection_name}") primary_collection = client.get_collection(primary_collection_name) primary_results = primary_collection.query( query_texts=query_texts, n_results=n_results, # where=None, # No filters for this tool variant # where_document=None, include=include, ) logger.debug(f"Primary query successful for {primary_collection_name}") except InvalidDimensionException as e: logger.error( f"Dimension mismatch querying {primary_collection_name}: {e}. Ensure query matches collection embedding dim.", exc_info=True, ) raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"Dimension mismatch querying '{primary_collection_name}': {e}") ) except Exception as e: logger.error(f"Failed to query primary collection '{primary_collection_name}': {e}", exc_info=True) # Decide if we should still try the learnings collection or fail fast # For now, let's try learnings even if primary fails, but log the error # raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Failed to query collection '{primary_collection_name}': {e}")) primary_results = None # Ensure it's None if query fails # 2. Query Learnings Collection try: logger.debug(f"Querying learnings collection: {LEARNINGS_COLLECTION_NAME}") learnings_collection = client.get_collection(LEARNINGS_COLLECTION_NAME) learnings_results = learnings_collection.query( query_texts=query_texts, n_results=n_results, # Request same number for now include=include, ) logger.debug(f"Learnings query successful for {LEARNINGS_COLLECTION_NAME}") except InvalidDimensionException as e: logger.error( f"Dimension mismatch querying {LEARNINGS_COLLECTION_NAME}: {e}. Ensure query matches collection embedding dim.", exc_info=True, ) # Log but don't necessarily fail the whole operation if primary worked learnings_results = None except Exception as e: logger.warning( f"Failed to query or access learnings collection '{LEARNINGS_COLLECTION_NAME}': {e}. Proceeding without learnings results.", # exc_info=True # Optional: include traceback in warning? ) learnings_results = None # Ensure it's None if query fails # 3. Merge Results final_results: Dict[str, Optional[List[Any]]] = { "ids": [], "documents": [], "metadatas": [], "distances": [], # Add embeddings and data later if needed for other tools } # Check if primary_results and its lists are valid before processing if primary_results and all(isinstance(primary_results.get(k), list) for k in include if k != "embeddings"): num_queries = len(query_texts) num_primary_result_sets = len(primary_results["ids"]) if primary_results.get("ids") else 0 if num_primary_result_sets == num_queries: for i in range(num_queries): ids = primary_results["ids"][i] metas = primary_results.get("metadatas", [[]] * num_queries)[i] or [] docs = primary_results.get("documents", [[]] * num_queries)[i] or [] dists = primary_results.get("distances", [[]] * num_queries)[i] or [] # Ensure all retrieved lists have the same length for this query result set result_count = len(ids) if not (len(metas) == result_count and len(docs) == result_count and len(dists) == result_count): logger.warning( f"Inconsistent result lengths from primary collection '{primary_collection_name}' for query {i}. Skipping this result set." ) continue # Skip to the next query result set # Initialize combined lists for this query if first time if len(final_results["ids"]) <= i: final_results["ids"].append([]) final_results["metadatas"].append([]) final_results["documents"].append([]) final_results["distances"].append([]) for j in range(result_count): meta = metas[j] if metas[j] is not None else {} meta["source_collection"] = primary_collection_name final_results["ids"][i].append(ids[j]) final_results["metadatas"][i].append(meta) final_results["documents"][i].append(docs[j]) final_results["distances"][i].append(dists[j]) else: logger.warning( f"Primary query result structure mismatch. Expected {num_queries} result sets, got {num_primary_result_sets}." ) # Check if learnings_results and its lists are valid before processing if learnings_results and all(isinstance(learnings_results.get(k), list) for k in include if k != "embeddings"): num_queries = len(query_texts) num_learnings_result_sets = len(learnings_results["ids"]) if learnings_results.get("ids") else 0 if num_learnings_result_sets == num_queries: for i in range(num_queries): ids = learnings_results["ids"][i] metas = learnings_results.get("metadatas", [[]] * num_queries)[i] or [] docs = learnings_results.get("documents", [[]] * num_queries)[i] or [] dists = learnings_results.get("distances", [[]] * num_queries)[i] or [] result_count = len(ids) if not (len(metas) == result_count and len(docs) == result_count and len(dists) == result_count): logger.warning( f"Inconsistent result lengths from learnings collection '{LEARNINGS_COLLECTION_NAME}' for query {i}. Skipping this result set." ) continue # Initialize combined lists for this query if they don't exist yet if len(final_results["ids"]) <= i: final_results["ids"].append([]) final_results["metadatas"].append([]) final_results["documents"].append([]) final_results["distances"].append([]) for j in range(result_count): meta = metas[j] if metas[j] is not None else {} meta["source_collection"] = LEARNINGS_COLLECTION_NAME final_results["ids"][i].append(ids[j]) final_results["metadatas"][i].append(meta) final_results["documents"][i].append(docs[j]) final_results["distances"][i].append(dists[j]) else: logger.warning( f"Learnings query result structure mismatch. Expected {num_queries} result sets, got {num_learnings_result_sets}." ) # Check if any results were gathered at all if not any(final_results.values()): logger.info(f"No results found for query in either collection.") # Return empty but valid structure based on ChromaDB's QueryResult type hints empty_results: QueryResult = { "ids": [[] for _ in query_texts], "embeddings": None, "documents": [[] for _ in query_texts], "metadatas": [[] for _ in query_texts], "distances": [[] for _ in query_texts], } # Serialize using NumpyEncoder to handle potential numpy types if they ever appear result_json = json.dumps(empty_results, cls=NumpyEncoder) return [types.TextContent(type="text", text=result_json)] # TODO: Implement optional re-ranking based on distance or other factors? # TODO: Implement optional truncation if the combined list is too long? # Serialize the final merged results # Ensure all lists within final_results have the same length (number of queries) num_queries = len(query_texts) for key in ["ids", "documents", "metadatas", "distances"]: if len(final_results[key]) < num_queries: # Pad with empty lists if some queries yielded no results from either collection final_results[key].extend([[] for _ in range(num_queries - len(final_results[key]))]) # Construct the final QueryResult dictionary explicitly matching the type hint final_query_result: QueryResult = { "ids": cast(List[List[str]], final_results["ids"]), "embeddings": None, # Not included by default "documents": cast(Optional[List[List[str]]], final_results["documents"]), "metadatas": cast(Optional[List[List[Dict[str, Any]]]], final_results["metadatas"]), "distances": cast(Optional[List[List[float]]], final_results["distances"]), } result_json = json.dumps(final_query_result, cls=NumpyEncoder) return [types.TextContent(type="text", text=result_json)] # Restore filter query implementations async def _query_documents_with_where_filter_impl( input_data: QueryDocumentsWithWhereFilterInput, ) -> List[types.TextContent]: """Implementation for querying documents with a metadata filter.""" logger = get_logger("tools.document.query_where") collection_name = input_data.collection_name query_texts = input_data.query_texts where_str = input_data.where n_results = input_data.n_results # --- Validation --- validate_collection_name(collection_name) if not query_texts: raise McpError(ErrorData(code=INVALID_PARAMS, message="Query texts cannot be empty.")) try: where_filter = json.loads(where_str) if not isinstance(where_filter, dict): raise ValueError("Where filter must be a JSON object (dict).") except (json.JSONDecodeError, ValueError) as e: raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Invalid JSON format or type for 'where' filter: {e}")) # --- End Validation --- logger.info(f"Querying '{collection_name}' with where filter. N_results: {n_results}. Using default includes.") try: client = get_chroma_client() collection = client.get_collection(collection_name) query_result: QueryResult = collection.query( query_texts=query_texts, where=where_filter, n_results=n_results, include=[], # Default include (empty list passes validation) ) logger.debug(f"ChromaDB query result: {query_result}") result_json = json.dumps(query_result, cls=NumpyEncoder) num_result_sets = len(query_result.get("ids") or []) logger.info( f"Query with where filter successful on '{collection_name}', returning {num_result_sets} result sets." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for query with where filter.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error querying collection '{collection_name}' with where filter: {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during query with where filter: {str(e)}", ) ) except Exception as e: logger.error( f"Error querying collection '{collection_name}' with where filter: {e}", exc_info=True, ) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during query with where filter: {str(e)}", ) ) async def _query_documents_with_document_filter_impl( input_data: QueryDocumentsWithDocumentFilterInput, ) -> List[types.TextContent]: """Implementation for querying documents with a document content filter.""" logger = get_logger("tools.document.query_docfilter") collection_name = input_data.collection_name query_texts = input_data.query_texts where_document_str = input_data.where_document n_results = input_data.n_results # --- Validation --- validate_collection_name(collection_name) if not query_texts: raise McpError(ErrorData(code=INVALID_PARAMS, message="Query texts cannot be empty.")) try: where_document_filter = json.loads(where_document_str) if not isinstance(where_document_filter, dict): raise ValueError("Document filter must be a JSON object (dict).") except (json.JSONDecodeError, ValueError) as e: raise McpError( ErrorData( code=INVALID_PARAMS, message=f"Invalid JSON format or type for 'where_document' filter: {e}", ) ) # --- End Validation --- logger.info(f"Querying '{collection_name}' with document filter. N_results: {n_results}. Using default includes.") try: client = get_chroma_client() collection = client.get_collection(collection_name) query_result: QueryResult = collection.query( query_texts=query_texts, where_document=where_document_filter, n_results=n_results, ) logger.debug(f"ChromaDB query result: {query_result}") result_json = json.dumps(query_result, cls=NumpyEncoder) num_result_sets = len(query_result.get("ids") or []) logger.info( f"Query with document filter successful on '{collection_name}', returning {num_result_sets} result sets." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: if f"Collection {collection_name} does not exist" in str(e): logger.warning(f"Collection '{collection_name}' not found for query with document filter.") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) else: logger.error(f"Error querying collection '{collection_name}' with document filter: {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during query with document filter: {str(e)}", ) ) except Exception as e: logger.error( f"Error querying collection '{collection_name}' with document filter: {e}", exc_info=True, ) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred during query with document filter: {str(e)}", ) ) # --- Start: New Include Variant Implementations --- # Reusable implementation for Get By IDs with specific includes async def _get_documents_by_ids_include_impl( input_data: GetDocumentsByIdsIncludeInput, include_fields: List[str], tool_name: str ) -> List[types.TextContent]: logger = get_logger(f"tools.document.{tool_name}") collection_name = input_data.collection_name ids = input_data.ids validate_collection_name(collection_name) if not ids: raise McpError(ErrorData(code=INVALID_PARAMS, message="IDs list cannot be empty.")) logger.info(f"Getting documents by {len(ids)} IDs from '{collection_name}'. Including: {include_fields}") try: client = get_chroma_client() collection = client.get_collection(collection_name) get_result: GetResult = collection.get(ids=ids, include=include_fields) logger.debug(f"ChromaDB get result: {get_result}") result_json = json.dumps(get_result, cls=NumpyEncoder) logger.info( f"Successfully retrieved {len(get_result.get('ids', []))} documents by ID from '{collection_name}' (include: {include_fields})." ) return [types.TextContent(type="text", text=result_json)] except ValueError as e: error_str = str(e) if f"Collection {collection_name} does not exist" in error_str: logger.warning(f"Collection '{collection_name}' not found for get by ID ({tool_name}).") raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Collection '{collection_name}' not found.")) elif "You must set a data loader on the collection if loading from URIs" in error_str: logger.warning( f"ValueError getting URIs/data in '{tool_name}' for '{collection_name}': {error_str}", exc_info=False ) # Raise a specific error indicating the cause raise McpError( ErrorData( code=INVALID_PARAMS, # Or a more specific code if available message=f"Failed to get URIs/data: Collection '{collection_name}' does not have a data loader configured. Requested includes: {include_fields}", ) ) else: # Re-raise other ValueErrors logger.error(f"ValueError during get by ID ('{tool_name}') from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData(code=INTERNAL_ERROR, message=f"Invalid parameter during get ('{tool_name}'): {str(e)}") ) except Exception as e: logger.error(f"Error getting documents by ID ('{tool_name}') from '{collection_name}': {e}", exc_info=True) raise McpError( ErrorData( code=INTERNAL_ERROR, message=f"An unexpected error occurred ('{tool_name}'): {str(e)}", ) ) # --- Implementations for Specific Get Include Variants --- async def _get_documents_by_ids_embeddings_impl( input_data: GetDocumentsByIdsEmbeddingsInput, ) -> List[types.TextContent]: return await _get_documents_by_ids_include_impl( input_data=input_data, include_fields=["embeddings"], tool_name="get_ids_embeddings" ) async def _get_documents_by_ids_all_impl(input_data: GetDocumentsByIdsAllInput) -> List[types.TextContent]: return await _get_documents_by_ids_include_impl( input_data=input_data, # Revert: Request all standard fields for the 'all' variant include_fields=["documents", "metadatas", "embeddings", "uris", "data"], tool_name="get_ids_all", ) # --- End: New Include Variant Implementations ---

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/djm81/chroma_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server