"""Main MCP server for investment statements."""
import os
from pathlib import Path
from fastmcp import FastMCP
from .database.sqlite_client import SQLiteClient
from .database.lancedb_client import LanceDBClient
from .embeddings.generator import EmbeddingGenerator
from .tools.indexing import IndexingTools
from .tools.search import SearchTools
from .tools.query import QueryTools
from .tools.risk import RiskCalculator
from .tools.reports import ReportGenerator
from .tools.simulation import MonteCarloSimulator
from .resources.catalog import CatalogResource
# Initialize FastMCP server
# Single server instance; every @mcp.resource / @mcp.tool decorator in this
# module registers its handler on this object, and mcp.run() serves them.
mcp = FastMCP("Investment Statement Server")
# Locate the project root from this module's own path so the default
# locations do not depend on the current working directory.
# investing-mcp/src/server.py -> investing-mcp -> investments
_PROJECT_ROOT = Path(__file__).parent.parent.parent.resolve()
_DB_DIR = _PROJECT_ROOT.joinpath("db")
_DATA_DIR = _PROJECT_ROOT.joinpath("data")

# Runtime configuration. Every value may be overridden with an environment
# variable of the same name; otherwise it falls back to a location under the
# project root (or, for MODEL_NAME, to a fixed model identifier).
SQLITE_PATH = os.environ.get("SQLITE_PATH", str(_DB_DIR.joinpath("statements.db")))
LANCEDB_PATH = os.environ.get("LANCEDB_PATH", str(_DB_DIR.joinpath("lancedb")))
PDF_ARCHIVE_PATH = os.environ.get("PDF_ARCHIVE_PATH", str(_DATA_DIR.joinpath("pdfs")))
CSV_ARCHIVE_PATH = os.environ.get("CSV_ARCHIVE_PATH", str(_DATA_DIR.joinpath("csvs")))
JSON_ARCHIVE_PATH = os.environ.get("JSON_ARCHIVE_PATH", str(_DATA_DIR.joinpath("json")))
REPORTS_PATH = os.environ.get("REPORTS_PATH", str(_DATA_DIR.joinpath("reports")))
MODEL_NAME = os.environ.get("MODEL_NAME", "all-MiniLM-L6-v2")
# Initialize clients
# These are created once at import time and shared by all tools and
# resources below, so importing this module already constructs them.
sqlite_client = SQLiteClient(db_path=SQLITE_PATH)
lance_client = LanceDBClient(db_path=LANCEDB_PATH)
embedding_gen = EmbeddingGenerator(model_name=MODEL_NAME)
# Initialize tools and resources
# Indexing receives both database clients, the embedding generator, and the
# three archive directories (PDF, CSV, JSON).
indexing_tools = IndexingTools(
    sqlite_client=sqlite_client,
    lance_client=lance_client,
    embedding_gen=embedding_gen,
    pdf_archive_path=PDF_ARCHIVE_PATH,
    csv_archive_path=CSV_ARCHIVE_PATH,
    json_archive_path=JSON_ARCHIVE_PATH,
)
# Semantic search only needs the vector store and the embedding generator.
search_tools = SearchTools(lance_client=lance_client, embedding_gen=embedding_gen)
# Structured queries and risk calculations share the same SQLite client.
query_tools = QueryTools(db_client=sqlite_client)
risk_calculator = RiskCalculator(db_client=sqlite_client)
# Report generation is given the JSON archive directory and the reports
# output directory; it is not handed a database client.
report_generator = ReportGenerator(
    json_archive_path=JSON_ARCHIVE_PATH,
    reports_path=REPORTS_PATH,
)
monte_carlo = MonteCarloSimulator(db_client=sqlite_client, reports_path=REPORTS_PATH)
catalog_resource = CatalogResource(db_client=sqlite_client)
# ============================================================================
# Resources
# ============================================================================
@mcp.resource("statements://catalog")
async def get_catalog() -> str:
    """Return the catalog of all indexed investment statements.

    The catalog lists every statement with a summary that includes the
    institution, account number, and statement dates.

    Returns:
        The catalog as produced by the catalog resource.
    """
    catalog = await catalog_resource.get_catalog()
    return catalog
@mcp.resource("statements://{statement_id}")
async def get_statement(statement_id: str) -> str:
    """Return the data for one specific statement.

    Args:
        statement_id: Unique statement identifier

    Returns:
        Complete statement data including metadata.
    """
    statement = await catalog_resource.get_statement(statement_id)
    return statement
@mcp.resource("statements://{statement_id}/summary")
async def get_statement_summary(statement_id: str) -> str:
    """Return the account summary section of a statement.

    Args:
        statement_id: Unique statement identifier

    Returns:
        Account summary with balances, cash positions, and FX rates.
    """
    summary = await catalog_resource.get_statement_summary(statement_id)
    return summary
# ============================================================================
# Indexing Tools
# ============================================================================
@mcp.tool
async def index_statement(file_path: str) -> dict:
    """Index a new PDF investment statement.

    The PDF is parsed to extract the account summary, holdings, and
    transactions, and the extracted data is stored in both SQLite and
    LanceDB for querying.

    Args:
        file_path: Absolute path to PDF statement file

    Returns:
        Dictionary with status, statement_id, and metadata
    """
    # The database is initialized here, on first use, rather than at import.
    await sqlite_client.initialize()
    outcome = await indexing_tools.index_statement(file_path)
    return outcome
@mcp.tool
async def reindex_statement(statement_id: str) -> dict:
    """Reindex an existing statement from its original PDF.

    Useful for fixing parsing errors or updating the index after schema
    changes.

    Args:
        statement_id: Unique statement identifier

    Returns:
        Dictionary with status and updated metadata
    """
    # NOTE(review): unlike index_statement, this path does not call
    # sqlite_client.initialize() first — confirm the schema is guaranteed
    # to exist whenever a statement can be reindexed.
    outcome = await indexing_tools.reindex_statement(statement_id)
    return outcome
@mcp.tool
async def get_indexing_stats() -> dict:
    """Report statistics about the indexed statements.

    Returns:
        Dictionary with total statements, holdings, transactions, and
        vector chunks
    """
    stats = await indexing_tools.get_indexing_stats()
    return stats
@mcp.tool
async def rebuild_from_archives() -> dict:
    """Rebuild SQLite and LanceDB databases from JSON archives.

    Use this tool when setting up on a new machine or after syncing JSON
    archives from another computer. The JSON files in the archive directory
    are the source of truth, and this tool rebuilds the local query
    databases from them.

    This clears existing data in SQLite and LanceDB, then reprocesses all
    JSON files.

    Returns:
        Dictionary with rebuild status, count of rebuilt statements, and
        final stats
    """
    # The schema must exist before the rebuild repopulates the tables.
    await sqlite_client.initialize()
    rebuild_result = await indexing_tools.rebuild_from_archives()
    return rebuild_result
# ============================================================================
# Search Tools
# ============================================================================
@mcp.tool
async def search_statements(query: str, limit: int = 10) -> list[dict]:
    """Search all statements using natural language.

    Uses semantic search to find relevant information across all indexed
    statements.

    Examples:
        - "What were my total dividends in Q3 2025?"
        - "Show me all SPY holdings"
        - "When did I buy shares of Apple?"

    Args:
        query: Natural language search query
        limit: Maximum number of results to return (default: 10)

    Returns:
        List of matching statement chunks with relevance scores
    """
    matches = await search_tools.search_statements(query=query, limit=limit)
    return matches
# ============================================================================
# Query Tools
# ============================================================================
@mcp.tool
async def get_holdings_by_symbol(symbol: str) -> list[dict]:
    """Get all holdings for a specific stock/ETF symbol across all statements.

    Useful for tracking position changes over time.

    Args:
        symbol: Stock/ETF ticker symbol (e.g., "SPY", "AAPL")

    Returns:
        List of holdings ordered by statement date (newest first)
    """
    holdings = await query_tools.get_holdings_by_symbol(symbol)
    return holdings
@mcp.tool
async def get_transactions_by_date_range(
    start_date: str,
    end_date: str,
    account_number: str | None = None,
) -> list[dict]:
    """Get all transactions within a date range.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        account_number: Optional account number to filter by

    Returns:
        List of transactions ordered by date (newest first)
    """
    # account_number is annotated as nullable so the declared type agrees
    # with its None default; the value (including None) is forwarded
    # unchanged to the query layer.
    return await query_tools.get_transactions_by_date_range(
        start_date=start_date,
        end_date=end_date,
        account_number=account_number,
    )
@mcp.tool
async def get_account_balance(account_number: str, date: str) -> dict:
    """Get account balance at a specific date.

    Returns the most recent statement on or before the specified date.

    Args:
        account_number: Account number (e.g., "51516162")
        date: Date in YYYY-MM-DD format

    Returns:
        Statement data with balance information, or a dictionary with a
        single `error` key when no statement is found
    """
    balance = await query_tools.get_account_balance(
        account_number=account_number,
        date=date,
    )
    # An empty/missing lookup result is reported as an error payload rather
    # than being returned to the client as-is.
    if not balance:
        return {"error": "No statement found for the specified date"}
    return balance
# ============================================================================
# Performance & Risk Tools
# ============================================================================
@mcp.tool
async def get_performance_history(account_number: str, limit: int = 12) -> dict:
    """Get performance history for an account over time.

    Shows balance changes and returns across multiple statements.

    Args:
        account_number: Account number (e.g., "51516162")
        limit: Number of months to return (default: 12)

    Returns:
        Performance history with balances and returns over time
    """
    history = await query_tools.get_performance_history(
        account_number=account_number,
        limit=limit,
    )
    return history
@mcp.tool
async def compare_to_benchmark(
    account_number: str,
    benchmark_name: str | None = None,
) -> dict:
    """Compare account performance to market benchmarks.

    Shows your returns vs index returns (S&P 500, TSX, etc.) and calculates
    alpha.

    Args:
        account_number: Account number (e.g., "51516162")
        benchmark_name: Optional benchmark name to filter (e.g., "S&P 500")

    Returns:
        Comparison of account returns vs benchmark returns for each period
    """
    # benchmark_name is annotated as nullable so the declared type agrees
    # with its None default; it is forwarded unchanged to the query layer.
    return await query_tools.compare_to_benchmark(
        account_number=account_number,
        benchmark_name=benchmark_name,
    )
@mcp.tool
async def get_risk_metrics(account_number: str) -> dict:
    """Get risk metrics for an account.

    Returns volatility, beta, Sharpe ratio, and maximum drawdown.
    Requires previously calculated metrics (use calculate_risk_metrics
    first).

    Args:
        account_number: Account number (e.g., "51516162")

    Returns:
        Risk metrics including volatility, beta, Sharpe ratio, max drawdown
    """
    metrics = await query_tools.get_risk_metrics(account_number=account_number)
    return metrics
@mcp.tool
async def calculate_risk_metrics(account_number: str, period_months: int = 12) -> dict:
    """Calculate risk metrics from historical performance data.

    Computes volatility, Sharpe ratio, maximum drawdown, and VaR from your
    account's historical returns. Requires at least 3 months of data.

    Args:
        account_number: Account number (e.g., "51516162")
        period_months: Number of months to use for calculation (default: 12)

    Returns:
        Calculated risk metrics stored in database
    """
    computed = await risk_calculator.calculate_risk_metrics(
        account_number=account_number,
        period_months=period_months,
    )
    return computed
@mcp.tool
async def get_portfolio_summary() -> dict:
    """Get consolidated portfolio summary across all accounts.

    Shows total value, allocation by account, and weighted average returns.

    Returns:
        Portfolio summary with total balance, allocations, and performance
    """
    portfolio = await query_tools.get_portfolio_summary()
    return portfolio
@mcp.tool
async def run_monte_carlo_simulation(
    n_simulations: int = 10000,
    projection_years: int = 5,
    account_numbers: list[str] | None = None,
    percentiles: list[float] | None = None,
    method: str = "normal",
) -> dict:
    """Run Monte Carlo simulation for portfolio projection.

    Projects future portfolio value using historical returns.
    No contributions or withdrawals are assumed during the projection period.

    Two methods available:
    1. 'normal' (default): Geometric Brownian Motion with parametric normal distribution
       - Assumes returns follow normal distribution
       - Uses mean and volatility from portfolio history
       - Faster, more stable for long projections
    2. 'historical': Historical bootstrap using S&P 500 returns
       - Samples actual S&P 500 returns from 50+ years of history
       - Scales by portfolio's volatility ratio and alpha vs S&P 500
       - Preserves actual distribution characteristics (skew, kurtosis, fat tails)
       - Better for tail risk analysis

    The simulation:
    1. Loads historical monthly returns and aggregates across all accounts
    2. Calculates mean return and volatility from historical data
    3. (If historical) Fetches S&P 500 data and calculates scaling factors
    4. Runs Monte Carlo simulation using selected method
    5. Calculates percentiles for each projected year (1 through projection_years)
    6. Generates visualization charts (fan chart, distribution, confidence intervals)

    Args:
        n_simulations: Number of simulation paths to run (default: 10,000)
        projection_years: Number of years to project (default: 5)
        account_numbers: List of account numbers to include (default: all accounts)
        percentiles: Percentiles to calculate, e.g., [10,25,50,75,90] (default: [10,25,50,75,90])
        method: Simulation method - 'normal' or 'historical' (default: 'normal')

    Returns:
        Dictionary containing:
        - simulation_metadata: Simulation parameters, method, and initial portfolio value
        - input_statistics: Historical returns data (mean, volatility, periods)
        - scaling_factors: (historical method only) Volatility ratio, alpha, S&P 500 stats
        - year_by_year_projections: Percentiles and statistics for each year
        - visualizations: Paths to generated chart files (fan chart, distribution, confidence intervals)
        - data_files: Paths to CSV files (projections, summary, historical_returns)
    """
    # account_numbers and percentiles are annotated as nullable so the
    # declared types agree with their None defaults. None is forwarded
    # unchanged; the simulator is expected to substitute the documented
    # defaults (all accounts / standard percentiles).
    return await monte_carlo.run_portfolio_simulation(
        n_simulations=n_simulations,
        projection_years=projection_years,
        account_numbers=account_numbers,
        percentiles=percentiles,
        method=method,
    )
@mcp.tool
async def run_stress_test_simulation(
    n_simulations: int = 10000,
    projection_years: int = 5,
    account_numbers: list[str] | None = None,
    percentiles: list[float] | None = None,
) -> dict:
    """Run Monte Carlo stress test with multiple adverse scenarios.

    Projects portfolio under baseline conditions and multiple stress scenarios:
    - Market Crash: Sudden 30% decline
    - Bear Market: 12 months of -5% monthly returns
    - Volatility Spike: 3x normal volatility for 6 months

    Stress events occur at random times during projection (different for each
    simulation). This provides insight into worst-case scenarios and portfolio
    resilience.

    Args:
        n_simulations: Number of simulation paths to run (default: 10,000)
        projection_years: Number of years to project (default: 5)
        account_numbers: List of account numbers to include (default: all accounts)
        percentiles: Percentiles to calculate, e.g., [10,25,50,75,90] (default: [10,25,50,75,90])

    Returns:
        Dictionary containing:
        - simulation_metadata: Simulation parameters and initial portfolio value
        - input_statistics: Historical returns data (mean, volatility, periods)
        - baseline: Baseline percentile projections (no stress)
        - stress_scenarios: Results for each stress scenario with projections
        - visualizations: Paths to comparison charts (comparison_chart, distributions, impact_summary)
        - data_files: Paths to CSV files (scenario_comparison, impact_analysis, summary)
    """
    # account_numbers and percentiles are annotated as nullable so the
    # declared types agree with their None defaults. None is forwarded
    # unchanged; the simulator is expected to substitute the documented
    # defaults.
    return await monte_carlo.run_stress_test_simulation(
        n_simulations=n_simulations,
        projection_years=projection_years,
        account_numbers=account_numbers,
        percentiles=percentiles,
    )
# ============================================================================
# Report Generation Tools
# ============================================================================
@mcp.tool
async def update_monthly_returns(account_numbers: list[str] | None = None) -> dict:
    """Update monthly returns CSV file for all accounts.

    Generates a CSV file with monthly returns calculated using the Modified
    Dietz method, which excludes the impact of deposits and withdrawals to
    show true investment performance. The file is saved as
    monthly_returns.csv in the reports directory (REPORTS_PATH, which
    defaults to the project's data/reports folder).

    Args:
        account_numbers: List of account numbers to include. If None, includes all
            known accounts (IBKR, Questrade, Scotia, TD Direct).

    Returns:
        Dictionary with status, file path, total rows, and per-account statistics
        including statement count, date range, and return metrics
    """
    # account_numbers is annotated as nullable so the declared type agrees
    # with its None default; None is forwarded unchanged to the generator.
    return await report_generator.update_monthly_returns(account_numbers=account_numbers)
# ============================================================================
# Server Entry Point
# ============================================================================
if __name__ == "__main__":
    # Serve only when this module is executed directly; importing it must
    # not start the server.
    mcp.run()