"""LanceDB client for vector storage and semantic search."""
import logging
import os
from pathlib import Path
from typing import Optional

from .schemas import DocumentChunk
# Default database location, computed relative to this file: four ``.parent``
# hops climb from this module up to the directory that holds ``db/lancedb``.
# Layout per the original note:
#   investing-mcp/src/database/lancedb_client.py -> investments/db/lancedb
_DEFAULT_DB_PATH = str(Path(__file__).parent.parent.parent.parent.resolve() / "db" / "lancedb")

# Diagnostics go through logging rather than print().
# NOTE(review): the path above suggests this runs inside an MCP server; if that
# server uses stdio transport, anything printed to stdout would corrupt the
# protocol stream -- confirm against the server entry point.
_logger = logging.getLogger(__name__)


class LanceDBClient:
    """LanceDB client for semantic search over statements.

    The constructor only records the database path and makes sure the
    directory exists; the actual LanceDB connection is opened lazily the first
    time :attr:`db` is accessed.

    Note:
        The ``async`` methods call the synchronous LanceDB API directly (there
        is no ``await`` inside them), so they run to completion without
        yielding to the event loop.
    """

    def __init__(self, db_path: str = _DEFAULT_DB_PATH):
        """Initialize LanceDB client configuration (no connection yet).

        Args:
            db_path: Path to LanceDB directory. Created if it does not exist.
        """
        self.db_path = db_path
        os.makedirs(db_path, exist_ok=True)
        self._db = None  # Connection created on first use

    @property
    def db(self):
        """Return the LanceDB connection, opening it on first access."""
        if self._db is None:
            # Imported here so that constructing the client does not require
            # loading lancedb.
            import lancedb

            self._db = lancedb.connect(self.db_path)
        return self._db

    @staticmethod
    def _sql_literal(value) -> str:
        """Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so the value cannot terminate the
        literal early and alter the surrounding predicate.
        """
        return "'" + f"{value}".replace("'", "''") + "'"

    def _open_table_or_none(self, table_name: str):
        """Open ``table_name``, returning ``None`` when it cannot be opened.

        The exception type raised for a missing table is not pinned down here,
        so any failure to open is treated as "table not available" and logged
        at debug level.
        """
        try:
            return self.db.open_table(table_name)
        except Exception:
            _logger.debug("Could not open table %r", table_name, exc_info=True)
            return None

    def create_table(self, table_name: str = "statement_chunks"):
        """Return the chunks table if it already exists.

        Despite the name, this never creates anything: a missing table is
        created by :meth:`add_chunks` when the first data is added.

        Args:
            table_name: Name of the table

        Returns:
            The LanceDB table, or ``None`` if it could not be opened.
        """
        return self._open_table_or_none(table_name)

    async def add_chunks(
        self, chunks: list[dict], table_name: str = "statement_chunks"
    ):
        """Add document chunks to the vector store.

        Appends to the table when it exists, otherwise creates the table from
        ``chunks``. An empty ``chunks`` list is a no-op.

        Args:
            chunks: List of chunk dictionaries with 'vector' field
            table_name: Name of the table

        Raises:
            Exception: Whatever LanceDB raises from ``add`` or ``create_table``
                is propagated unchanged.
        """
        if not chunks:
            return

        table = self._open_table_or_none(table_name)
        if table is None:
            # Table doesn't exist yet: create it with the data.
            self.db.create_table(table_name, data=chunks)
        else:
            # Deliberately outside any try/except: a failed add must surface
            # as itself, not be retried as a table creation.
            table.add(chunks)

    async def search(
        self,
        query_vector: list[float],
        limit: int = 10,
        filter_dict: Optional[dict] = None,
        table_name: str = "statement_chunks",
    ) -> list[dict]:
        """Search for similar chunks.

        Args:
            query_vector: Query embedding vector
            limit: Maximum number of results
            filter_dict: Optional metadata filters. Every entry must match
                (equality, combined with ``AND``). Values are compared as
                quoted string literals; keys are inserted verbatim as column
                names, so they must come from trusted code.
            table_name: Name of the table

        Returns:
            List of matching chunks; an empty list if the table is not
            available or the query fails (failures are logged).
        """
        table = self._open_table_or_none(table_name)
        if table is None:
            return []

        try:
            query = table.search(query_vector).limit(limit)
            if filter_dict:
                # Build ONE predicate: calling where() once per filter would
                # leave only the last condition in effect.
                predicate = " AND ".join(
                    f"{column} = {self._sql_literal(value)}"
                    for column, value in filter_dict.items()
                )
                query = query.where(predicate)
            return query.to_list()
        except Exception:
            _logger.exception("Search error on table %r", table_name)
            return []

    async def delete_statement_chunks(
        self, statement_id: str, table_name: str = "statement_chunks"
    ):
        """Delete all chunks for a statement (best effort).

        Does nothing when the table is not available. A failing delete is
        logged rather than raised.

        Args:
            statement_id: Statement ID
            table_name: Name of the table
        """
        table = self._open_table_or_none(table_name)
        if table is None:
            return  # Table might not exist yet

        try:
            table.delete(f"statement_id = {self._sql_literal(statement_id)}")
        except Exception:
            _logger.exception(
                "Failed to delete chunks for statement %r from table %r",
                statement_id,
                table_name,
            )

    async def get_stats(self, table_name: str = "statement_chunks") -> dict:
        """Get vector store statistics.

        Args:
            table_name: Name of the table

        Returns:
            Dictionary with ``total_chunks`` (0 when the table is unavailable
            or cannot be counted) and ``table_name``.
        """
        total = 0
        table = self._open_table_or_none(table_name)
        if table is not None:
            try:
                total = table.count_rows()
            except Exception:
                _logger.exception("Failed to count rows in table %r", table_name)
        return {
            "total_chunks": total,
            "table_name": table_name,
        }

    async def drop_table(self, table_name: str = "statement_chunks"):
        """Drop the specified table (best effort).

        Args:
            table_name: Name of the table to drop
        """
        try:
            self.db.drop_table(table_name)
        except Exception:
            # Table might not exist; keep a trace for debugging.
            _logger.debug("Could not drop table %r", table_name, exc_info=True)