#!/usr/bin/env python3
import os
import sys
import sqlite3
import asyncio
import json
from mcp.server import Server
from mcp.types import Tool, TextContent
from mcp.server.stdio import stdio_server
from dotenv import load_dotenv
from datetime import datetime, timedelta
from typing import Optional, Dict, List, Any
# Pull environment variables in through python-dotenv before reading them.
load_dotenv()

# Path of the SQLite database, read from the DB_PATH environment variable.
DB_PATH = os.getenv("DB_PATH")

# Abort at startup when the variable is unset or points to a missing file.
if not (DB_PATH and os.path.exists(DB_PATH)):
    print(f"ERREUR: Base de données non trouvée : {DB_PATH}", file=sys.stderr)
    sys.exit(1)

# MCP server instance on which the tool handlers below are registered.
app = Server("wealthfolio")
def get_real_estate_data() -> Dict[str, Any]:
    """Load the real-estate addon payload stored in the ``app_settings`` table.

    The payload is a JSON document kept under the setting key
    ``addon:real-estate:real-estate-data``; the older key
    ``addon:real-estate-addon:real-estate-data`` is tried as a fallback.

    Returns:
        A dict with the keys ``properties``, ``loans``, ``valuations`` and
        ``loanEvents`` (each a list) plus ``version``. An empty structure is
        returned when the table or the setting is absent, when the stored
        value is not a JSON object, or when any error occurs while reading.
        This function is deliberately best-effort and never raises.
    """
    empty_data: Dict[str, Any] = {
        "properties": [],
        "loans": [],
        "valuations": [],
        "loanEvents": [],
        "version": "1.0.0",
    }
    # Current key first, then the legacy key kept for compatibility.
    setting_keys = (
        "addon:real-estate:real-estate-data",
        "addon:real-estate-addon:real-estate-data",
    )

    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            c = conn.cursor()

            # The addon may not be installed: check that the table exists.
            c.execute("""
                SELECT name FROM sqlite_master
                WHERE type='table' AND name='app_settings'
            """)
            if not c.fetchone():
                return empty_data

            raw = None
            for setting_key in setting_keys:
                c.execute(
                    "SELECT setting_value FROM app_settings WHERE setting_key = ?",
                    (setting_key,),
                )
                row = c.fetchone()
                if row and row[0]:
                    raw = row[0]
                    break
        finally:
            # Always release the connection, including when a query fails.
            conn.close()

        # No row, or an empty value, under either key.
        if not raw:
            return empty_data

        data = json.loads(raw)
        if not isinstance(data, dict):
            return empty_data

        # Callers iterate over these collections, so a missing key or a
        # non-list value (e.g. JSON null) is replaced by an empty list.
        for key in ("properties", "loans", "valuations", "loanEvents"):
            if not isinstance(data.get(key), list):
                data[key] = []
        if "version" not in data:
            data["version"] = "1.0.0"
        return data
    except Exception:
        # Invalid JSON, SQL errors or anything else: fall back to the empty
        # structure without reporting an error (intentional best-effort).
        return empty_data
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the catalogue of tools exposed by this server.

    Every entry pairs a tool name with its user-facing description and the
    JSON schema of the arguments it accepts.
    """

    def obj(fields=None, required=()):
        # Object-typed JSON schema; a fresh dict is built on every call.
        return {
            "type": "object",
            "properties": {} if fields is None else fields,
            "required": list(required),
        }

    def field(kind, description):
        # Plain scalar property: JSON type plus description.
        return {"type": kind, "description": description}

    # (name, description, input schema) for each tool, in registration order.
    catalogue = [
        (
            "get_portfolio_summary",
            "Retourne la valeur totale du portefeuille et la répartition par type d'actif",
            obj(),
        ),
        (
            "run_sql_query",
            "Exécute une requête SELECT sur la base de données Wealthfolio. Limites: 5000 lignes pour les cotations, 200 lignes pour les autres tables.",
            obj(
                {"sql": field("string", "Requête SQL SELECT à exécuter")},
                required=("sql",),
            ),
        ),
        (
            "create_activity",
            "Crée une nouvelle activité (transaction) dans le portefeuille. Utile pour créer des positions d'ouverture ou enregistrer des transactions.",
            obj(
                {
                    "account_id": field("string", "ID du compte"),
                    "asset_id": field("string", "ID de l'actif (symbole)"),
                    "activity_type": {
                        "type": "string",
                        "description": "Type d'activité: BUY, SELL, DIVIDEND, INTEREST, FEE, TRANSFER_IN, TRANSFER_OUT",
                        "enum": ["BUY", "SELL", "DIVIDEND", "INTEREST", "FEE", "TRANSFER_IN", "TRANSFER_OUT"],
                    },
                    "activity_date": field("string", "Date de l'activité (format: YYYY-MM-DD)"),
                    "quantity": field("number", "Quantité (positif pour achats/entrées, négatif pour ventes/sorties)"),
                    "unit_price": field("number", "Prix unitaire"),
                    "currency": field("string", "Devise (USD, EUR, etc.)"),
                    "fee": field("number", "Frais de transaction (optionnel, défaut: 0)"),
                    "comment": field("string", "Commentaire optionnel"),
                },
                required=(
                    "account_id",
                    "asset_id",
                    "activity_type",
                    "activity_date",
                    "quantity",
                    "unit_price",
                    "currency",
                ),
            ),
        ),
        (
            "create_asset",
            "Crée un nouvel actif dans la base de données. Nécessaire avant de créer des activités pour un symbole non existant.",
            obj(
                {
                    "symbol": field("string", "Symbole de l'actif (ex: AAPL, MSFT)"),
                    "name": field("string", "Nom de l'actif (ex: Apple Inc.)"),
                    "asset_type": field("string", "Type d'actif: Stock, ETF, Bond, Cryptocurrency, etc."),
                    "currency": field("string", "Devise de cotation (USD, EUR, etc.)"),
                    "isin": field("string", "Code ISIN (optionnel)"),
                    "asset_class": field("string", "Classe d'actif (optionnel)"),
                },
                required=("symbol", "name", "asset_type", "currency"),
            ),
        ),
        (
            "execute_write_query",
            "Exécute une requête SQL d'écriture (INSERT, UPDATE, DELETE). À utiliser avec précaution! Requiert confirmation explicite.",
            obj(
                {
                    "sql": field("string", "Requête SQL à exécuter (INSERT, UPDATE, DELETE)"),
                    "confirm": field("boolean", "Confirmation explicite requise (doit être true)"),
                },
                required=("sql", "confirm"),
            ),
        ),
        (
            "fetch_and_update_quotes",
            "Télécharge automatiquement les cotations depuis Yahoo Finance et met à jour la table quotes. Économise temps et tokens!",
            obj(
                {
                    "symbols": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Liste des symboles à mettre à jour. Si vide, récupère tous les symboles du portefeuille.",
                    },
                    "start_date": field("string", "Date de début pour l'historique (format: YYYY-MM-DD). Par défaut: 30 derniers jours"),
                    "end_date": field("string", "Date de fin pour l'historique (format: YYYY-MM-DD). Par défaut: aujourd'hui"),
                    "interval": {
                        "type": "string",
                        "enum": ["1d", "1wk", "1mo"],
                        "description": "Intervalle des données: 1d (quotidien), 1wk (hebdo), 1mo (mensuel). Par défaut: 1d",
                    },
                },
            ),
        ),
        (
            "get_real_estate_properties",
            "Récupère la liste des propriétés immobilières avec leurs détails (adresse, prix d'achat, valeur actuelle, etc.)",
            obj(),
        ),
        (
            "get_real_estate_loans",
            "Récupère les crédits immobiliers, optionnellement filtrés par propriété. Affiche les détails: montant, taux, échéances, etc.",
            obj(
                {"propertyId": field("string", "ID de la propriété pour filtrer les crédits (optionnel)")},
            ),
        ),
        (
            "get_real_estate_summary",
            "Retourne une synthèse complète du patrimoine immobilier: nombre de propriétés, valeur totale, dettes, valeur nette",
            obj(),
        ),
        (
            "get_total_net_worth",
            "Calcule le patrimoine net total en combinant les actifs financiers et le patrimoine immobilier net des dettes",
            obj(),
        ),
        (
            "get_loan_amortization",
            "Calcule le tableau d'amortissement détaillé d'un crédit avec possibilité de simuler des remboursements anticipés",
            obj(
                {
                    "loanId": field("string", "ID du crédit à analyser"),
                    "extraPayments": {
                        "type": "array",
                        "items": obj(
                            {
                                "date": field("string", "Date du remboursement anticipé (format: YYYY-MM-DD)"),
                                "amount": field("number", "Montant du remboursement anticipé"),
                            },
                            required=("date", "amount"),
                        ),
                        "description": "Liste optionnelle de remboursements anticipés à simuler",
                    },
                },
                required=("loanId",),
            ),
        ),
        (
            "get_real_estate_valuations",
            "Récupère l'historique des valorisations des propriétés immobilières, optionnellement filtré par propriété",
            obj(
                {"propertyId": field("string", "ID de la propriété pour filtrer les valorisations (optionnel)")},
            ),
        ),
        (
            "get_accounts_summary",
            "Retourne un résumé détaillé par groupe de comptes (Investissement, Retraite, etc.) avec la valorisation à la dernière date",
            obj(),
        ),
        (
            "get_holdings_detail",
            "Liste détaillée de tous les produits financiers détenus avec quantités, prix, valorisation, type d'actif, classe, secteur. Essentiel pour faire des recommandations et analyser la diversification.",
            obj(
                {"include_cash": field("boolean", "Inclure les positions cash (défaut: false)")},
            ),
        ),
        (
            "get_all_holdings",
            "Retourne un objet JSON structuré avec TOUS les actifs et passifs: financial_holdings (actions, ETF, etc.), real_estate (propriétés + totaux), loans (crédits), et summary (synthèse globale). Parfait pour analyse patrimoniale complète.",
            obj(
                {"include_cash": field("boolean", "Inclure les positions cash dans les holdings financiers (défaut: false)")},
            ),
        ),
    ]

    return [
        Tool(name=tool_name, description=summary, inputSchema=input_schema)
        for tool_name, summary, input_schema in catalogue
    ]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Exécute un outil."""
if name == "get_portfolio_summary":
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
# 1. Actifs financiers - Utiliser la dernière date uniquement
c.execute("""
SELECT
CAST(total_value AS REAL) as total_value,
base_currency
FROM daily_account_valuation
WHERE account_id = 'TOTAL'
AND valuation_date = (SELECT MAX(valuation_date) FROM daily_account_valuation WHERE account_id = 'TOTAL')
""")
result = c.fetchone()
financial_assets = result['total_value'] if result else 0
currency = result['base_currency'] if result else 'EUR'
# 2. Répartition par type d'actif financier
c.execute("""
SELECT
a.asset_type,
SUM(
CAST(json_extract(value, '$.quantity') AS REAL) *
COALESCE(CAST((SELECT close FROM quotes
WHERE symbol = json_extract(value, '$.assetId')
ORDER BY timestamp DESC LIMIT 1) AS REAL), 0)
) as market_value
FROM holdings_snapshots hs,
json_each(hs.positions) AS positions
JOIN assets a ON json_extract(positions.value, '$.assetId') = a.id
WHERE hs.snapshot_date = (SELECT MAX(snapshot_date) FROM holdings_snapshots)
AND json_extract(positions.value, '$.quantity') > 0
GROUP BY a.asset_type
""")
allocation = {row['asset_type']: row['market_value'] for row in c.fetchall() if row['market_value']}
conn.close()
# 3. Patrimoine immobilier (optionnel - peut ne pas exister)
try:
re_data = get_real_estate_data()
properties = re_data.get('properties', [])
loans = re_data.get('loans', [])
real_estate_gross = sum(float(p.get('currentValue', 0)) for p in properties if p.get('currentValue'))
real_estate_debt = sum(float(l.get('currentBalance', 0)) for l in loans if l.get('currentBalance'))
real_estate_net = real_estate_gross - real_estate_debt
except Exception:
# Si erreur, continuer sans données immobilières
real_estate_gross = 0
real_estate_debt = 0
real_estate_net = 0
# 4. Total combiné
total_net_worth = financial_assets + real_estate_net
has_real_estate = real_estate_gross > 0 or real_estate_debt > 0
# Formater le résultat
result_text = f"💼 PATRIMOINE TOTAL: {total_net_worth:,.2f} {currency}\n"
result_text += f"{'='*60}\n\n"
result_text += f"Actifs financiers: {financial_assets:,.2f} {currency}\n"
# N'afficher l'immobilier que s'il y en a
if has_real_estate:
result_text += f"Immobilier net: {real_estate_net:,.2f} {currency}\n"
if real_estate_gross > 0:
result_text += f" (Valeur brute: {real_estate_gross:,.2f} {currency}\n"
result_text += f" Dette: -{real_estate_debt:,.2f} {currency})\n"
result_text += f"\n{'─'*60}\n\n"
# Répartition financier vs immobilier (seulement si immobilier présent)
if has_real_estate and total_net_worth > 0:
fin_pct = (financial_assets / total_net_worth) * 100
re_pct = (real_estate_net / total_net_worth) * 100
result_text += f"RÉPARTITION GLOBALE:\n"
result_text += f" Financier: {fin_pct:.1f}%\n"
result_text += f" Immobilier: {re_pct:.1f}%\n\n"
# Répartition par type d'actif financier
if allocation:
result_text += f"ACTIFS FINANCIERS PAR TYPE:\n"
for asset_type, value in sorted(allocation.items(), key=lambda x: x[1], reverse=True):
percentage = (value / financial_assets * 100) if financial_assets > 0 else 0
result_text += f" {asset_type}: {value:,.2f} {currency} ({percentage:.1f}%)\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "run_sql_query":
sql = arguments.get("sql", "").strip()
if not sql.upper().startswith("SELECT"):
return [TextContent(type="text", text="Erreur: Uniquement les requêtes SELECT sont autorisées.")]
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(sql)
rows = c.fetchall()
columns = [desc[0] for desc in c.description] if c.description else []
conn.close()
# Déterminer la limite basée sur le type de requête
# Pour les cotations: 5000 lignes (300 jours/an × 10 ans × symboles multiples)
# Pour les autres tables: 200 lignes
sql_lower = sql.lower()
if 'from quotes' in sql_lower or 'join quotes' in sql_lower:
limit = 5000
else:
limit = 200
total_rows = len(rows)
rows = rows[:limit]
if not rows:
return [TextContent(type="text", text="Aucun résultat trouvé.")]
# Formater les résultats
result_text = f"Colonnes: {', '.join(columns)}\n\n"
result_text += f"Résultats ({len(rows)} lignes"
if total_rows > limit:
result_text += f", {total_rows - limit} lignes tronquées"
result_text += "):\n"
for row in rows:
result_text += " " + " | ".join(str(v) for v in row) + "\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur SQL: {str(e)}")]
elif name == "create_activity":
import uuid
from datetime import datetime
try:
# Validation des paramètres
account_id = arguments.get("account_id")
asset_id = arguments.get("asset_id")
activity_type = arguments.get("activity_type")
activity_date = arguments.get("activity_date")
quantity = arguments.get("quantity")
unit_price = arguments.get("unit_price")
currency = arguments.get("currency")
fee = arguments.get("fee", 0)
comment = arguments.get("comment", "")
# Validation de la date
try:
datetime.strptime(activity_date, "%Y-%m-%d")
except ValueError:
return [TextContent(type="text", text="Erreur: Format de date invalide. Utilisez YYYY-MM-DD")]
# Calcul du montant
amount = float(quantity) * float(unit_price)
if activity_type in ["SELL"]:
amount = -amount
# Connexion et insertion
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Vérifier que le compte existe
c.execute("SELECT id FROM accounts WHERE id = ?", (account_id,))
if not c.fetchone():
conn.close()
return [TextContent(type="text", text=f"Erreur: Le compte '{account_id}' n'existe pas")]
# Vérifier que l'actif existe
c.execute("SELECT id FROM assets WHERE id = ?", (asset_id,))
if not c.fetchone():
conn.close()
return [TextContent(type="text", text=f"Erreur: L'actif '{asset_id}' n'existe pas. Créez-le d'abord avec create_asset")]
# Générer un ID unique
activity_id = str(uuid.uuid4())
created_at = datetime.now().isoformat()
# Insérer l'activité
c.execute("""
INSERT INTO activities (
id, account_id, asset_id, activity_type, activity_date,
quantity, unit_price, currency, fee, amount, is_draft, comment, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
activity_id, account_id, asset_id, activity_type, activity_date,
str(quantity), str(unit_price), currency, str(fee), str(amount), 0, comment, created_at, created_at
))
conn.commit()
conn.close()
result_text = f"Activité créée avec succès!\n\n"
result_text += f"ID: {activity_id}\n"
result_text += f"Type: {activity_type}\n"
result_text += f"Actif: {asset_id}\n"
result_text += f"Quantité: {quantity}\n"
result_text += f"Prix: {unit_price} {currency}\n"
result_text += f"Montant: {amount} {currency}\n"
result_text += f"Date: {activity_date}\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur lors de la création de l'activité: {str(e)}")]
elif name == "create_asset":
import uuid
from datetime import datetime
try:
symbol = arguments.get("symbol")
name = arguments.get("name")
asset_type = arguments.get("asset_type")
currency = arguments.get("currency")
isin = arguments.get("isin", "")
asset_class = arguments.get("asset_class", "")
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Vérifier si l'actif existe déjà
c.execute("SELECT id FROM assets WHERE id = ? OR symbol = ?", (symbol, symbol))
existing = c.fetchone()
if existing:
conn.close()
return [TextContent(type="text", text=f"Erreur: L'actif '{symbol}' existe déjà")]
# Créer l'actif
asset_id = symbol # Utiliser le symbole comme ID
created_at = datetime.now().isoformat()
c.execute("""
INSERT INTO assets (
id, symbol, name, asset_type, currency, isin, asset_class, data_source, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
asset_id, symbol, name, asset_type, currency, isin, asset_class, "MANUAL", created_at, created_at
))
conn.commit()
conn.close()
result_text = f"Actif créé avec succès!\n\n"
result_text += f"Symbole: {symbol}\n"
result_text += f"Nom: {name}\n"
result_text += f"Type: {asset_type}\n"
result_text += f"Devise: {currency}\n"
if isin:
result_text += f"ISIN: {isin}\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur lors de la création de l'actif: {str(e)}")]
elif name == "execute_write_query":
sql = arguments.get("sql", "").strip()
confirm = arguments.get("confirm", False)
if not confirm:
return [TextContent(type="text", text="Erreur: Confirmation requise. Passez 'confirm: true' pour exécuter cette requête.")]
# Validation: interdire les DROP et TRUNCATE
sql_upper = sql.upper()
if "DROP" in sql_upper or "TRUNCATE" in sql_upper:
return [TextContent(type="text", text="Erreur: Les commandes DROP et TRUNCATE sont interdites pour la sécurité.")]
# S'assurer que c'est bien une requête d'écriture
if not any(cmd in sql_upper for cmd in ["INSERT", "UPDATE", "DELETE"]):
return [TextContent(type="text", text="Erreur: Seules les requêtes INSERT, UPDATE, DELETE sont autorisées.")]
try:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(sql)
rows_affected = c.rowcount
conn.commit()
conn.close()
result_text = f"Requête exécutée avec succès!\n"
result_text += f"Lignes affectées: {rows_affected}\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur SQL: {str(e)}")]
elif name == "fetch_and_update_quotes":
import yfinance as yf
from datetime import datetime, timedelta
import uuid
try:
# Récupérer les paramètres
symbols = arguments.get("symbols", [])
start_date = arguments.get("start_date")
end_date = arguments.get("end_date")
interval = arguments.get("interval", "1d")
# Si aucun symbole fourni, récupérer tous les symboles du portefeuille
if not symbols:
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("SELECT DISTINCT symbol FROM assets WHERE symbol IS NOT NULL")
symbols = [row[0] for row in c.fetchall()]
conn.close()
if not symbols:
return [TextContent(type="text", text="Aucun symbole trouvé dans le portefeuille.")]
# Définir les dates par défaut
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
# Par défaut: 30 jours avant end_date
start_dt = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=30)
start_date = start_dt.strftime("%Y-%m-%d")
# Télécharger les cotations
result_text = f"Téléchargement des cotations pour {len(symbols)} symboles...\n"
result_text += f"Période: {start_date} à {end_date}\n"
result_text += f"Intervalle: {interval}\n\n"
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
success_count = 0
error_count = 0
total_quotes = 0
errors = []
for symbol in symbols:
try:
# Télécharger les données
ticker = yf.Ticker(symbol)
hist = ticker.history(start=start_date, end=end_date, interval=interval)
if hist.empty:
error_count += 1
errors.append(f" [!] {symbol}: Aucune donnée disponible")
continue
# Insérer les données dans la base
quotes_added = 0
for date, row in hist.iterrows():
quote_id = str(uuid.uuid4())
timestamp = date.strftime("%Y-%m-%dT%H:%M:%S.000Z")
# Vérifier si la cotation existe déjà
c.execute("""
SELECT id FROM quotes
WHERE symbol = ? AND date(timestamp) = date(?)
""", (symbol, timestamp))
if c.fetchone():
# Mise à jour
c.execute("""
UPDATE quotes
SET open = ?, high = ?, low = ?, close = ?, volume = ?
WHERE symbol = ? AND date(timestamp) = date(?)
""", (
str(row['Open']), str(row['High']), str(row['Low']),
str(row['Close']), str(int(row['Volume'])),
symbol, timestamp
))
else:
# Insertion
c.execute("""
INSERT INTO quotes (id, symbol, timestamp, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
quote_id, symbol, timestamp,
str(row['Open']), str(row['High']), str(row['Low']),
str(row['Close']), str(int(row['Volume']))
))
quotes_added += 1
conn.commit()
success_count += 1
total_quotes += quotes_added
result_text += f" [OK] {symbol}: {quotes_added} cotations\n"
except Exception as e:
error_count += 1
error_msg = str(e)
# Limiter la longueur du message d'erreur
if len(error_msg) > 100:
error_msg = error_msg[:100] + "..."
errors.append(f" [X] {symbol}: {error_msg}")
conn.close()
# Résumé
result_text += f"\nRésumé:\n"
result_text += f" - Symboles traités avec succès: {success_count}/{len(symbols)}\n"
result_text += f" - Total cotations ajoutées/mises à jour: {total_quotes}\n"
result_text += f" - Erreurs: {error_count}\n"
if errors:
result_text += f"\nDétails des erreurs:\n"
# Limiter à 10 erreurs pour éviter un texte trop long
for error in errors[:10]:
result_text += error + "\n"
if len(errors) > 10:
result_text += f" ... et {len(errors) - 10} autres erreurs\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur lors du téléchargement des cotations: {str(e)}")]
elif name == "get_real_estate_properties":
try:
data = get_real_estate_data()
properties = data.get("properties", [])
if not properties:
return [TextContent(type="text", text="Aucune propriété immobilière trouvée.")]
result_text = f"Propriétés immobilières ({len(properties)}):\n\n"
for prop in properties:
result_text += f"📍 {prop.get('name', 'Sans nom')}\n"
result_text += f" ID: {prop.get('id')}\n"
result_text += f" Type: {prop.get('type', 'N/A')}\n"
result_text += f" Adresse: {prop.get('address', '')}, {prop.get('city', '')}, {prop.get('postalCode', '')}\n"
result_text += f" Pays: {prop.get('country', 'N/A')}\n"
result_text += f" Date d'achat: {prop.get('purchaseDate', 'N/A')}\n"
result_text += f" Prix d'achat: {float(prop.get('purchasePrice', 0)):,.2f} {prop.get('currency', 'EUR')}\n"
result_text += f" Valeur actuelle: {float(prop.get('currentValue', 0)):,.2f} {prop.get('currency', 'EUR')}\n"
capital_gain = float(prop.get('currentValue', 0)) - float(prop.get('purchasePrice', 0))
result_text += f" Plus-value: {capital_gain:,.2f} {prop.get('currency', 'EUR')}\n"
if prop.get('notes'):
result_text += f" Notes: {prop.get('notes')}\n"
result_text += "\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_real_estate_loans":
try:
property_id = arguments.get("propertyId")
data = get_real_estate_data()
loans = data.get("loans", [])
# Filtrer par propertyId si fourni
if property_id:
loans = [loan for loan in loans if loan.get("propertyId") == property_id]
if not loans:
msg = f"Aucun crédit trouvé"
if property_id:
msg += f" pour la propriété {property_id}"
return [TextContent(type="text", text=msg + ".")]
result_text = f"Crédits immobiliers ({len(loans)}):\n\n"
for loan in loans:
result_text += f"💰 {loan.get('name', 'Sans nom')}\n"
result_text += f" ID: {loan.get('id')}\n"
result_text += f" Propriété: {loan.get('propertyId')}\n"
result_text += f" Prêteur: {loan.get('lender', 'N/A')}\n"
result_text += f" Type: {loan.get('type', 'N/A')}\n"
result_text += f" Montant initial: {float(loan.get('originalAmount', 0)):,.2f} {loan.get('currency', 'EUR')}\n"
result_text += f" Solde actuel: {float(loan.get('currentBalance', 0)):,.2f} {loan.get('currency', 'EUR')}\n"
result_text += f" Taux d'intérêt: {float(loan.get('interestRate', 0)):.2f}%\n"
result_text += f" Mensualité: {float(loan.get('monthlyPayment', 0)):,.2f} {loan.get('currency', 'EUR')}\n"
result_text += f" Début: {loan.get('startDate', 'N/A')}\n"
result_text += f" Fin: {loan.get('endDate', 'N/A')}\n"
# Calculer le montant remboursé
paid = float(loan.get('originalAmount', 0)) - float(loan.get('currentBalance', 0))
result_text += f" Déjà remboursé: {paid:,.2f} {loan.get('currency', 'EUR')}\n"
if loan.get('notes'):
result_text += f" Notes: {loan.get('notes')}\n"
result_text += "\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_real_estate_summary":
try:
data = get_real_estate_data()
properties = data.get("properties", [])
loans = data.get("loans", [])
# Calculs
total_properties = len(properties)
total_property_value = sum(float(p.get('currentValue', 0)) for p in properties)
total_purchase_price = sum(float(p.get('purchasePrice', 0)) for p in properties)
total_capital_gain = total_property_value - total_purchase_price
total_loans = len(loans)
total_debt = sum(float(l.get('currentBalance', 0)) for l in loans)
total_original_debt = sum(float(l.get('originalAmount', 0)) for l in loans)
total_monthly_payment = sum(float(l.get('monthlyPayment', 0)) for l in loans)
net_real_estate_value = total_property_value - total_debt
# Déterminer la devise (prendre la première propriété ou EUR par défaut)
currency = properties[0].get('currency', 'EUR') if properties else 'EUR'
result_text = "📊 SYNTHÈSE DU PATRIMOINE IMMOBILIER\n\n"
result_text += "PROPRIÉTÉS:\n"
result_text += f" Nombre de propriétés: {total_properties}\n"
result_text += f" Valeur totale actuelle: {total_property_value:,.2f} {currency}\n"
result_text += f" Prix d'achat total: {total_purchase_price:,.2f} {currency}\n"
result_text += f" Plus-value totale: {total_capital_gain:,.2f} {currency}\n"
if total_purchase_price > 0:
gain_pct = (total_capital_gain / total_purchase_price) * 100
result_text += f" Rendement: {gain_pct:.2f}%\n"
result_text += "\n"
result_text += "CRÉDITS:\n"
result_text += f" Nombre de crédits: {total_loans}\n"
result_text += f" Dette totale: {total_debt:,.2f} {currency}\n"
result_text += f" Montant initial total: {total_original_debt:,.2f} {currency}\n"
result_text += f" Mensualités totales: {total_monthly_payment:,.2f} {currency}\n"
if total_original_debt > 0:
paid = total_original_debt - total_debt
paid_pct = (paid / total_original_debt) * 100
result_text += f" Déjà remboursé: {paid:,.2f} {currency} ({paid_pct:.1f}%)\n"
result_text += "\n"
result_text += "BILAN:\n"
result_text += f" Valeur nette immobilière: {net_real_estate_value:,.2f} {currency}\n"
if total_property_value > 0:
ltv = (total_debt / total_property_value) * 100
result_text += f" Taux d'endettement (LTV): {ltv:.1f}%\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_total_net_worth":
from datetime import datetime
try:
# 1. Récupérer les actifs financiers
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute("""
SELECT CAST(total_value AS REAL) as total_value, base_currency
FROM daily_account_valuation
WHERE account_id = 'TOTAL'
AND valuation_date = (SELECT MAX(valuation_date) FROM daily_account_valuation WHERE account_id = 'TOTAL')
""")
result = c.fetchone()
conn.close()
financial_assets = result['total_value'] if result else 0
currency = result['base_currency'] if result else 'EUR'
# 2. Récupérer le patrimoine immobilier
data = get_real_estate_data()
properties = data.get("properties", [])
loans = data.get("loans", [])
real_estate_gross_value = sum(float(p.get('currentValue', 0)) for p in properties)
total_debt = sum(float(l.get('currentBalance', 0)) for l in loans)
real_estate_net_value = real_estate_gross_value - total_debt
# 3. Calculer le patrimoine net total
net_worth = financial_assets + real_estate_net_value
# Formater le résultat
result_text = "💎 PATRIMOINE NET TOTAL\n\n"
result_text += f"Actifs financiers: {financial_assets:,.2f} {currency}\n"
result_text += f"Immobilier (valeur brute): {real_estate_gross_value:,.2f} {currency}\n"
result_text += f"Dettes immobilières: -{total_debt:,.2f} {currency}\n"
result_text += f"Immobilier (valeur nette): {real_estate_net_value:,.2f} {currency}\n"
result_text += f"\n{'='*50}\n"
result_text += f"PATRIMOINE NET: {net_worth:,.2f} {currency}\n"
result_text += f"{'='*50}\n\n"
# Répartition en pourcentage
if net_worth > 0:
fin_pct = (financial_assets / net_worth) * 100
re_pct = (real_estate_net_value / net_worth) * 100
result_text += f"Répartition:\n"
result_text += f" Financier: {fin_pct:.1f}%\n"
result_text += f" Immobilier: {re_pct:.1f}%\n"
result_text += f"\nCalculé le: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_loan_amortization":
from datetime import datetime, timedelta
try:
loan_id = arguments.get("loanId")
extra_payments = arguments.get("extraPayments", [])
if not loan_id:
return [TextContent(type="text", text="Erreur: loanId requis")]
# Récupérer le crédit
data = get_real_estate_data()
loans = data.get("loans", [])
if not loans:
return [TextContent(type="text", text="Aucun crédit immobilier trouvé. L'addon Real Estate n'est peut-être pas installé.")]
loan = next((l for l in loans if l.get('id') == loan_id), None)
if not loan:
return [TextContent(type="text", text=f"Erreur: Crédit {loan_id} introuvable")]
# Valider les champs requis
required_fields = ['originalAmount', 'currentBalance', 'interestRate',
'monthlyPayment', 'startDate', 'endDate']
missing_fields = [field for field in required_fields if not loan.get(field)]
if missing_fields:
return [TextContent(type="text",
text=f"Erreur: Champs manquants dans les données du crédit: {', '.join(missing_fields)}")]
# Paramètres du crédit avec validation
try:
original_amount = float(loan.get('originalAmount', 0))
current_balance = float(loan.get('currentBalance', 0))
annual_rate = float(loan.get('interestRate', 0)) / 100
monthly_rate = annual_rate / 12
monthly_payment = float(loan.get('monthlyPayment', 0))
start_date = datetime.strptime(loan.get('startDate'), '%Y-%m-%d')
end_date = datetime.strptime(loan.get('endDate'), '%Y-%m-%d')
currency = loan.get('currency', 'EUR')
except (ValueError, TypeError) as e:
return [TextContent(type="text",
text=f"Erreur: Données invalides dans le crédit: {str(e)}")]
# Validations
if original_amount <= 0:
return [TextContent(type="text", text="Erreur: Montant initial invalide")]
if current_balance < 0:
return [TextContent(type="text", text="Erreur: Solde actuel invalide")]
if monthly_payment <= 0:
return [TextContent(type="text", text="Erreur: Mensualité invalide")]
if start_date >= end_date:
return [TextContent(type="text", text="Erreur: Dates invalides (début >= fin)")]
# Calculer les mois déjà écoulés depuis le début
now = datetime.now()
months_elapsed = (now.year - start_date.year) * 12 + (now.month - start_date.month)
capital_paid = original_amount - current_balance
# Calculer la durée totale du crédit
total_months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)
months_remaining_contractual = total_months - months_elapsed
# Calculer le tableau d'amortissement à partir du mois actuel
schedule = []
balance = current_balance
month = months_elapsed # Démarrer au bon numéro de mois
# Jour de paiement (extraire du start_date)
payment_day = start_date.day
# Date de début pour le calcul (mois suivant)
current_date = start_date
# Avancer jusqu'au mois actuel
for _ in range(months_elapsed):
current_date = current_date.replace(day=1) + timedelta(days=32)
current_date = current_date.replace(day=payment_day)
cumulative_interest = 0
# Convertir extra_payments en dict par date
extra_by_date = {}
for ep in extra_payments:
ep_date = datetime.strptime(ep['date'], '%Y-%m-%d')
extra_by_date[ep_date.strftime('%Y-%m')] = float(ep['amount'])
# Calculer l'amortissement pour les mois restants
# Utiliser la durée contractuelle ou jusqu'à balance = 0 avec extra payments
max_months = months_remaining_contractual if not extra_payments else 600
while month < total_months and balance > 0.01:
month += 1
# Passer au mois suivant
current_date = current_date.replace(day=1) + timedelta(days=32)
current_date = current_date.replace(day=min(payment_day, 28)) # Gérer février
# Intérêts du mois
interest = balance * monthly_rate
principal = min(monthly_payment - interest, balance)
# Vérifier s'il y a un remboursement anticipé ce mois
date_key = current_date.strftime('%Y-%m')
extra = extra_by_date.get(date_key, 0)
if extra > 0:
principal += min(extra, balance - principal)
balance -= principal
cumulative_interest += interest
schedule.append({
"month": month,
"date": current_date.strftime('%Y-%m-%d'),
"payment": monthly_payment + extra,
"principal": principal,
"interest": interest,
"remainingBalance": max(0, balance),
"cumulativeInterest": cumulative_interest
})
if balance <= 0:
break
# Résumé
total_payments_remaining = sum(s['payment'] for s in schedule)
total_interest_remaining = cumulative_interest
total_principal_remaining = current_balance
months_remaining = months_remaining_contractual # Utiliser la durée contractuelle
projected_end_date = end_date.strftime('%Y-%m-%d') # Utiliser la date contractuelle
# Estimer les intérêts déjà payés
estimated_interest_paid = (months_elapsed * monthly_payment) - capital_paid
# Formater le résultat
result_text = f"📅 TABLEAU D'AMORTISSEMENT - {loan.get('name', loan_id)}\n\n"
result_text += f"INFORMATIONS DU CRÉDIT:\n"
result_text += f" Montant initial: {original_amount:,.2f} {currency}\n"
result_text += f" Date début: {start_date.strftime('%Y-%m-%d')}\n"
result_text += f" Date fin prévue: {end_date.strftime('%Y-%m-%d')}\n"
result_text += f" Taux annuel: {annual_rate*100:.2f}%\n"
result_text += f" Mensualité: {monthly_payment:,.2f} {currency}\n"
result_text += f"\n{'─'*60}\n"
result_text += f"PAIEMENTS DÉJÀ EFFECTUÉS:\n"
result_text += f" Mois écoulés: {months_elapsed}\n"
result_text += f" Capital remboursé: {capital_paid:,.2f} {currency}\n"
result_text += f" Intérêts payés (estimé): {estimated_interest_paid:,.2f} {currency}\n"
result_text += f" Total payé: {months_elapsed * monthly_payment:,.2f} {currency}\n"
result_text += f"\n{'─'*60}\n"
result_text += f"SOLDE ACTUEL:\n"
result_text += f" Reste à payer (capital): {current_balance:,.2f} {currency}\n"
result_text += f" Mois restants: {months_remaining}\n"
result_text += f" Date de fin: {projected_end_date}\n"
if extra_payments:
result_text += f"\n⚡ Remboursements anticipés simulés: {len(extra_payments)}\n"
result_text += f"\n{'─'*60}\n"
result_text += f"PROJECTION RESTANTE:\n"
result_text += f" Total à payer: {total_payments_remaining:,.2f} {currency}\n"
result_text += f" Dont capital: {total_principal_remaining:,.2f} {currency}\n"
result_text += f" Dont intérêts (estimé): {total_interest_remaining:,.2f} {currency}\n"
result_text += f"\n{'─'*60}\n"
result_text += f"COÛT TOTAL DU CRÉDIT (ESTIMÉ):\n"
total_cost = estimated_interest_paid + total_interest_remaining
result_text += f" Intérêts totaux: {total_cost:,.2f} {currency}\n"
result_text += f" Coût total: {original_amount + total_cost:,.2f} {currency}\n"
# Afficher un échantillon du tableau (premiers et derniers mois)
result_text += f"\n ÉCHÉANCIER (premiers et derniers mois):\n"
result_text += f"{'Mois':<6} {'Date':<12} {'Paiement':<12} {'Capital':<12} {'Intérêts':<12} {'Solde':<12}\n"
result_text += "-" * 72 + "\n"
# Premiers 12 mois
for s in schedule[:12]:
result_text += f"{s['month']:<6} {s['date']:<12} {s['payment']:>11,.0f} {s['principal']:>11,.0f} {s['interest']:>11,.0f} {s['remainingBalance']:>11,.0f}\n"
if len(schedule) > 24:
result_text += "...\n"
# Derniers 12 mois
for s in schedule[-12:]:
result_text += f"{s['month']:<6} {s['date']:<12} {s['payment']:>11,.0f} {s['principal']:>11,.0f} {s['interest']:>11,.0f} {s['remainingBalance']:>11,.0f}\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_real_estate_valuations":
try:
property_id = arguments.get("propertyId")
data = get_real_estate_data()
valuations = data.get("valuations", [])
# Filtrer par propertyId si fourni
if property_id:
valuations = [v for v in valuations if v.get("propertyId") == property_id]
if not valuations:
msg = "Aucune valorisation trouvée"
if property_id:
msg += f" pour la propriété {property_id}"
return [TextContent(type="text", text=msg + ".")]
result_text = f"📈 HISTORIQUE DES VALORISATIONS ({len(valuations)}):\n\n"
# Grouper par propriété
by_property = {}
for val in valuations:
prop_id = val.get('propertyId')
if prop_id not in by_property:
by_property[prop_id] = []
by_property[prop_id].append(val)
for prop_id, vals in by_property.items():
# Trier par date
vals.sort(key=lambda x: x.get('valuationDate', ''))
result_text += f"Propriété: {prop_id}\n"
for val in vals:
result_text += f" 📅 {val.get('valuationDate', 'N/A')}: "
result_text += f"{float(val.get('value', 0)):,.2f} EUR"
if val.get('method'):
result_text += f" (Méthode: {val.get('method')})"
if val.get('notes'):
result_text += f"\n Notes: {val.get('notes')}"
result_text += "\n"
# Calculer l'évolution
if len(vals) >= 2:
first_val = float(vals[0].get('value', 0))
last_val = float(vals[-1].get('value', 0))
evolution = last_val - first_val
evolution_pct = (evolution / first_val * 100) if first_val > 0 else 0
result_text += f" Évolution: {evolution:+,.2f} EUR ({evolution_pct:+.2f}%)\n"
result_text += "\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_accounts_summary":
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
# Récupérer le résumé par groupe de comptes (seulement les comptes actifs)
# Utilise la dernière date de valorisation pour CHAQUE compte
c.execute("""
SELECT
a."group" as groupe,
COUNT(DISTINCT a.id) as nb_comptes,
SUM(CAST(v.total_value AS REAL)) as valeur_totale
FROM daily_account_valuation v
JOIN accounts a ON v.account_id = a.id
WHERE v.valuation_date = (
SELECT MAX(valuation_date)
FROM daily_account_valuation v2
WHERE v2.account_id = v.account_id
)
AND a.id != 'TOTAL'
AND a.is_active = 1
GROUP BY a."group"
ORDER BY valeur_totale DESC
""")
groups = c.fetchall()
# Récupérer la date de valorisation
c.execute("""
SELECT MAX(valuation_date) as date
FROM daily_account_valuation
""")
valuation_date = c.fetchone()['date']
# Récupérer le détail des comptes (seulement les comptes actifs)
# Utilise la dernière date de valorisation pour CHAQUE compte
c.execute("""
SELECT
a.name,
a."group" as groupe,
a.account_type,
CAST(v.total_value AS REAL) as valeur,
v.base_currency
FROM daily_account_valuation v
JOIN accounts a ON v.account_id = a.id
WHERE v.valuation_date = (
SELECT MAX(valuation_date)
FROM daily_account_valuation v2
WHERE v2.account_id = v.account_id
)
AND a.id != 'TOTAL'
AND a.is_active = 1
ORDER BY a."group", valeur DESC
""")
accounts = c.fetchall()
conn.close()
if not groups:
return [TextContent(type="text", text="Aucun compte trouvé.")]
# Calculer le total
total_value = sum(g['valeur_totale'] for g in groups)
currency = accounts[0]['base_currency'] if accounts else "EUR"
# Formater le résultat
result_text = f"📊 RÉSUMÉ PAR GROUPE DE COMPTES\n"
result_text += f"Date de valorisation: {valuation_date}\n"
result_text += f"{'='*60}\n\n"
# Résumé par groupe
result_text += f"PAR GROUPE:\n"
for group in groups:
valeur = group['valeur_totale']
pct = (valeur / total_value * 100) if total_value > 0 else 0
result_text += f"\n {group['groupe'] or 'Sans groupe'}:\n"
result_text += f" Nombre de comptes: {group['nb_comptes']}\n"
result_text += f" Valeur: {valeur:,.2f} {currency} ({pct:.1f}%)\n"
# Total
result_text += f"\n{'─'*60}\n"
result_text += f"TOTAL: {total_value:,.2f} {currency}\n"
# Détail des comptes par groupe
result_text += f"\n{'─'*60}\n"
result_text += f"\nDÉTAIL PAR COMPTE:\n"
current_group = None
for acc in accounts:
# Nouveau groupe
if acc['groupe'] != current_group:
current_group = acc['groupe']
result_text += f"\n{current_group or 'Sans groupe'}:\n"
pct = (acc['valeur'] / total_value * 100) if total_value > 0 else 0
result_text += f" • {acc['name']}: {acc['valeur']:,.2f} {currency} ({pct:.1f}%)\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_holdings_detail":
try:
include_cash = arguments.get("include_cash", False)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
# Récupérer tous les holdings avec leurs détails et le dernier prix
c.execute("""
SELECT
a.symbol,
a.name,
a.asset_type,
a.asset_class,
a.asset_sub_class,
a.sectors,
a.currency,
a.isin,
CAST(json_extract(positions.value, '$.quantity') AS REAL) as quantity,
CAST(json_extract(positions.value, '$.averageCost') AS REAL) as avg_cost,
CAST(json_extract(positions.value, '$.totalCostBasis') AS REAL) as book_value,
COALESCE(q.close, json_extract(positions.value, '$.averageCost')) as market_price
FROM holdings_snapshots hs,
json_each(hs.positions) AS positions
JOIN assets a ON json_extract(positions.value, '$.assetId') = a.id
LEFT JOIN (
SELECT symbol, CAST(close AS REAL) as close
FROM quotes q1
WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol)
) q ON a.symbol = q.symbol
WHERE hs.snapshot_date = (SELECT MAX(snapshot_date) FROM holdings_snapshots)
AND CAST(json_extract(positions.value, '$.quantity') AS REAL) > 0
ORDER BY (CAST(json_extract(positions.value, '$.quantity') AS REAL) *
COALESCE(q.close, json_extract(positions.value, '$.averageCost'))) DESC
""")
holdings = c.fetchall()
conn.close()
if not holdings:
return [TextContent(type="text", text="Aucune position trouvée.")]
# Filtrer le cash si demandé
if not include_cash:
holdings = [h for h in holdings if h['asset_type'] not in ('CASH', 'Cash')]
# Calculer le total (quantity * market_price)
total_value = sum((float(h['quantity'] or 0) * float(h['market_price'] or 0)) for h in holdings)
# Formater le résultat
result_text = f"📊 DÉTAIL DES POSITIONS ({len(holdings)} produits)\n"
result_text += f"Valeur totale: {total_value:,.2f} EUR\n"
result_text += f"{'='*100}\n\n"
# Grouper par type d'actif
by_type = {}
for h in holdings:
asset_type = h['asset_type'] or 'Autre'
if asset_type not in by_type:
by_type[asset_type] = []
by_type[asset_type].append(h)
# Afficher par catégorie
for asset_type, positions in sorted(by_type.items(),
key=lambda x: sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in x[1]),
reverse=True):
type_value = sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in positions)
type_pct = (type_value / total_value * 100) if total_value > 0 else 0
result_text += f"\n{asset_type.upper()}: {type_value:,.2f} EUR ({type_pct:.1f}%)\n"
result_text += f"{'-'*100}\n"
for h in positions:
quantity = float(h['quantity'] or 0)
price = float(h['market_price'] or 0)
value = quantity * price
pct = (value / total_value * 100) if total_value > 0 else 0
# Nom du produit (limité à 40 caractères)
name = (h['name'][:37] + '...') if len(h['name']) > 40 else h['name']
# Secteur/Classe
sector_info = h['sectors'] or h['asset_class'] or 'N/A'
if sector_info and len(sector_info) > 20:
sector_info = sector_info[:17] + '...'
result_text += f" {h['symbol'] or 'N/A':<12} {name:<40}\n"
result_text += f" Quantité: {quantity:>10.2f} Prix: {price:>10.2f} EUR Valeur: {value:>12,.2f} EUR ({pct:.1f}%)\n"
result_text += f" Classe: {h['asset_class'] or 'N/A':<20} Secteur: {sector_info:<20} ISIN: {h['isin'] or 'N/A'}\n"
# Plus-value
avg_cost = float(h['avg_cost'] or 0)
if avg_cost > 0 and price > 0:
gain = value - (quantity * avg_cost)
gain_pct = ((price - avg_cost) / avg_cost * 100)
gain_symbol = '+' if gain >= 0 else ''
result_text += f" PRU: {avg_cost:.2f} EUR Plus-value: {gain_symbol}{gain:,.2f} EUR ({gain_symbol}{gain_pct:.1f}%)\n"
result_text += "\n"
# Résumé de diversification
result_text += f"\n{'='*100}\n"
result_text += f"ANALYSE DE DIVERSIFICATION:\n\n"
# Par type d'actif
result_text += f"Par type d'actif:\n"
for asset_type, positions in sorted(by_type.items(),
key=lambda x: sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in x[1]),
reverse=True):
type_value = sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in positions)
type_pct = (type_value / total_value * 100) if total_value > 0 else 0
result_text += f" {asset_type:<20}: {type_value:>12,.2f} EUR ({type_pct:>5.1f}%) - {len(positions)} produit(s)\n"
# Par classe d'actif
by_class = {}
for h in holdings:
asset_class = h['asset_class'] or 'Non classé'
if asset_class not in by_class:
by_class[asset_class] = 0
by_class[asset_class] += (float(h['quantity'] or 0) * float(h['market_price'] or 0))
result_text += f"\nPar classe d'actif:\n"
for asset_class, value in sorted(by_class.items(), key=lambda x: x[1], reverse=True):
pct = (value / total_value * 100) if total_value > 0 else 0
result_text += f" {asset_class:<20}: {value:>12,.2f} EUR ({pct:>5.1f}%)\n"
# Nombre de lignes
result_text += f"\nNombre total de lignes: {len(holdings)}\n"
result_text += f"Valeur moyenne par ligne: {total_value / len(holdings):,.2f} EUR\n"
return [TextContent(type="text", text=result_text)]
except Exception as e:
return [TextContent(type="text", text=f"Erreur: {str(e)}")]
elif name == "get_all_holdings":
try:
include_cash = arguments.get("include_cash", False)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
# 1. HOLDINGS FINANCIERS
# Calculate holdings from activities table to get accurate account attribution
# Then join with TOTAL holdings to get current market prices
c.execute("""
WITH account_holdings AS (
SELECT
acc.id as account_id,
acc.name as account_name,
acc."group" as account_group,
acc.account_type,
a.id as asset_id,
a.symbol,
a.name,
a.asset_type,
a.asset_class,
a.asset_sub_class,
a.sectors,
a.currency,
a.isin,
SUM(CAST(act.quantity AS REAL)) as quantity,
AVG(CAST(act.unit_price AS REAL)) as avg_cost,
SUM(CAST(act.quantity AS REAL) * CAST(act.unit_price AS REAL)) as book_value
FROM activities act
JOIN assets a ON act.asset_id = a.id
JOIN accounts acc ON act.account_id = acc.id
WHERE acc.is_active = 1
GROUP BY acc.id, acc.name, acc."group", acc.account_type,
a.id, a.symbol, a.name, a.asset_type, a.asset_class,
a.asset_sub_class, a.sectors, a.currency, a.isin
HAVING SUM(CAST(act.quantity AS REAL)) > 0
)
SELECT
ah.*,
COALESCE(q.close, ah.avg_cost) as market_price,
q.timestamp as last_quote_date,
(SELECT MAX(snapshot_date) FROM holdings_snapshots) as snapshot_date
FROM account_holdings ah
LEFT JOIN (
SELECT symbol, CAST(close AS REAL) as close, timestamp
FROM quotes q1
WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol)
) q ON ah.symbol = q.symbol
ORDER BY (ah.quantity * COALESCE(q.close, ah.avg_cost)) DESC
""")
holdings = c.fetchall()
conn.close()
# Filtrer le cash si demandé
if not include_cash:
holdings = [h for h in holdings if h['asset_type'] not in ('CASH', 'Cash')]
# Construire la liste des holdings financiers
financial_holdings = []
total_financial = 0
for h in holdings:
quantity = float(h['quantity'] or 0)
price = float(h['market_price'] or 0)
avg_cost = float(h['avg_cost'] or 0)
book_value = float(h['book_value'] or 0)
value = quantity * price
total_financial += value
gain = value - book_value if book_value > 0 else (value - (quantity * avg_cost) if avg_cost > 0 else 0)
gain_pct = ((price - avg_cost) / avg_cost * 100) if avg_cost > 0 else 0
financial_holdings.append({
"account": {
"id": h['account_id'],
"name": h['account_name'],
"group": h['account_group'],
"type": h['account_type']
},
"asset": {
"id": h['asset_id'],
"symbol": h['symbol'],
"name": h['name'],
"type": h['asset_type'],
"class": h['asset_class'],
"sub_class": h['asset_sub_class'],
"sectors": h['sectors'],
"currency": h['currency'],
"isin": h['isin']
},
"holding": {
"quantity": quantity,
"average_cost": avg_cost,
"book_value": book_value,
"market_price": price,
"market_value": value,
"gain_loss": gain,
"gain_loss_pct": gain_pct,
"last_quote_date": h['last_quote_date'],
"snapshot_date": h['snapshot_date']
}
})
# 2. IMMOBILIER
re_data = get_real_estate_data()
properties = re_data.get("properties", [])
loans = re_data.get("loans", [])
# Calculer les totaux immobiliers
total_real_estate_gross = sum(float(p.get('currentValue', 0)) for p in properties)
total_debt = sum(float(l.get('currentBalance', 0)) for l in loans)
total_real_estate_net = total_real_estate_gross - total_debt
# 3. CONSTRUIRE LA RÉPONSE JSON
response_data = {
"financial_holdings": financial_holdings,
"real_estate": {
"properties": properties,
"total_value": total_real_estate_gross,
"total_debt": total_debt,
"net_value": total_real_estate_net
},
"loans": loans,
"summary": {
"total_financial_assets": total_financial,
"total_real_estate_gross": total_real_estate_gross,
"total_debt": total_debt,
"total_net_worth": total_financial + total_real_estate_net
}
}
# Retourner le JSON formaté
result_json = json.dumps(response_data, indent=2, ensure_ascii=False)
return [TextContent(type="text", text=result_json)]
except Exception as e:
# En cas d'erreur, retourner au moins les holdings financiers si disponibles
try:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute("""
WITH account_holdings AS (
SELECT
acc.id as account_id,
acc.name as account_name,
acc."group" as account_group,
acc.account_type,
a.id as asset_id,
a.symbol,
a.name,
a.asset_type,
a.asset_class,
a.asset_sub_class,
a.sectors,
a.currency,
a.isin,
SUM(CAST(act.quantity AS REAL)) as quantity,
AVG(CAST(act.unit_price AS REAL)) as avg_cost,
SUM(CAST(act.quantity AS REAL) * CAST(act.unit_price AS REAL)) as book_value
FROM activities act
JOIN assets a ON act.asset_id = a.id
JOIN accounts acc ON act.account_id = acc.id
WHERE acc.is_active = 1
GROUP BY acc.id, acc.name, acc."group", acc.account_type,
a.id, a.symbol, a.name, a.asset_type, a.asset_class,
a.asset_sub_class, a.sectors, a.currency, a.isin
HAVING SUM(CAST(act.quantity AS REAL)) > 0
)
SELECT
ah.*,
COALESCE(q.close, ah.avg_cost) as market_price,
q.timestamp as last_quote_date,
(SELECT MAX(snapshot_date) FROM holdings_snapshots) as snapshot_date
FROM account_holdings ah
LEFT JOIN (
SELECT symbol, CAST(close AS REAL) as close, timestamp
FROM quotes q1
WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol)
) q ON ah.symbol = q.symbol
""")
holdings = c.fetchall()
conn.close()
financial_holdings = []
total_financial = 0
for h in holdings:
quantity = float(h['quantity'] or 0)
price = float(h['market_price'] or 0)
avg_cost = float(h['avg_cost'] or 0)
book_value = float(h['book_value'] or 0)
value = quantity * price
total_financial += value
gain = value - book_value if book_value > 0 else (value - (quantity * avg_cost) if avg_cost > 0 else 0)
gain_pct = ((price - avg_cost) / avg_cost * 100) if avg_cost > 0 else 0
financial_holdings.append({
"account": {
"id": h['account_id'],
"name": h['account_name'],
"group": h['account_group'],
"type": h['account_type']
},
"asset": {
"id": h['asset_id'],
"symbol": h['symbol'],
"name": h['name'],
"type": h['asset_type'],
"class": h['asset_class'],
"sub_class": h['asset_sub_class'],
"sectors": h['sectors'],
"currency": h['currency'],
"isin": h['isin']
},
"holding": {
"quantity": quantity,
"average_cost": avg_cost,
"book_value": book_value,
"market_price": price,
"market_value": value,
"gain_loss": gain,
"gain_loss_pct": gain_pct,
"last_quote_date": h['last_quote_date'],
"snapshot_date": h['snapshot_date']
}
})
error_response = {
"financial_holdings": financial_holdings,
"real_estate": {
"properties": [],
"total_value": 0,
"total_debt": 0,
"net_value": 0
},
"loans": [],
"summary": {
"total_financial_assets": total_financial,
"total_real_estate_gross": 0,
"total_debt": 0,
"total_net_worth": total_financial
},
"error": f"Error loading real estate data: {str(e)}"
}
return [TextContent(type="text", text=json.dumps(error_response, indent=2, ensure_ascii=False))]
except Exception as inner_e:
return [TextContent(type="text", text=json.dumps({
"error": f"Critical error: {str(inner_e)}"
}, indent=2))]
else:
return [TextContent(type="text", text=f"Outil inconnu: {name}")]
async def main():
    """Run the MCP server over standard input/output.

    Opens the stdio transport, then passes its two streams to the server
    together with the server's initialization options. Returns once
    ``app.run`` completes.
    """
    async with stdio_server() as streams:
        # The transport yields a (read, write) pair of streams.
        incoming, outgoing = streams
        init_options = app.create_initialization_options()
        await app.run(incoming, outgoing, init_options)
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    server_coroutine = main()
    asyncio.run(server_coroutine)