"""
Test database queries for the Wealthfolio MCP server.
"""
import os
import sqlite3
# Location of the Wealthfolio SQLite database used by every test below.
# Set the WEALTHFOLIO_DB_PATH environment variable to point the tests at a
# different database; otherwise the original hard-coded path is used.
DB_PATH = os.environ.get(
    'WEALTHFOLIO_DB_PATH',
    r'C:\Users\miche\AppData\Roaming\com.teymz.wealthfolio\app.db',
)
def test_total_value():
    """Test getting total portfolio value.

    Reads ``total_value`` from the ``daily_account_valuation`` row whose
    ``account_id`` is ``'TOTAL'`` and whose ``valuation_date`` is the most
    recent date present in the table. A missing row or a NULL value is
    treated as 0 so that the assertion below reports the problem.

    Returns:
        The total value read from the database.

    Raises:
        AssertionError: If the total value is not greater than 0.
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
        c.execute("""
            SELECT CAST(total_value AS REAL) as total_value
            FROM daily_account_valuation
            WHERE account_id = 'TOTAL'
            AND valuation_date = (SELECT MAX(valuation_date) FROM daily_account_valuation)
        """)
        result = c.fetchone()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    total_value = result['total_value'] if result is not None else None
    if total_value is None:
        # No row, or a NULL value: fall back to 0 so the check below fails
        # with its own message rather than a TypeError.
        total_value = 0

    assert total_value > 0, "Total value should be greater than 0"
    # Report success only after the check has actually passed.
    print(f"[PASS] Total Value Query: EUR {total_value:,.2f}")
    return total_value
def test_asset_allocation():
    """Test getting asset allocation.

    For the rows of ``holdings_snapshots`` with the most recent
    ``snapshot_date``, expands the JSON ``positions`` column, keeps the
    positions whose ``quantity`` is greater than 0, and sums
    ``quantity * latest close`` per ``assets.asset_type``. The latest close
    is the ``quotes.close`` with the newest ``timestamp`` for the position's
    ``assetId``; a position without any quote contributes 0.

    Returns:
        A dict mapping asset type to summed market value. Asset types whose
        market value is NULL or 0 are left out.

    Raises:
        AssertionError: If no asset type has a non-zero market value.
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
        # `positions.value` is qualified everywhere so that, inside the
        # correlated subquery, it can never resolve to a column of `quotes`.
        c.execute("""
            SELECT
                a.asset_type,
                SUM(
                    CAST(json_extract(positions.value, '$.quantity') AS REAL) *
                    COALESCE(CAST((SELECT close FROM quotes
                        WHERE symbol = json_extract(positions.value, '$.assetId')
                        ORDER BY timestamp DESC LIMIT 1) AS REAL), 0)
                ) as market_value
            FROM holdings_snapshots hs,
                json_each(hs.positions) AS positions
            JOIN assets a ON json_extract(positions.value, '$.assetId') = a.id
            WHERE hs.snapshot_date = (SELECT MAX(snapshot_date) FROM holdings_snapshots)
            AND json_extract(positions.value, '$.quantity') > 0
            GROUP BY a.asset_type
        """)
        rows = c.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    allocation = {
        row['asset_type']: row['market_value']
        for row in rows
        if row['market_value']
    }

    assert len(allocation) > 0, "Should have at least one asset type"
    # Report success only after the check has actually passed.
    print(f"[PASS] Asset Allocation Query: {allocation}")
    return allocation
def test_sql_query():
    """Test a simple SELECT query.

    Fetches at most five ``name`` values from the ``accounts`` table and
    prints how many rows came back. No assertion is made on the count.

    Returns:
        The list of fetched rows, each a 1-tuple holding an account name.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        c.execute("SELECT name FROM accounts LIMIT 5")
        rows = c.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    print(f"[PASS] SQL Query Test: Found {len(rows)} accounts")
    return rows
# Script entry point: run every check in order and summarise the outcome.
if __name__ == "__main__":
    print("Running Wealthfolio MCP Server Database Tests\n")
    try:
        total = test_total_value()
        allocation = test_asset_allocation()
        test_sql_query()
        print("\n=== All Tests Passed! ===")
        print(f"Portfolio Value: EUR {total:,.2f}")
        print(f"Asset Types: {list(allocation.keys())}")
    except Exception as e:
        print(f"\n[FAIL] Test Failed: {e}")
        import traceback
        traceback.print_exc()
        # Exit non-zero so shells and CI can tell that a check failed.
        raise SystemExit(1)