"""
MCP Memory Service
Copyright (c) 2024 Heinrich Krupp
Licensed under the MIT License. See LICENSE file in the project root for full license text.
"""
import json
import logging
from datetime import datetime, date
from typing import List, Dict, Any, Tuple, Set, Optional

import chromadb
from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer

from .base import MemoryStorage
from ..models.memory import Memory, MemoryQueryResult
from ..utils.hashing import generate_content_hash

logger = logging.getLogger(__name__)

class ChromaMemoryStorage(MemoryStorage):
    """ChromaDB-backed implementation of MemoryStorage."""

    def __init__(self, path: str):
        """Initialize ChromaDB storage with proper embedding function."""
        self.path = path

        # Initialize sentence transformer first
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

        # Create embedding function for ChromaDB
        self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name='all-MiniLM-L6-v2'
        )

        # Initialize ChromaDB with new client format
        self.client = chromadb.PersistentClient(path=path)

        # Get or create collection with proper embedding function
        try:
            self.collection = self.client.get_or_create_collection(
                name="memory_collection",
                metadata={"hnsw:space": "cosine"},
                embedding_function=self.embedding_function
            )
            logger.info("Collection initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing collection: {str(e)}")
            raise
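
    # Design note: "hnsw:space": "cosine" makes collection.query() return cosine
    # distances, which retrieve() below converts to similarity scores via
    # 1 - distance.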

    def sanitized(self, tags):
        """Normalize tags (string, list, or None) into a JSON array string."""
        if tags is None:
            return json.dumps([])

        # If we get a string, split it into an array
        if isinstance(tags, str):
            tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
        # If we get an array, use it directly
        elif isinstance(tags, list):
            tags = [str(tag).strip() for tag in tags if str(tag).strip()]
        else:
            return json.dumps([])

        # Return JSON string representation of the array
        return json.dumps(tags)
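
    # Illustrative behavior (these example calls are not in the original file):
    #
    #   self.sanitized("alpha, beta, ")      -> '["alpha", "beta"]'
    #   self.sanitized(["alpha", 42, "  "])  -> '["alpha", "42"]'
    #   self.sanitized(None)                 -> '[]'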

    async def store(self, memory: Memory) -> Tuple[bool, str]:
        """Store a memory with proper embedding handling."""
        try:
            # Check for duplicates
            existing = self.collection.get(
                where={"content_hash": memory.content_hash}
            )
            if existing["ids"]:
                return False, "Duplicate content detected"

            # Format metadata properly
            metadata = self._format_metadata_for_chroma(memory)

            # Add additional metadata
            metadata.update(memory.metadata)

            # Generate ID based on content hash
            memory_id = memory.content_hash

            # Add to collection - embedding will be automatically generated
            self.collection.add(
                documents=[memory.content],
                metadatas=[metadata],
                ids=[memory_id]
            )

            return True, f"Successfully stored memory with ID: {memory_id}"
        except Exception as e:
            logger.error(f"Error storing memory: {str(e)}")
            return False, f"Error storing memory: {str(e)}"
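
    # Usage sketch (illustrative; shows the duplicate check on a second store):
    #
    #   ok, msg = await storage.store(memory)  # (True, "Successfully stored ...")
    #   ok, msg = await storage.store(memory)  # (False, "Duplicate content detected")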

    async def search_by_tag(self, tags: List[str]) -> List[Memory]:
        """Return all memories whose stored tags intersect the given tags."""
        try:
            results = self.collection.get(
                include=["metadatas", "documents"]
            )

            # Normalize search tags once, outside the loop
            search_tags = [str(tag).strip() for tag in tags if str(tag).strip()]

            memories = []
            if results["ids"]:
                for i, doc in enumerate(results["documents"]):
                    memory_meta = results["metadatas"][i]

                    # Always expect JSON string in storage
                    try:
                        stored_tags = json.loads(memory_meta.get("tags", "[]"))
                        stored_tags = [str(tag).strip() for tag in stored_tags]
                    except (json.JSONDecodeError, TypeError):
                        logger.debug(f"Invalid tags format in memory: {memory_meta.get('content_hash')}")
                        continue

                    if any(search_tag in stored_tags for search_tag in search_tags):
                        memory = Memory(
                            content=doc,
                            content_hash=memory_meta["content_hash"],
                            tags=stored_tags,
                            memory_type=memory_meta.get("memory_type")
                        )
                        memories.append(memory)
            return memories
        except Exception as e:
            logger.error(f"Error searching by tags: {e}")
            return []

    async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
        """Delete memories that match the specified tag."""
        try:
            # Get all the documents from ChromaDB
            results = self.collection.get(
                include=["metadatas"]
            )

            ids_to_delete = []
            if results["ids"]:
                for i, meta in enumerate(results["metadatas"]):
                    try:
                        retrieved_tags_string = meta.get("tags", "[]")
                        retrieved_tags = json.loads(retrieved_tags_string)
                    except json.JSONDecodeError:
                        retrieved_tags = []

                    if tag in retrieved_tags:
                        ids_to_delete.append(results["ids"][i])

            if not ids_to_delete:
                return 0, f"No memories found with tag: {tag}"

            # Delete memories
            self.collection.delete(ids=ids_to_delete)
            return len(ids_to_delete), f"Successfully deleted {len(ids_to_delete)} memories with tag: {tag}"
        except Exception as e:
            logger.error(f"Error deleting memories by tag: {e}")
            return 0, f"Error deleting memories by tag: {e}"

    async def delete(self, content_hash: str) -> Tuple[bool, str]:
        """Delete a memory by its hash."""
        try:
            # First check if the memory exists
            existing = self.collection.get(
                where={"content_hash": content_hash}
            )
            if not existing["ids"]:
                return False, f"No memory found with hash {content_hash}"

            # Delete the memory
            self.collection.delete(
                where={"content_hash": content_hash}
            )
            return True, f"Successfully deleted memory with hash {content_hash}"
        except Exception as e:
            logger.error(f"Error deleting memory: {str(e)}")
            return False, f"Error deleting memory: {str(e)}"

    async def cleanup_duplicates(self) -> Tuple[int, str]:
        """Remove duplicate memories based on content hash."""
        try:
            # Get all memories
            results = self.collection.get()
            if not results["ids"]:
                return 0, "No memories found in database"

            # Track seen hashes and duplicates
            seen_hashes: Set[str] = set()
            duplicates = []

            for i, metadata in enumerate(results["metadatas"]):
                content_hash = metadata.get("content_hash")
                if not content_hash:
                    # Generate hash if missing
                    content_hash = generate_content_hash(results["documents"][i], metadata)

                if content_hash in seen_hashes:
                    duplicates.append(results["ids"][i])
                else:
                    seen_hashes.add(content_hash)

            # Delete duplicates if found
            if duplicates:
                self.collection.delete(ids=duplicates)
                return len(duplicates), f"Successfully removed {len(duplicates)} duplicate memories"

            return 0, "No duplicate memories found"
        except Exception as e:
            logger.error(f"Error cleaning up duplicates: {str(e)}")
            return 0, f"Error cleaning up duplicates: {str(e)}"

    async def recall(self, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
        """Retrieve memories within a timestamp range."""
        try:
            # Use None (not an empty dict) when no range is given, since ChromaDB
            # rejects an empty where clause
            where_clause = None
            if start_timestamp is not None and end_timestamp is not None:
                where_clause = {
                    "$and": [
                        {"timestamp": {"$gte": start_timestamp}},
                        {"timestamp": {"$lte": end_timestamp}}
                    ]
                }

            results = self.collection.get(
                where=where_clause,
                limit=n_results,
                include=["metadatas", "documents"]
            )

            memory_results = []
            for i in range(len(results["ids"])):
                metadata = results["metadatas"][i]
                try:
                    retrieved_tags = json.loads(metadata.get("tags", "[]"))
                except json.JSONDecodeError:
                    retrieved_tags = []

                memory = Memory(
                    content=results["documents"][i],
                    content_hash=metadata["content_hash"],
                    tags=retrieved_tags,
                    memory_type=metadata.get("memory_type", ""),
                    timestamp=metadata.get("timestamp"),
                    metadata={k: v for k, v in metadata.items() if k not in ["memory_type", "content_hash", "tags", "timestamp"]}
                )
                memory_results.append(MemoryQueryResult(memory))
            return memory_results
        except Exception as e:
            logger.error(f"Error retrieving memories: {str(e)}")
            return []
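
    # Example call (illustrative): recall memories stored in the last 24 hours.
    #
    #   import time
    #   now = time.time()
    #   recent = await storage.recall(
    #       n_results=10,
    #       start_timestamp=now - 24 * 60 * 60,
    #       end_timestamp=now,
    #   )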

    async def delete_by_timeframe(self, start_date: date, end_date: Optional[date] = None, tag: Optional[str] = None) -> Tuple[int, str]:
        """Delete memories within a timeframe and optionally filtered by tag."""
        try:
            if end_date is None:
                end_date = start_date

            start_datetime = datetime(start_date.year, start_date.month, start_date.day, 0, 0, 0)
            end_datetime = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59)
            start_timestamp = start_datetime.timestamp()
            end_timestamp = end_datetime.timestamp()

            where_clause = {
                "$and": [
                    {"timestamp": {"$gte": start_timestamp}},
                    {"timestamp": {"$lte": end_timestamp}}
                ]
            }

            results = self.collection.get(include=["metadatas"], where=where_clause)

            ids_to_delete = []
            if results.get("ids"):
                for i, meta in enumerate(results["metadatas"]):
                    try:
                        retrieved_tags = json.loads(meta.get("tags", "[]"))
                    except json.JSONDecodeError:
                        retrieved_tags = []

                    if tag is None or tag in retrieved_tags:
                        ids_to_delete.append(results["ids"][i])

            if not ids_to_delete:
                return 0, "No memories found matching the criteria."

            self.collection.delete(ids=ids_to_delete)
            return len(ids_to_delete), f"Successfully deleted {len(ids_to_delete)} memories."
        except Exception as e:
            logger.exception("Error deleting memories by timeframe:")
            return 0, str(e)

    async def delete_before_date(self, before_date: date, tag: Optional[str] = None) -> Tuple[int, str]:
        """Delete memories before a given date and optionally filtered by tag."""
        try:
            before_datetime = datetime(before_date.year, before_date.month, before_date.day, 23, 59, 59)
            before_timestamp = before_datetime.timestamp()

            where_clause = {"timestamp": {"$lt": before_timestamp}}

            results = self.collection.get(include=["metadatas"], where=where_clause)

            ids_to_delete = []
            if results.get("ids"):
                for i, meta in enumerate(results["metadatas"]):
                    try:
                        retrieved_tags = json.loads(meta.get("tags", "[]"))
                    except json.JSONDecodeError:
                        retrieved_tags = []

                    if tag is None or tag in retrieved_tags:
                        ids_to_delete.append(results["ids"][i])

            if not ids_to_delete:
                return 0, "No memories found matching the criteria."

            self.collection.delete(ids=ids_to_delete)
            return len(ids_to_delete), f"Successfully deleted {len(ids_to_delete)} memories."
        except Exception as e:
            logger.exception("Error deleting memories before date:")
            return 0, str(e)

    def _format_metadata_for_chroma(self, memory: Memory) -> Dict[str, Any]:
        """Format metadata to be compatible with ChromaDB requirements."""
        metadata = {
            "content_hash": memory.content_hash,
            "memory_type": memory.memory_type if memory.memory_type else "",
            # Store the timestamp as a float so $gte/$lte range filters work
            "timestamp": memory.timestamp.timestamp()
        }

        # Properly serialize tags
        if memory.tags:
            if isinstance(memory.tags, list):
                metadata["tags"] = json.dumps([str(tag).strip() for tag in memory.tags if str(tag).strip()])
            elif isinstance(memory.tags, str):
                tags = [tag.strip() for tag in memory.tags.split(",") if tag.strip()]
                metadata["tags"] = json.dumps(tags)
        else:
            metadata["tags"] = "[]"

        # Add any additional metadata
        for key, value in memory.metadata.items():
            if isinstance(value, (str, int, float, bool)):
                metadata[key] = value
        return metadata
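
    # Resulting metadata shape (illustrative values):
    #
    #   {
    #       "content_hash": "9f86d08...",
    #       "memory_type": "note",
    #       "timestamp": 1718000000.0,
    #       "tags": '["alpha", "beta"]',
    #   }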

    async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
        """Retrieve memories using semantic search."""
        try:
            # Query using the embedding function
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )

            if not results["ids"] or not results["ids"][0]:
                return []

            memory_results = []
            for i in range(len(results["ids"][0])):
                metadata = results["metadatas"][0][i]

                # Tags are stored as a JSON string, so parse them back into a list
                try:
                    retrieved_tags = json.loads(metadata.get("tags", "[]"))
                except json.JSONDecodeError:
                    retrieved_tags = []

                # Reconstruct memory object
                memory = Memory(
                    content=results["documents"][0][i],
                    content_hash=metadata["content_hash"],
                    tags=retrieved_tags,
                    memory_type=metadata.get("memory_type", ""),
                )

                # Calculate cosine similarity from distance
                distance = results["distances"][0][i]
                similarity = 1 - distance

                memory_results.append(MemoryQueryResult(memory, similarity))
            return memory_results
        except Exception as e:
            logger.error(f"Error retrieving memories: {str(e)}")
            return []
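
# A minimal end-to-end sketch (illustrative only; the exact constructor
# signatures of Memory and MemoryQueryResult are assumptions inferred from how
# they are used in this module, not confirmed by it):
#
#   import asyncio
#
#   async def main():
#       storage = ChromaMemoryStorage("/path/to/db")
#       memory = Memory(
#           content="Cosine distance suits normalized sentence embeddings.",
#           content_hash=generate_content_hash(
#               "Cosine distance suits normalized sentence embeddings.", {}
#           ),
#           tags=["embeddings"],
#       )
#       await storage.store(memory)
#       for result in await storage.retrieve("sentence embeddings", n_results=3):
#           print(result.memory.content)
#
#   asyncio.run(main())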