npm-search-mcp-server
by btwiuse
- src
import chromadb  # pip install chromadb
import json  # standard library
import hashlib  # standard library
import logging  # standard library
import asyncio  # standard library
import os  # standard library
from typing import List, Tuple, Dict, Optional, Any  # standard library
from dataclasses import dataclass, field  # standard library
from sentence_transformers import SentenceTransformer  # pip install sentence-transformers
from chromadb.utils import embedding_functions  # pip install chromadb
# Note: Memory and MemoryQueryResult are defined locally below, so they are not imported from mcp_memory_service.
import shutil # For removing the storage directory
os.environ["CHROMA_DISABLE_TELEMETRY"] = "1"
import time  # standard library
start_timestamp = time.time() # Current time
end_timestamp = start_timestamp + 60 * 60 # One hour from now
# Or for specific dates:
from datetime import datetime, date, timedelta  # standard library
def create_datetime(year, month, day, hour, minute, second):
return datetime(year, month, day, hour, minute, second)
# Create datetime objects for January 1 and December 31 of the current year
dt = create_datetime(datetime.now().year, 1, 1, 12, 0, 0) # January 1, current year, 12:00:00
start_timestamp = dt.timestamp()
dt2 = create_datetime(datetime.now().year, 12, 31, 12, 0, 0) # December 31, current year, 12:00:00
end_timestamp = dt2.timestamp()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Memory:
content: str
content_hash: str
tags: List[str]
memory_type: str
metadata: Optional[Dict[str, Any]] = None
    timestamp: float = field(default_factory=time.time)  # per-instance creation time; a plain default would be evaluated once at class definition
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
# Ensure metadata is JSON serializable
self.metadata = {
key: (value.isoformat() if isinstance(value, date) else value)
for key, value in self.metadata.items()
}
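# Illustrative sketch (values made up): dates in metadata are normalized to
# ISO strings by __post_init__, keeping the object JSON-serializable:
#   m = Memory(content="standup notes", content_hash="abc123",
#              tags=["meeting"], memory_type="calendar",
#              metadata={"due": date(2025, 1, 31)})
#   m.metadata["due"]  # -> "2025-01-31"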
@dataclass
class MemoryQueryResult:
memory: Memory
similarity: Optional[float] = None
def generate_content_hash(content: str, metadata: Dict[str, Any]) -> str:
"""Generates a SHA-256 hash of the content and metadata."""
# Convert non-serializable fields in metadata to serializable formats
serializable_metadata = {
key: (value.isoformat() if isinstance(value, date) else value)
for key, value in metadata.items()
}
combined = content + json.dumps(serializable_metadata, sort_keys=True) # Sort keys for consistent hashing
return hashlib.sha256(combined.encode()).hexdigest()
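# Illustrative sketch: sort_keys=True makes the hash independent of metadata
# key order, so logically identical memories dedupe to the same Chroma id:
#   h1 = generate_content_hash("note", {"a": 1, "b": 2})
#   h2 = generate_content_hash("note", {"b": 2, "a": 1})
#   assert h1 == h2  # same content + metadata, regardless of key order
#   assert h1 != generate_content_hash("note", {"a": 1, "b": 3})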
class ChromaMemoryStorage:
def __init__(self, path: str = "chroma_db"):
"""Initializes ChromaDB storage."""
self.path = path
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name='all-MiniLM-L6-v2'
)
        # Warning: removes any existing storage at this path so every run starts with an empty database
shutil.rmtree(self.path, ignore_errors=True)
try:
self.client = chromadb.PersistentClient(path=self.path)
self.collection = self.client.get_or_create_collection(
name="memory_collection",
metadata={"hnsw:space": "cosine"},
embedding_function=self.embedding_function
)
logger.info("ChromaDB collection initialized successfully.")
except Exception as e:
logger.exception("Error initializing ChromaDB:")
raise
def sanitize_tags(self, tags: Optional[Any]) -> List[str]:
"""Sanitizes tags to ensure they are a list of non-empty strings."""
if tags is None:
return []
if isinstance(tags, str):
tags = [tags]
elif not isinstance(tags, list):
logger.warning(f"Invalid tag format: {type(tags)}. Expected string or list.")
return []
sanitized_tags = [str(tag).strip() for tag in tags if str(tag).strip()]
return sanitized_tags
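    # Illustrative sketch of the normalization above (inputs are made up):
    #   storage.sanitize_tags("ml")               -> ["ml"]
    #   storage.sanitize_tags(["  ml ", "", 42])  -> ["ml", "42"]
    #   storage.sanitize_tags({"ml"})             -> []   (warning logged: invalid format)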
def _format_metadata_for_chroma(self, memory: Memory) -> Dict[str, Any]:
"""Formats metadata for ChromaDB."""
        metadata = {
            "type": memory.memory_type,
            "content_hash": memory.content_hash,
            "tags": json.dumps(memory.tags or []),  # Chroma metadata values must be scalars, so tags are stored as a JSON string
            "timestamp": memory.timestamp,  # stored so recall() can filter by time range
        }
metadata.update(memory.metadata)
return metadata
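    # Illustrative sketch: for a Memory tagged ["ml", "review"], the method above
    # yields a flat, scalar-only dict that Chroma accepts, roughly:
    #   {"type": "todo", "content_hash": "<sha256>", "tags": '["ml", "review"]',
    #    "timestamp": 1735689600.0, ...any custom metadata keys...}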
@staticmethod
def get_timestamp_range(start_date: date, end_date: Optional[date] = None) -> Dict[str, Any]:
"""
Generates a timestamp range for querying based on start and end dates.
Args:
start_date: The start date (mandatory).
end_date: The end date (optional). If None, it defaults to the end of the start date.
Returns:
A dictionary representing the where clause for ChromaDB, or an empty dictionary if start_date is invalid.
"""
if not isinstance(start_date, date):
raise TypeError("start_date must be a datetime.date object")
if end_date is not None and not isinstance(end_date, date):
raise TypeError("end_date must be a datetime.date object or None")
if end_date is None:
end_date = start_date
start_datetime = datetime(start_date.year, start_date.month, start_date.day, 0, 0, 0) # Start of the day
end_datetime = datetime(end_date.year, end_date.month, end_date.day, 23, 59, 59) # End of the day
start_timestamp = start_datetime.timestamp()
end_timestamp = end_datetime.timestamp()
where_clause = {
"$and": [
{"timestamp": {"$gte": start_timestamp}},
{"timestamp": {"$lte": end_timestamp}}
]
}
return where_clause
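    # Illustrative sketch: a single-day query expands to an inclusive range from
    # 00:00:00 through 23:59:59 of that day:
    #   ChromaMemoryStorage.get_timestamp_range(date(2025, 6, 1))
    #   -> {"$and": [{"timestamp": {"$gte": <2025-06-01 00:00:00>}},
    #                {"timestamp": {"$lte": <2025-06-01 23:59:59>}}]}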
async def store(self, memory: Memory) -> Tuple[bool, Optional[str]]:
"""Stores a memory in ChromaDB."""
try:
existing = self.collection.get(where={"content_hash": memory.content_hash})
if existing["ids"]:
return False, "Duplicate content detected."
metadata = self._format_metadata_for_chroma(memory)
self.collection.add(
documents=[memory.content],
metadatas=[metadata],
ids=[memory.content_hash]
)
return True, None
except Exception as e:
logger.exception("Error storing memory:")
return False, str(e)
async def search_by_tag(self, tags: List[str]) -> List[Memory]:
"""Retrieves memories matching any of the specified tags."""
try:
results = self.collection.get(include=["metadatas", "documents"])
memories = []
for i, doc in enumerate(results["documents"]):
metadata = results["metadatas"][i]
logger.debug(f"Retrieved memory timestamp: {metadata.get('timestamp')}")
try:
retrieved_tags = json.loads(metadata.get("tags", "[]"))
except json.JSONDecodeError:
retrieved_tags = []
logger.warning(f"Invalid JSON in tags metadata: {metadata.get('tags')}")
if any(tag in retrieved_tags for tag in tags):
                    mem = Memory(
                        content=doc,
                        content_hash=metadata["content_hash"],
                        tags=retrieved_tags,
                        memory_type=metadata.get("type", ""),
                        timestamp=metadata.get("timestamp"),
                        metadata={k: v for k, v in metadata.items() if k not in ["type", "content_hash", "tags", "timestamp"]}
                    )
memories.append(mem)
return memories
except Exception as e:
logger.exception("Error searching by tags:")
return []
async def delete_by_tag(self, tag: str) -> Tuple[int, Optional[str]]:
"""Deletes memories with the specified tag."""
try:
results = self.collection.get(include=["metadatas"])
ids_to_delete = []
for i, meta in enumerate(results["metadatas"]):
try:
retrieved_tags = json.loads(meta.get("tags", "[]"))
except json.JSONDecodeError:
retrieved_tags = []
logger.warning(f"Invalid JSON in tags metadata: {meta.get('tags')}")
                # Compare against the parsed tag list, not the raw JSON string
                if tag in retrieved_tags:
ids_to_delete.append(results["ids"][i])
if not ids_to_delete:
return 0, f"No memories found with tag: {tag}"
self.collection.delete(ids=ids_to_delete)
return len(ids_to_delete), None
except Exception as e:
logger.exception("Error deleting memories by tag:")
return 0, str(e)
async def recall(self, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List["MemoryQueryResult"]:
try:
            where_clause = None  # Chroma can reject an empty where dict, so default to no filter
if start_timestamp is not None and end_timestamp is not None:
where_clause = {
"$and": [
{"timestamp": {"$gte": start_timestamp}},
{"timestamp": {"$lte": end_timestamp}}
]
}
logger.info(f"Where clause: {where_clause}")
            results = self.collection.get(
                where=where_clause,
                limit=n_results,
                include=["metadatas", "documents"]  # get() does not return distances; only query() does
            )
if not results.get("ids") or not results["ids"]:
logger.info("No memories found matching the criteria.")
return []
memory_results = []
for i in range(len(results["ids"])):
metadata = results["metadatas"][i]
try:
retrieved_tags = json.loads(metadata.get("tags", "[]"))
logger.debug(f"Retrieved memory timestamp: {metadata.get('timestamp')}")
except json.JSONDecodeError:
retrieved_tags = []
logger.warning(f"Invalid JSON in tags metadata: {metadata.get('tags')}")
memory = Memory(
content=results["documents"][i],
content_hash=metadata["content_hash"],
tags=retrieved_tags,
memory_type=metadata.get("type", ""),
timestamp=metadata.get("timestamp"),
metadata={k: v for k, v in metadata.items() if k not in ["type", "content_hash", "tags", "timestamp"]}
)
                memory_results.append(MemoryQueryResult(memory))  # get() yields no distance, so similarity stays None
return memory_results
        except Exception:
            logger.exception("Error retrieving memories:")
            return []
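    # Illustrative sketch: recalling everything stored on a given day by feeding
    # get_timestamp_range() bounds into recall() (the variable names are made up):
    #   rng = ChromaMemoryStorage.get_timestamp_range(date.today())
    #   todays = await storage.recall(
    #       n_results=10,
    #       start_timestamp=rng["$and"][0]["timestamp"]["$gte"],
    #       end_timestamp=rng["$and"][1]["timestamp"]["$lte"],
    #   )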
    async def delete_by_timeframe(self, start_date: Optional[date] = None, end_date: Optional[date] = None, tag: Optional[str] = None) -> Tuple[int, Optional[str]]:
"""Deletes memories within a timeframe and optionally filtered by tag."""
try:
if start_date is None:
raise ValueError("start_date is mandatory")
where_clause = self.get_timestamp_range(start_date, end_date)
logger.debug(f"Where clause: {where_clause}")
results = self.collection.get(include=["metadatas"], where=where_clause)
ids_to_delete = []
if results.get("ids"): # Check if there are any results before iterating
for i, meta in enumerate(results["metadatas"]):
try:
retrieved_tags = json.loads(meta.get("tags", "[]"))
except json.JSONDecodeError:
retrieved_tags = []
logger.warning(f"Invalid JSON in tags metadata: {meta.get('tags')}")
if tag is None or tag in retrieved_tags: # Check if tag is None or if the tag is in the retrieved tags
ids_to_delete.append(results["ids"][i])
if not ids_to_delete:
return 0, "No memories found matching the criteria."
self.collection.delete(ids=ids_to_delete)
return len(ids_to_delete), None
except Exception as e:
logger.exception("Error deleting memories by timeframe:")
return 0, str(e)
    async def delete_before_date(self, before_date: date, tag: Optional[str] = None) -> Tuple[int, Optional[str]]:
"""Deletes memories before a given date and optionally filtered by tag."""
try:
before_datetime = datetime(before_date.year, before_date.month, before_date.day, 23, 59, 59) # End of the day
before_timestamp = before_datetime.timestamp()
where_clause = {"timestamp": {"$lt": before_timestamp}} # $lt: less than
results = self.collection.get(include=["metadatas"], where=where_clause)
ids_to_delete = []
if results.get("ids"):
for i, meta in enumerate(results["metadatas"]):
try:
retrieved_tags = json.loads(meta.get("tags", "[]"))
except json.JSONDecodeError:
retrieved_tags = []
logger.warning(f"Invalid JSON in tags metadata: {meta.get('tags')}")
if tag is None or tag in retrieved_tags:
ids_to_delete.append(results["ids"][i])
if not ids_to_delete:
return 0, "No memories found matching the criteria."
self.collection.delete(ids=ids_to_delete)
return len(ids_to_delete), None
except Exception as e:
logger.exception("Error deleting memories before date:")
return 0, str(e)
async def cleanup(self):
"""Cleans up all data in the ChromaDB."""
try:
# Delete the collection
self.client.delete_collection(name="memory_collection")
logger.info("ChromaDB collection deleted successfully.")
# Recreate the collection
self.collection = self.client.get_or_create_collection(
name="memory_collection",
metadata={"hnsw:space": "cosine"},
embedding_function=self.embedding_function
)
logger.info("ChromaDB collection recreated successfully.")
except Exception as e:
logger.exception("Error during ChromaDB cleanup:")
raise
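# A minimal usage sketch, separate from the test flow in main() below: store one
# memory and read it back by tag. The helper name, path, and sample data are
# hypothetical; it assumes only the ChromaMemoryStorage API defined above.
async def example_roundtrip():
    storage = ChromaMemoryStorage(path="chroma_db_example")  # hypothetical path; wiped on init
    content = "Ship release notes"
    mem = Memory(
        content=content,
        content_hash=generate_content_hash(content, {"tags": ["release"]}),
        tags=["release"],
        memory_type="todo",
    )
    ok, err = await storage.store(mem)
    assert ok and err is None, f"store failed: {err}"
    found = await storage.search_by_tag(["release"])
    assert len(found) == 1 and found[0].content == content
# Run it standalone with: asyncio.run(example_roundtrip())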
async def main():
storage = ChromaMemoryStorage()
today = datetime.today()
yesterday = today - timedelta(days=1)
two_days_ago = today - timedelta(days=2)
tomorrow = today + timedelta(days=1)
mem_data = [
{"content": "Meeting with team tomorrow at 10 AM", "memory_type": "calendar", "tags": ["meeting", "important"]},
{"content": "Review ML model performance metrics", "memory_type": "todo", "tags": ["ml", "review"]},
{"content": "Another meeting", "memory_type": "calendar", "tags": ["meeting", "test"]},
{"content": "unrelated", "memory_type": "note", "tags": ["note","important"]},
{"content": "Tagging test", "memory_type": "info", "tags": ['technical', 'test']},
{"content": "Review ML model performance metrics", "memory_type": "todo", "tags": "ml,review"},
{"content": "Today's memory", "memory_type": "test", "tags": ["test"], "date": today},
{"content": "Yesterday's memory", "memory_type": "test", "tags": ["test,important"], "date": yesterday},
{"content": "Memory two days ago", "memory_type": "test", "tags": ["test"], "date": two_days_ago},
{"content": "Tomorrow's memory", "memory_type": "test", "tags": ["test"], "date": tomorrow},
{"content": "Old Important memory", "memory_type": "test", "tags": ["important"], "date": two_days_ago}
    ]  # exercises different tag formats: lists, a bare string, and comma-joined tags
for data in mem_data:
tags = storage.sanitize_tags(data["tags"]) # Sanitize before creating the hash
        mem_date = data.get("date")  # the memory's date, or None (avoids shadowing the imported `date` class)
        if mem_date:
            dt = datetime(mem_date.year, mem_date.month, mem_date.day, 12, 0, 0)
timestamp = dt.timestamp()
mem = Memory(content=data["content"], content_hash=generate_content_hash(data["content"], data), tags=tags, memory_type=data["memory_type"], timestamp=timestamp)
else:
mem = Memory(content=data["content"], content_hash=generate_content_hash(data["content"], data), tags=tags, memory_type=data["memory_type"])
success, err = await storage.store(mem)
logger.info(f"Store result for '{mem.content}': {success}, {err}")
# Search for "meeting"
res = await storage.search_by_tag(["meeting"])
assert len(res) == 2, f'Expected 2 "meeting" memories, but found {len(res)}'
logger.info(f"Search results for tag 'meeting' before deletion: {len(res)}")
for r in res:
logger.debug(f" {r}")
# Delete "meeting"
count, err = await storage.delete_by_tag("meeting")
logger.info(f"Delete count for tag 'meeting': {count}, {err}")
assert count == 2, f'Expected 2 "meeting" memories, but deleted {count}'
# Search again
res = await storage.search_by_tag(["meeting"])
logger.info(f"Search results for tag 'meeting' after deletion: {len(res)}")
assert len(res) == 0, f'Expected 0 "meeting" memories, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search for "ml"
res = await storage.search_by_tag(["ml"])
logger.info(f"Search results for tag 'ml' after deletion of 'meeting': {len(res)}")
assert len(res) == 1, f'Expected 1 "ml" memory, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search for "important"
res = await storage.search_by_tag(["important"])
logger.info(f"Search results for tag 'important' after deletion of 'meeting': {len(res)}")
    assert len(res) == 2, f'Expected 2 "important" memories, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search for "note"
res = await storage.search_by_tag(["note"])
logger.info(f"Search results for tag 'note' after deletion of 'meeting': {len(res)}")
assert len(res) == 1, f'Expected 1 "note" memory, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search for "test"
res = await storage.search_by_tag(["test"])
logger.info(f"Search results for tag 'test' after deletion of 'meeting': {len(res)}")
assert len(res) == 4, f'Expected 4 "test" memories, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search for "technical"
res = await storage.search_by_tag(["technical"])
logger.info(f"Search results for tag 'technical' after deletion of 'meeting': {len(res)}")
assert len(res) == 1, f'Expected 1 "technical" memory, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Search by timestamp
res = await storage.recall(start_timestamp=start_timestamp, end_timestamp=end_timestamp)
logger.info(f"Search results for start_timestamp: {start_timestamp}, end_timestamp: {end_timestamp}: {len(res)}")
assert len(res) == 5, f'Expected 5 memories, but found {len(res)}'
for r in res:
logger.debug(f" {r}")
# Delete memories from today with the tag "important"
count, err = await storage.delete_by_timeframe(start_date=today, tag="important")
assert count == 1, f"Expected 1 'important' memory to be deleted, but {count} were deleted."
logger.info(f"Method delete_by_timeframe with Tag 'important' - Deleted {count} memories")
# Delete all memories from today (regardless of tags)
count, err = await storage.delete_by_timeframe(start_date=today)
    assert count == 4, f"Expected 4 memories to be deleted, but {count} were deleted."
logger.info(f"Method delete_by_timeframe without Tag - Deleted {count} memories")
# Test deleting before today with tag "important"
count, err = await storage.delete_before_date(before_date=today, tag="important")
assert count == 1, f"Expected 1 'important' memory to be deleted, but {count} were deleted."
logger.info(f"Deleted {count} 'important' memories before today")
# Test deleting before yesterday
count, err = await storage.delete_before_date(before_date=yesterday)
    assert count == 2, f"Expected 2 memories to be deleted, but {count} were deleted."
logger.info(f"Deleted {count} memories before yesterday")
# Test deleting before tomorrow
count, err = await storage.delete_before_date(before_date=tomorrow)
assert count == 1, f"Expected 1 memory to be deleted, but {count} were deleted."
logger.info(f"Deleted {count} memories before tomorrow")
await storage.cleanup()
logger.info("Cleanup complete")
results = storage.collection.get()
logger.info(f"Collection size after cleanup: {len(results.get('ids', []))}")
    assert err is None, f'Expected no error, but got {err}'
if __name__ == "__main__":
asyncio.run(main())