#!/usr/bin/env python3
"""
Chroma MCP Server - Exposes Chroma vector database operations as MCP tools
This server provides a comprehensive set of tools for interacting with Chroma databases,
including collection management, document operations, and vector similarity search.
"""
import os
import json
import asyncio
import logging
import warnings
from typing import Any, Dict, List, Optional, Union
from contextlib import asynccontextmanager
from fastmcp import FastMCP
# Load environment from .env if available (before reading env vars)
try:
from dotenv import load_dotenv # type: ignore
load_dotenv()
except Exception:
pass
from pydantic import BaseModel, Field
import chromadb
from chromadb.config import Settings
from chromadb.api.models.Collection import Collection
# Optional embedding function imports
try:
import openai
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
try:
from sentence_transformers import SentenceTransformer
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
SENTENCE_TRANSFORMERS_AVAILABLE = False
try:
import cohere
COHERE_AVAILABLE = True
except ImportError:
COHERE_AVAILABLE = False
try:
from InstructorEmbedding import INSTRUCTOR
INSTRUCTOR_AVAILABLE = True
except ImportError:
INSTRUCTOR_AVAILABLE = False
# Configure logging early so our INFO lines are visible
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="[%(asctime)s] %(levelname)-7s %(message)s",
datefmt="%m/%d/%y %H:%M:%S",
)
# Suppress websockets deprecation warnings (websockets.legacy is deprecated in websockets 14.0+)
warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets.legacy")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="uvicorn.protocols.websockets.websockets_impl")
logger = logging.getLogger(__name__)
# Check storage backend
try:
import duckdb
DUCKDB_AVAILABLE = True
logger.info("ChromaDB storage backend: DuckDB (vector storage)")
except ImportError:
DUCKDB_AVAILABLE = False
logger.info("ChromaDB storage backend: SQLite (vector storage, fallback)")
# Global ChromaDB client
chroma_client = None
# Default embedding configuration - optimized for LangChain compatibility
DEFAULT_EMBEDDING_CONFIG = {
"provider": "sentence-transformers",
"model": "all-MiniLM-L6-v2",
"api_key": None # For OpenAI, Cohere or other API-based providers
}
def summarize_chroma_config() -> str:
"""Return a one-line summary of the Chroma storage mode and key parameters."""
host = os.getenv("CHROMA_HOST", "localhost")
port_env = os.getenv("CHROMA_PORT")
persist_directory = os.getenv("CHROMA_PERSIST_DIRECTORY")
chroma_mode = os.getenv("CHROMA_MODE", "http") # http, persistent, or memory
if chroma_mode == "persistent" or persist_directory:
path = persist_directory or "./chroma_db"
return f"Chroma storage mode: persistent (local). persist_directory={path}"
elif chroma_mode == "memory":
return "Chroma storage mode: memory (ephemeral)"
else:
port = (port_env.strip() if port_env and port_env.strip() else "8000")
return f"Chroma storage mode: http. host={host}, port={port}"
def make_serializable(obj: Any) -> Any:
"""Convert objects to JSON-serializable types"""
if isinstance(obj, dict):
return {k: make_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [make_serializable(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(make_serializable(item) for item in obj)
elif hasattr(obj, '__dict__'):
# For objects with __dict__, convert to dict
return make_serializable(obj.__dict__)
else:
# For other types, try str() if not basic
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
else:
return str(obj)
def create_embedding_function(config: Optional[Dict[str, Any]] = None) -> Any:
"""
Create an embedding function based on configuration.
Args:
config: Embedding function configuration
Returns:
Embedding function instance or None for default ChromaDB behavior
"""
if not config:
config = DEFAULT_EMBEDDING_CONFIG
provider = config.get("provider", "sentence-transformers")
model = config.get("model", "all-MiniLM-L6-v2")
api_key = config.get("api_key")
if provider == "openai" and OPENAI_AVAILABLE:
if not api_key:
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
logger.info(f"Using OpenAI embeddings with model: {model}")
return chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
api_key=api_key,
model_name=model
)
else:
logger.warning("OpenAI API key not provided")
elif provider == "sentence-transformers" and SENTENCE_TRANSFORMERS_AVAILABLE:
logger.info(f"Using SentenceTransformer embeddings with model: {model}")
return chromadb.utils.embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=model
)
elif provider == "cohere" and COHERE_AVAILABLE:
if not api_key:
api_key = os.getenv("COHERE_API_KEY")
if api_key:
logger.info(f"Using Cohere embeddings with model: {model}")
return chromadb.utils.embedding_functions.CohereEmbeddingFunction(
api_key=api_key,
model_name=model
)
else:
logger.warning("Cohere API key not provided")
elif provider == "instructor" and INSTRUCTOR_AVAILABLE:
logger.info(f"Using Instructor embeddings with model: {model}")
return chromadb.utils.embedding_functions.InstructorEmbeddingFunction(
model_name=model
)
elif provider == "huggingface":
# For HuggingFace models, you might need transformers and torch installed
try:
logger.info(f"Using HuggingFace embeddings with model: {model}")
return chromadb.utils.embedding_functions.HuggingFaceEmbeddingFunction(
model_name=model
)
except AttributeError:
logger.warning(f"HuggingFace embedding function not available for model: {model}")
# Default: let ChromaDB handle embeddings (no embedding function)
logger.info(f"Using default ChromaDB embeddings (no specific model) for provider: {provider}")
return None
class EmbeddingConfig(BaseModel):
"""Embedding function configuration"""
provider: str = Field(default="sentence-transformers", description="Embedding provider (sentence-transformers, openai, cohere, instructor, huggingface)")
model: str = Field(default="all-MiniLM-L6-v2", description="Model name")
api_key: Optional[str] = Field(default=None, description="API key for provider")
class ChromaConfig(BaseModel):
"""Configuration for ChromaDB connection"""
host: str = Field(default="localhost", description="ChromaDB host")
port: int = Field(default=8000, description="ChromaDB port")
persist_directory: Optional[str] = Field(default=None, description="Persistent directory for local ChromaDB")
ssl: bool = Field(default=False, description="Use SSL connection")
class Document(BaseModel):
"""Document model for adding to collections"""
id: str = Field(description="Unique document identifier")
content: str = Field(description="Document content/text")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="Document metadata")
embedding: Optional[List[float]] = Field(default=None, description="Document embedding vector")
class QueryRequest(BaseModel):
"""Query request model"""
collection_name: str = Field(description="Name of the collection to query")
query_texts: Optional[List[str]] = Field(default=None, description="Text queries")
query_embeddings: Optional[List[List[float]]] = Field(default=None, description="Embedding queries")
n_results: int = Field(default=10, description="Number of results to return")
where: Optional[Dict[str, Any]] = Field(default=None, description="Metadata filters")
where_document: Optional[Dict[str, Any]] = Field(default=None, description="Document content filters")
class UpdateRequest(BaseModel):
"""Update request model"""
collection_name: str = Field(description="Name of the collection")
document_id: str = Field(description="Document ID to update")
content: Optional[str] = Field(default=None, description="New content")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="New metadata")
embedding: Optional[List[float]] = Field(default=None, description="New embedding")
# Create FastMCP server
mcp = FastMCP("ChromaDB MCP Server")
@asynccontextmanager
async def get_chroma_client():
"""Get or create ChromaDB client"""
global chroma_client
if chroma_client is None:
# Try to get configuration from environment or use defaults
host = os.getenv("CHROMA_HOST", "localhost")
port_env = os.getenv("CHROMA_PORT")
persist_directory = os.getenv("CHROMA_PERSIST_DIRECTORY")
chroma_mode = os.getenv("CHROMA_MODE", "http") # http, persistent, or memory
if chroma_mode == "persistent" or persist_directory:
# Use persistent local client (no network port needed)
path = persist_directory or "./chroma_db"
chroma_client = chromadb.PersistentClient(path=path)
logger.info(
f"Connected to ChromaDB persistent client. mode=persistent, persist_directory={path}"
)
elif chroma_mode == "memory":
# Use in-memory client (no network port or persistence)
chroma_client = chromadb.EphemeralClient()
logger.info("Connected to ChromaDB in-memory client. mode=memory")
elif port_env and port_env.strip():
# Use HTTP client with specified port
port = int(port_env)
chroma_client = chromadb.HttpClient(host=host, port=port)
logger.info(
f"Connected to ChromaDB HTTP client. mode=http, host={host}, port={port}"
)
else:
# Default: use HTTP client with default port
chroma_client = chromadb.HttpClient(host=host, port=8000)
logger.info(
f"Connected to ChromaDB HTTP client. mode=http, host={host}, port=8000 (default)"
)
try:
yield chroma_client
except Exception as e:
logger.error(f"Error with ChromaDB client: {e}")
raise
@mcp.tool()
async def list_collections() -> str:
"""
List all collections in the ChromaDB database.
Returns:
JSON string containing list of collection names and their metadata
"""
logger.info("Calling tool: list_collections")
async with get_chroma_client() as client:
try:
collections = client.list_collections()
result = []
for collection in collections:
collection_info = {
"name": collection.name,
"id": str(collection.id),
"metadata": make_serializable(collection.metadata)
}
result.append(collection_info)
return json.dumps(result, indent=2)
except Exception as e:
return f"Error listing collections: {str(e)}"
@mcp.tool()
async def create_collection(
name: str,
metadata: Optional[Dict[str, Any]] = None,
embedding_config: Optional[EmbeddingConfig] = None
) -> str:
"""
Create a new collection in ChromaDB.
Args:
name: Name of the collection to create
metadata: Optional metadata for the collection
embedding_config: Optional embedding function configuration
Returns:
Success message or error description
"""
logger.info(f"Calling tool: create_collection with name={name}")
async with get_chroma_client() as client:
try:
# Create embedding function if configured
embedding_function = None
if embedding_config:
embedding_function = create_embedding_function(embedding_config.dict())
elif os.getenv("DEFAULT_EMBEDDING_PROVIDER"):
# Use environment variable for default embedding
config = DEFAULT_EMBEDDING_CONFIG.copy()
config["provider"] = os.getenv("DEFAULT_EMBEDDING_PROVIDER", "sentence-transformers")
config["model"] = os.getenv("DEFAULT_EMBEDDING_MODEL", "all-MiniLM-L6-v2")
embedding_function = create_embedding_function(config)
# Create collection with or without embedding function
collection = client.create_collection(
name=name,
metadata=metadata or {},
embedding_function=embedding_function
)
embedding_info = f" with {embedding_config.provider} embedding" if embedding_config else " with default embedding"
return f"Successfully created collection '{name}'{embedding_info}"
except Exception as e:
return f"Error creating collection '{name}': {str(e)}"
@mcp.tool()
async def add_documents(
collection_name: str,
documents: List[Document]
) -> str:
"""
Add documents to a collection.
Args:
collection_name: Name of the collection
documents: List of Document objects to add
Returns:
Success message or error description
"""
logger.info(f"Calling tool: add_documents to collection={collection_name} with {len(documents)} documents")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
# Extract data from documents
ids = [doc.id for doc in documents]
contents = [doc.content for doc in documents]
metadatas = [doc.metadata for doc in documents]
embeddings = [doc.embedding for doc in documents]
# Filter out None embeddings
if all(emb is None for emb in embeddings):
embeddings = None
collection.add(
documents=contents,
metadatas=metadatas,
ids=ids,
embeddings=embeddings
)
return f"Successfully added {len(documents)} documents to collection '{collection_name}'"
except Exception as e:
return f"Error adding documents to collection '{collection_name}': {str(e)}"
@mcp.tool()
async def query_collection(
collection_name: str,
query_texts: Optional[List[str]] = None,
query_embeddings: Optional[List[List[float]]] = None,
n_results: int = 10,
where: Optional[Dict[str, Any]] = None,
where_document: Optional[Dict[str, Any]] = None
) -> str:
"""
Query a collection for similar documents.
Args:
collection_name: Name of the collection to query
query_texts: Text queries (optional)
query_embeddings: Embedding queries (optional)
n_results: Number of results to return (default: 10)
where: Metadata filters (optional)
where_document: Document content filters (optional)
Returns:
JSON string with query results
"""
logger.info(f"Calling tool: query_collection on collection={collection_name} with n_results={n_results}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
# Perform query
if query_texts:
results = collection.query(
query_texts=query_texts,
n_results=n_results,
where=where,
where_document=where_document
)
elif query_embeddings:
results = collection.query(
query_embeddings=query_embeddings,
n_results=n_results,
where=where,
where_document=where_document
)
else:
return "Error: Must provide either query_texts or query_embeddings"
# Format results
formatted_results = {
"collection": collection_name,
"results": []
}
# Handle embeddings if they exist
emb_list = results['embeddings'][0] if results['embeddings'] else [None] * len(results['ids'][0])
for i, (doc_id, metadata, content, embedding) in enumerate(zip(
results['ids'][0],
results['metadatas'][0],
results['documents'][0],
emb_list
)):
result_item = {
"rank": i + 1,
"id": doc_id,
"content": content,
"metadata": metadata,
"embedding": embedding,
"distance": results['distances'][0][i] if 'distances' in results and results['distances'] else None
}
formatted_results["results"].append(result_item)
return json.dumps(formatted_results, indent=2)
except Exception as e:
return f"Error querying collection '{collection_name}': {str(e)}"
@mcp.tool()
async def update_document(update: UpdateRequest) -> str:
"""
Update a document in a collection.
Args:
update: UpdateRequest object with update parameters
Returns:
Success message or error description
"""
logger.info(f"Calling tool: update_document on document_id={update.document_id} in collection={update.collection_name}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=update.collection_name)
# Update document
collection.update(
ids=[update.document_id],
documents=[update.content] if update.content else None,
metadatas=[update.metadata] if update.metadata else None,
embeddings=[update.embedding] if update.embedding else None
)
return f"Successfully updated document '{update.document_id}' in collection '{update.collection_name}'"
except Exception as e:
return f"Error updating document '{update.document_id}' in collection '{update.collection_name}': {str(e)}"
@mcp.tool()
async def delete_documents(collection_name: str, document_ids: List[str]) -> str:
"""
Delete documents from a collection.
Args:
collection_name: Name of the collection
document_ids: List of document IDs to delete
Returns:
Success message or error description
"""
logger.info(f"Calling tool: delete_documents from collection={collection_name} with ids={document_ids}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
collection.delete(ids=document_ids)
return f"Successfully deleted {len(document_ids)} documents from collection '{collection_name}'"
except Exception as e:
return f"Error deleting documents from collection '{collection_name}': {str(e)}"
@mcp.tool()
async def get_document(collection_name: str, document_id: str) -> str:
"""
Get a specific document from a collection.
Args:
collection_name: Name of the collection
document_id: ID of the document to retrieve
Returns:
JSON string with document information or error message
"""
logger.info(f"Calling tool: get_document document_id={document_id} from collection={collection_name}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
result = collection.get(ids=[document_id])
if not result['ids']:
return f"Document '{document_id}' not found in collection '{collection_name}'"
document_info = {
"id": result['ids'][0],
"content": result['documents'][0],
"metadata": result['metadatas'][0] if result['metadatas'][0] else {},
"embedding": result['embeddings'][0] if result['embeddings'] else None
}
return json.dumps(document_info, indent=2)
except Exception as e:
return f"Error getting document '{document_id}' from collection '{collection_name}': {str(e)}"
@mcp.tool()
async def collection_stats(collection_name: str) -> str:
"""
Get statistics for a collection.
Args:
collection_name: Name of the collection
Returns:
JSON string with collection statistics
"""
logger.info(f"Calling tool: collection_stats for collection={collection_name}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
# Get basic stats
count = collection.count()
# Try to get more detailed stats if available
stats = {
"name": collection.name,
"document_count": count,
"metadata": collection.metadata
}
return json.dumps(stats, indent=2)
except Exception as e:
return f"Error getting stats for collection '{collection_name}': {str(e)}"
@mcp.tool()
async def peek_collection(collection_name: str, limit: int = 10) -> str:
"""
Peek at the first few documents in a collection without querying.
Args:
collection_name: Name of the collection
limit: Maximum number of documents to peek (default: 10)
Returns:
JSON string with documents or error message
"""
logger.info(f"Calling tool: peek_collection for collection={collection_name} with limit={limit}")
async with get_chroma_client() as client:
try:
collection = client.get_collection(name=collection_name)
result = collection.peek(limit=limit)
if not result['ids']:
return f"No documents found in collection '{collection_name}'"
documents = []
for i, (doc_id, metadata, content) in enumerate(zip(
result['ids'],
result['metadatas'],
result['documents']
)):
doc_info = {
"id": doc_id,
"content": content,
"metadata": metadata if metadata else {}
}
documents.append(doc_info)
peek_info = {
"collection": collection_name,
"total_documents": len(documents),
"documents": documents
}
return json.dumps(peek_info, indent=2)
except Exception as e:
return f"Error peeking collection '{collection_name}': {str(e)}"
def main():
"""Main function to run the MCP server"""
import argparse
from fastmcp import FastMCP
# Set up argument parser
parser = argparse.ArgumentParser(description='ChromaDB MCP Server',
usage='%(prog)s [transport] [host] [port] [options]')
# Positional arguments (for backward compatibility)
parser.add_argument('transport', nargs='?', default='stdio',
choices=['stdio', 'http', 'sse'],
help='Transport mode (default: stdio)')
parser.add_argument('host', nargs='?', default=None,
help='Host to bind to (default: 0.0.0.0)')
parser.add_argument('port', nargs='?', type=int, default=None,
help='Port to bind to (default: 8000 for http/sse)')
# Optional arguments (take precedence over positional)
parser.add_argument('--host', dest='host_opt', default=None,
help='Host to bind to (overrides positional arg)')
parser.add_argument('--port', dest='port_opt', type=int, default=None,
help='Port to bind to (overrides positional arg)')
# Chroma backend overrides (CLI should overrule .env)
parser.add_argument('--chroma-mode', dest='chroma_mode', choices=['http', 'persistent', 'memory'], default=None,
help='Chroma backend mode (http | persistent | memory)')
parser.add_argument('--chroma-host', dest='chroma_host', default=None,
help='Chroma HTTP host (when mode=http)')
parser.add_argument('--chroma-port', dest='chroma_port', type=str, default=None,
help='Chroma HTTP port (when mode=http)')
parser.add_argument('--chroma-persist-directory', dest='chroma_persist', default=None,
help='Chroma persistent directory (when mode=persistent)')
# Parse known args, leave the rest for FastMCP
args, _ = parser.parse_known_args()
# Apply Chroma overrides via environment so the client factory picks them up
if args.chroma_mode is not None:
os.environ['CHROMA_MODE'] = args.chroma_mode
if args.chroma_host is not None:
os.environ['CHROMA_HOST'] = args.chroma_host
if args.chroma_port is not None:
os.environ['CHROMA_PORT'] = str(args.chroma_port)
if args.chroma_persist is not None:
os.environ['CHROMA_PERSIST_DIRECTORY'] = args.chroma_persist
# Set final host and port (prefer explicit --host/--port over positional args)
host = args.host_opt or args.host or '0.0.0.0'
port = args.port_opt or args.port or 8000
# Log startup information (clean formatting, no heavy separators)
logger.info(f"Server: ChromaDB MCP Server | Transport: {args.transport.upper()}")
if args.transport in ['http', 'sse']:
endpoint = f"http://{host}:{port}"
if args.transport == 'sse':
endpoint += "/sse"
logger.info(f"Endpoint: {endpoint}")
# Show ChromaDB configuration (concise)
config_summary = summarize_chroma_config()
logger.info(f"Chroma backend: {config_summary}")
# Show default embedding configuration
default_provider = DEFAULT_EMBEDDING_CONFIG["provider"]
default_model = DEFAULT_EMBEDDING_CONFIG["model"]
logger.info(f"Default embedding: {default_provider} ({default_model})")
# Run the server with specified transport
if args.transport == 'stdio':
mcp.run(transport=args.transport)
else:
mcp.run(transport=args.transport, host=host, port=port)
async def run_stdio_server():
"""Run the MCP server in stdio mode (legacy method)"""
# This is the old way - now we use mcp.run() directly
logger.info("Starting ChromaDB MCP Server in stdio mode...")
if __name__ == "__main__":
main()