"""Memex (by narphorium): Milvus-backed vector store for markdown notes."""
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path

import voyageai
from pymilvus import CollectionSchema, DataType, FieldSchema, MilvusClient

from mcp_memex.notes import get_frontmatter

from .chunking import MarkdownSection, split_markdown_sections
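
# This module assumes MarkdownSection (defined in .chunking) exposes roughly
# the following fields; this is a sketch inferred from usage below, not the
# real definition:
#
#     @dataclass
#     class MarkdownSection:
#         path: str                 # source file path
#         index: int                # position of the section in the document
#         level: int                # heading level (1 = H1, 2 = H2, ...)
#         header: str               # heading text
#         content: str              # section body
#         parent_index: int         # index of the enclosing section
#         child_indices: list[int]  # indices of nested subsections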
MODEL_NAME = "voyage-3"  # Voyage AI embedding model to use
DIMENSION = 1024  # Dimension of the embedding vectors
COLLECTION_NAME = "memex"


class VectorDB:

    def __init__(self, index_dir: Path):
        """Initialize the Milvus vector store with Voyage embeddings"""
        self.index_dir = index_dir
        self.index_dir.mkdir(parents=True, exist_ok=True)

        db_path = self.index_dir / "index.db"
        print(f"Connecting to Milvus at {db_path}")
        self.client = MilvusClient(str(db_path))

        # Initialize the Voyage AI client used for all embedding calls
        self.voyage_api_key = os.getenv("VOYAGE_API_KEY")
        if not self.voyage_api_key:
            raise ValueError("VOYAGE_API_KEY environment variable is required")
        self.voyage_client = voyageai.Client(api_key=self.voyage_api_key)

        # Create the collection if it doesn't exist
        if not self.client.has_collection(COLLECTION_NAME):
            fields = [
                FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
                FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
                FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200),
                FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=500),
                FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=64),
                FieldSchema(name="index", dtype=DataType.INT64),
                FieldSchema(name="level", dtype=DataType.INT64),
                FieldSchema(name="header", dtype=DataType.VARCHAR, max_length=200),
                FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
                FieldSchema(name="parent_index", dtype=DataType.INT64),
                # Milvus has no list type, so child indices are stored as JSON
                FieldSchema(name="child_indices", dtype=DataType.VARCHAR, max_length=200),
            ]
            schema = CollectionSchema(fields)
            self.client.create_collection(
                collection_name=COLLECTION_NAME,
                schema=schema,
            )

            # Create the vector index immediately after collection creation.
            # FLAT performs an exhaustive search, so no tuning params are needed.
            index_params = self.client.prepare_index_params()
            index_params.add_index(
                field_name="vector",
                index_type="FLAT",
                metric_type="COSINE",
            )
            self.client.create_index(
                collection_name=COLLECTION_NAME,
                index_params=index_params,
            )

        # Always load the collection when initializing
        self.client.load_collection(COLLECTION_NAME)

    def add_document(self, content: str, path: Path) -> None:
        """Add or update a document in the vector store"""
        _metadata, remaining_content = get_frontmatter(content)
        timestamp = datetime.now(timezone.utc).isoformat()

        # Chunk the content into markdown sections
        chunks = split_markdown_sections(remaining_content, str(path))

        # Embed all chunks in a single batched request
        chunk_texts = [chunk.content for chunk in chunks]
        vectors = self.voyage_client.embed(
            chunk_texts, model=MODEL_NAME, input_type="document"
        ).embeddings

        # Delete existing entries for this path, if any
        self.client.delete(
            collection_name=COLLECTION_NAME,
            filter=f'path == "{str(path)}"'
        )

        # Prepare rows for Milvus
        data = []
        for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
            data.append({
                # Derive a stable 64-bit ID from the path and chunk index;
                # Python's built-in hash() is randomized per process and
                # would give the same chunk a different ID on every run
                "id": int.from_bytes(
                    hashlib.sha256(f"{path}_{i}".encode()).digest()[:8],
                    "big",
                    signed=True,
                ),
                "vector": vector,
                "title": path.stem,
                "path": str(path),
                "timestamp": timestamp,
                "index": i,
                "level": chunk.level,
                "header": chunk.header,
                "content": chunk.content,
                "parent_index": chunk.parent_index,
                "child_indices": json.dumps(chunk.child_indices),  # Milvus doesn't support list fields
            })

        # Insert into Milvus; the vector index already exists (created in __init__)
        self.client.insert(
            collection_name=COLLECTION_NAME,
            data=data
        )

    def has_document(self, path: str) -> bool:
        """Check if a document with the given path exists in the store"""
        # The schema has no "url" field; documents are keyed by "path"
        results = self.client.query(
            collection_name=COLLECTION_NAME,
            filter=f'path == "{path}"',
            output_fields=["path"],
            limit=1
        )
        return len(results) > 0

    def search(self, query: str, n_results: int = 20) -> list[dict]:
        """Search for relevant content chunks"""
        # Embed the query using Voyage
        query_vector = self.voyage_client.embed(
            [query], model=MODEL_NAME, input_type="query"
        ).embeddings[0]

        # Search in Milvus; the FLAT index scans exhaustively, so no
        # search params (e.g. nprobe) apply
        results = self.client.search(
            collection_name=COLLECTION_NAME,
            data=[query_vector],
            limit=n_results,
            output_fields=[
                "title", "path", "timestamp", "index", "level",
                "header", "content", "parent_index", "child_indices",
            ],
        )

        # Format the first (and only) query's results
        search_results = []
        for result in results[0]:
            entity = result["entity"]
            search_results.append({
                "metadata": MarkdownSection(
                    path=entity["path"],
                    index=entity["index"],
                    level=entity["level"],
                    header=entity["header"],
                    content=entity["content"],
                    parent_index=entity["parent_index"],
                    child_indices=json.loads(entity["child_indices"]),
                ),
                "title": entity["title"],
                "timestamp": entity["timestamp"],
                # With the COSINE metric Milvus reports cosine similarity
                # directly (higher is more similar), so no conversion is needed
                "similarity": result["distance"],
            })
        return search_results

    def save_db(self):
        """Save the database to disk"""
        self.client.flush(COLLECTION_NAME)
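

# A minimal usage sketch. Assumes VOYAGE_API_KEY is set in the environment;
# the index directory and note path below are illustrative, not part of the
# real project layout.
if __name__ == "__main__":
    db = VectorDB(Path(".memex_index"))
    note_path = Path("notes/example.md")  # hypothetical markdown note
    db.add_document(note_path.read_text(), note_path)
    db.save_db()
    for hit in db.search("vector databases", n_results=5):
        print(f'{hit["similarity"]:.3f}  {hit["metadata"].header}')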