"""MCP tools for statement indexing and management."""
import os
import shutil
import json
from datetime import datetime
from pathlib import Path
from ..database.sqlite_client import SQLiteClient
from ..database.lancedb_client import LanceDBClient
from ..database.schemas import Statement
from ..embeddings.generator import EmbeddingGenerator
# Calculate default archive locations relative to this file's location.
# Four `.parent` hops climb out of investing-mcp/src/tools/indexing.py to the
# directory that contains the investing-mcp checkout, then append "data"
# (per the original note: investing-mcp/src/tools/indexing.py -> investments/data).
# NOTE(review): assumes the repo sits next to the data directory — verify layout.
_DATA_DIR = Path(__file__).parent.parent.parent.parent.resolve() / "data"
_DEFAULT_PDF_PATH = str(_DATA_DIR / "pdfs")    # archived source PDF statements
_DEFAULT_CSV_PATH = str(_DATA_DIR / "csvs")    # archived source CSV statements
_DEFAULT_JSON_PATH = str(_DATA_DIR / "json")   # parsed-statement JSON archives
class IndexingTools:
    """Tools for indexing and managing statements.

    Coordinates parsing of brokerage statements (PDF/CSV), archiving of the
    source file plus a JSON representation, and indexing into both SQLite
    (structured queries) and LanceDB (vector search). The JSON archives are
    the source of truth; see rebuild_from_archives().
    """

    def __init__(
        self,
        sqlite_client: SQLiteClient,
        lance_client: LanceDBClient,
        embedding_gen: EmbeddingGenerator,
        pdf_archive_path: str = _DEFAULT_PDF_PATH,
        csv_archive_path: str = _DEFAULT_CSV_PATH,
        json_archive_path: str = _DEFAULT_JSON_PATH,
    ):
        """Initialize indexing tools.

        Args:
            sqlite_client: SQLite database client
            lance_client: LanceDB client
            embedding_gen: Embedding generator
            pdf_archive_path: Path to PDF archive directory
            csv_archive_path: Path to CSV archive directory
            json_archive_path: Path to JSON archive directory
        """
        self.sqlite = sqlite_client
        self.lance = lance_client
        self.embedder = embedding_gen
        self.pdf_archive_path = pdf_archive_path
        self.csv_archive_path = csv_archive_path
        self.json_archive_path = json_archive_path

        # Ensure the archive directories exist before any indexing happens.
        os.makedirs(pdf_archive_path, exist_ok=True)
        os.makedirs(csv_archive_path, exist_ok=True)
        os.makedirs(json_archive_path, exist_ok=True)

        # Parsers are lazy-loaded on first use (see the properties below) so
        # constructing this class does not import parser dependencies.
        self._questrade_parser = None
        self._ibkr_parser = None
        self._scotia_parser = None
        self._td_parser = None

    @property
    def questrade_parser(self):
        """Lazy-load the Questrade PDF parser on first access."""
        if self._questrade_parser is None:
            from ..parsers.questrade_parser import QuestradeParser
            self._questrade_parser = QuestradeParser()
        return self._questrade_parser

    @property
    def ibkr_parser(self):
        """Lazy-load the IBKR parser (handles both PDFs and FlexQuery CSVs)."""
        if self._ibkr_parser is None:
            from ..parsers.ibkr_parser import IBKRParser
            self._ibkr_parser = IBKRParser()
        return self._ibkr_parser

    @property
    def scotia_parser(self):
        """Lazy-load the Scotia iTRADE PDF parser on first access."""
        if self._scotia_parser is None:
            from ..parsers.scotia_parser import ScotiaParser
            self._scotia_parser = ScotiaParser()
        return self._scotia_parser

    @property
    def td_parser(self):
        """Lazy-load the TD Direct PDF parser on first access."""
        if self._td_parser is None:
            from ..parsers.td_direct_parser import TDDirectParser
            self._td_parser = TDDirectParser()
        return self._td_parser

    def _detect_pdf_parser(self, file_path: str) -> str:
        """Detect which parser to use for a PDF file.

        Detection is heuristic: it looks for institution hints in the
        lowercased file path and falls back to Questrade when nothing
        matches (backward compatibility with the original behavior).

        Args:
            file_path: Path to PDF file

        Returns:
            Parser name: 'scotia', 'questrade', 'ibkr', or 'td'
        """
        import re

        path_lower = file_path.lower()
        filename = os.path.basename(path_lower)

        if 'scotia' in path_lower:
            # Also covers 'scotiaitrade' paths ('scotia' is a prefix of it).
            return 'scotia'
        elif 'questrade' in path_lower:
            return 'questrade'
        elif 'ibkr' in path_lower or 'interactive' in path_lower:
            return 'ibkr'
        # Match 'td' only when it is not embedded inside another word:
        # "td_2024.pdf", "/td/stmt.pdf" and "td-jan.pdf" match, while
        # "/home/todd/" or "outdated/" do not. The previous plain substring
        # test ('td' in path_lower) misrouted such paths to the TD parser.
        elif re.search(r'(?<![a-z])td(?![a-z])', path_lower) or filename.startswith('statement_'):
            return 'td'

        # No hint found; default to Questrade for backward compatibility.
        return 'questrade'

    async def index_statement(self, file_path: str) -> dict:
        """Index a statement file (PDF or CSV).

        Supports:
        - Questrade PDFs (auto-detected from filename)
        - Scotia iTRADE PDFs (auto-detected from filename)
        - IBKR PDFs (auto-detected from filename)
        - TD Direct PDFs (auto-detected from filename)
        - IBKR CSVs (FlexQuery format)

        Args:
            file_path: Path to statement file (PDF or CSV)

        Returns:
            Result dictionary with status and statement_id
        """
        try:
            # Validate file exists
            if not os.path.exists(file_path):
                return {"status": "error", "message": "File not found"}

            # Detect file type and parse accordingly
            file_ext = os.path.splitext(file_path)[1].lower()

            if file_ext == ".csv":
                # Only IBKR statements arrive as CSV (FlexQuery exports).
                statement = self.ibkr_parser.parse(file_path)

                # Archive CSV
                csv_filename = f"{statement.statement_id}.csv"
                csv_dest = os.path.join(self.csv_archive_path, csv_filename)
                shutil.copy2(file_path, csv_dest)
                statement.pdf_path = csv_dest  # Reuse pdf_path field for source file
            elif file_ext == ".pdf":
                # Detect which parser to use
                parser_type = self._detect_pdf_parser(file_path)

                # Parse with appropriate parser
                if parser_type == 'scotia':
                    statement = self.scotia_parser.parse(file_path)
                elif parser_type == 'ibkr':
                    statement = self.ibkr_parser.parse(file_path)
                elif parser_type == 'td':
                    statement = self.td_parser.parse(file_path)
                else:  # questrade (default)
                    statement = self.questrade_parser.parse(file_path)

                # Archive PDF
                pdf_filename = f"{statement.statement_id}.pdf"
                pdf_dest = os.path.join(self.pdf_archive_path, pdf_filename)
                shutil.copy2(file_path, pdf_dest)
                statement.pdf_path = pdf_dest
            else:
                return {"status": "error", "message": f"Unsupported file type: {file_ext}"}

            # Save as JSON. Assign json_path *before* serializing so the
            # archived JSON records its own location; previously it was set
            # only after the dump, so rebuild_from_archives() reconstructed
            # statements with a missing json_path.
            json_filename = f"{statement.statement_id}.json"
            json_dest = os.path.join(self.json_archive_path, json_filename)
            statement.json_path = json_dest
            with open(json_dest, "w") as f:
                # default=str is a safety net for any non-JSON-native values
                # that model_dump(mode="json") leaves behind (e.g. dates).
                json.dump(statement.model_dump(mode="json"), f, indent=2, default=str)

            # Insert into SQLite
            await self.sqlite.insert_statement(statement)

            # Delete old vector chunks for this statement so re-indexing the
            # same statement does not create duplicates.
            await self.lance.delete_statement_chunks(statement.statement_id)

            # Generate embeddings and store in LanceDB
            await self._index_vectors(statement)

            return {
                "status": "success",
                "statement_id": statement.statement_id,
                "institution": statement.institution,
                "account_number": statement.summary.account_number,
                "statement_date": statement.summary.statement_date.isoformat(),
            }
        except Exception as e:
            # MCP tool boundary: report the failure instead of raising.
            return {
                "status": "error",
                "message": str(e),
            }

    async def _index_vectors(self, statement):
        """Generate and store vector embeddings for a statement.

        Creates one summary chunk, one chunk per holding, and a single
        grouped chunk covering (at most) the first 20 transactions.

        Args:
            statement: Statement object
        """
        chunks = []

        # Create summary chunk
        summary_text = f"""
Account: {statement.summary.account_number}
Institution: {statement.institution}
Date: {statement.summary.statement_date}
Balance: ${statement.summary.current_balance_cad:,.2f} CAD
Cash: ${statement.summary.cash_cad:,.2f} CAD, ${statement.summary.cash_usd:,.2f} USD
Securities: ${statement.summary.securities_value_cad:,.2f} CAD
"""
        summary_embedding = self.embedder.encode(summary_text.strip())
        chunks.append({
            "chunk_id": f"{statement.statement_id}_summary",
            "statement_id": statement.statement_id,
            "content": summary_text.strip(),
            "chunk_type": "summary",
            "vector": summary_embedding,
            "account_number": statement.summary.account_number,
            "statement_date": statement.summary.statement_date.isoformat(),
            "institution": statement.institution,
        })

        # Create holdings chunks. The position index is part of the chunk id
        # so two holdings with the same symbol (e.g. CAD and USD lots of the
        # same security) cannot collide on chunk_id.
        for position, holding in enumerate(statement.holdings):
            holding_text = f"""
Symbol: {holding.symbol}
Description: {holding.description}
Quantity: {holding.quantity}
Cost: ${holding.total_cost:,.2f}
Market Value: ${holding.market_value_cad:,.2f} CAD
P/L: ${holding.profit_loss:,.2f} ({holding.percent_return:.2f}%)
"""
            holding_embedding = self.embedder.encode(holding_text.strip())
            chunks.append({
                "chunk_id": f"{statement.statement_id}_holding_{position}_{holding.symbol}",
                "statement_id": statement.statement_id,
                "content": holding_text.strip(),
                "chunk_type": "holding",
                "vector": holding_embedding,
                "account_number": statement.summary.account_number,
                "statement_date": statement.summary.statement_date.isoformat(),
                "institution": statement.institution,
            })

        # Create transaction chunks (grouped into one chunk)
        if statement.transactions:
            txn_texts = [
                f"{txn.transaction_date.strftime('%Y-%m-%d')}: {txn.activity_type} "
                f"{txn.symbol or ''} {txn.description} ${txn.net_amount:,.2f}"
                for txn in statement.transactions[:20]  # Limit to first 20
            ]
            txn_text = "\n".join(txn_texts)
            txn_embedding = self.embedder.encode(txn_text)
            chunks.append({
                "chunk_id": f"{statement.statement_id}_transactions",
                "statement_id": statement.statement_id,
                "content": txn_text,
                "chunk_type": "transactions",
                "vector": txn_embedding,
                "account_number": statement.summary.account_number,
                "statement_date": statement.summary.statement_date.isoformat(),
                "institution": statement.institution,
            })

        # Add chunks to LanceDB
        await self.lance.add_chunks(chunks)

    async def reindex_statement(self, statement_id: str) -> dict:
        """Reindex an existing statement from its archived source file.

        Args:
            statement_id: Statement ID

        Returns:
            Result dictionary
        """
        # Get statement from database
        statement_data = await self.sqlite.get_statement(statement_id)
        if not statement_data:
            return {"status": "error", "message": "Statement not found"}

        # Re-parse from the archived source if available. Note pdf_path may
        # actually point at a CSV (the field is reused for CSV sources);
        # index_statement() dispatches on the file extension either way.
        if statement_data.get("pdf_path") and os.path.exists(statement_data["pdf_path"]):
            return await self.index_statement(statement_data["pdf_path"])

        return {"status": "error", "message": "Original PDF not found"}

    async def get_indexing_stats(self) -> dict:
        """Get indexing statistics.

        Returns:
            Statistics dictionary merging SQLite and LanceDB stats
            (LanceDB keys win on collision).
        """
        sqlite_stats = await self.sqlite.get_stats()
        lance_stats = await self.lance.get_stats()
        return {
            **sqlite_stats,
            **lance_stats,
        }

    async def rebuild_from_archives(self) -> dict:
        """Rebuild SQLite and LanceDB databases from JSON archives.

        This tool is useful when syncing data between machines. The JSON
        archives are the source of truth, and this rebuilds the local query
        databases from them.

        Returns:
            Result dictionary with rebuild statistics. Status is "success",
            "partial" (some archives failed), "error", or "warning" (no
            archives found).
        """
        try:
            # Get list of JSON files
            json_files = list(Path(self.json_archive_path).glob("*.json"))
            if not json_files:
                return {
                    "status": "warning",
                    "message": "No JSON archives found",
                    "rebuilt": 0,
                }

            # Clear existing data
            await self.sqlite.clear_all()
            await self.lance.drop_table()

            # Track results
            rebuilt = 0
            errors = []

            # Process each JSON file; a bad archive is recorded and skipped
            # rather than aborting the whole rebuild.
            for json_file in json_files:
                try:
                    # Load statement from JSON
                    with open(json_file, "r") as f:
                        data = json.load(f)

                    # Reconstruct Statement object
                    statement = Statement.model_validate(data)

                    # Insert into SQLite
                    await self.sqlite.insert_statement(statement)

                    # Generate embeddings and store in LanceDB
                    await self._index_vectors(statement)

                    rebuilt += 1
                except Exception as e:
                    errors.append({
                        "file": str(json_file),
                        "error": str(e),
                    })

            result = {
                "status": "success",
                "rebuilt": rebuilt,
                "total_archives": len(json_files),
            }
            if errors:
                result["errors"] = errors
                result["status"] = "partial" if rebuilt > 0 else "error"

            # Add final stats
            stats = await self.get_indexing_stats()
            result["stats"] = stats

            return result
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
            }