Skip to main content
Glama
datamoc

Wealthfolio MCP Server

by datamoc
mcp_server.py79.6 kB
#!/usr/bin/env python3 import os import sys import sqlite3 import asyncio import json from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.stdio import stdio_server from dotenv import load_dotenv from datetime import datetime, timedelta from typing import Optional, Dict, List, Any # Charge les variables d'environnement load_dotenv() # Chemin de la base de données DB_PATH = os.getenv("DB_PATH") if not DB_PATH or not os.path.exists(DB_PATH): print(f"ERREUR: Base de données non trouvée : {DB_PATH}", file=sys.stderr) sys.exit(1) # Créer le serveur MCP app = Server("wealthfolio") def get_real_estate_data() -> Dict[str, Any]: """ Récupère les données immobilières depuis app_settings. Returns: Dict contenant properties, loans, valuations, loanEvents Retourne une structure vide si l'addon n'est pas installé """ empty_data = { "properties": [], "loans": [], "valuations": [], "loanEvents": [], "version": "1.0.0" } try: conn = sqlite3.connect(DB_PATH) c = conn.cursor() # Vérifier si la table app_settings existe c.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings' """) if not c.fetchone(): conn.close() return empty_data # Récupérer les données immobilières # Try new key first, then fall back to old key for compatibility c.execute(""" SELECT setting_value FROM app_settings WHERE setting_key = 'addon:real-estate:real-estate-data' """) result = c.fetchone() # If no result with new key, try old key if not result or not result[0]: c.execute(""" SELECT setting_value FROM app_settings WHERE setting_key = 'addon:real-estate-addon:real-estate-data' """) result = c.fetchone() conn.close() # Si pas de résultat ou valeur vide if not result or not result[0]: return empty_data # Parser le JSON data = json.loads(result[0]) # Valider la structure et ajouter les clés manquantes if not isinstance(data, dict): return empty_data # S'assurer que toutes les clés existent for key in ["properties", "loans", "valuations", "loanEvents"]: if key not in data: data[key] = [] if "version" not in data: data["version"] = "1.0.0" return data except json.JSONDecodeError: # JSON invalide - retourner structure vide sans erreur return empty_data except sqlite3.OperationalError: # Erreur SQL (table inexistante, etc.) - retourner structure vide return empty_data except Exception: # Toute autre erreur - retourner structure vide return empty_data @app.list_tools() async def list_tools() -> list[Tool]: """Liste les outils disponibles.""" return [ Tool( name="get_portfolio_summary", description="Retourne la valeur totale du portefeuille et la répartition par type d'actif", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="run_sql_query", description="Exécute une requête SELECT sur la base de données Wealthfolio. Limites: 5000 lignes pour les cotations, 200 lignes pour les autres tables.", inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "Requête SQL SELECT à exécuter" } }, "required": ["sql"] } ), Tool( name="create_activity", description="Crée une nouvelle activité (transaction) dans le portefeuille. Utile pour créer des positions d'ouverture ou enregistrer des transactions.", inputSchema={ "type": "object", "properties": { "account_id": { "type": "string", "description": "ID du compte" }, "asset_id": { "type": "string", "description": "ID de l'actif (symbole)" }, "activity_type": { "type": "string", "description": "Type d'activité: BUY, SELL, DIVIDEND, INTEREST, FEE, TRANSFER_IN, TRANSFER_OUT", "enum": ["BUY", "SELL", "DIVIDEND", "INTEREST", "FEE", "TRANSFER_IN", "TRANSFER_OUT"] }, "activity_date": { "type": "string", "description": "Date de l'activité (format: YYYY-MM-DD)" }, "quantity": { "type": "number", "description": "Quantité (positif pour achats/entrées, négatif pour ventes/sorties)" }, "unit_price": { "type": "number", "description": "Prix unitaire" }, "currency": { "type": "string", "description": "Devise (USD, EUR, etc.)" }, "fee": { "type": "number", "description": "Frais de transaction (optionnel, défaut: 0)" }, "comment": { "type": "string", "description": "Commentaire optionnel" } }, "required": ["account_id", "asset_id", "activity_type", "activity_date", "quantity", "unit_price", "currency"] } ), Tool( name="create_asset", description="Crée un nouvel actif dans la base de données. Nécessaire avant de créer des activités pour un symbole non existant.", inputSchema={ "type": "object", "properties": { "symbol": { "type": "string", "description": "Symbole de l'actif (ex: AAPL, MSFT)" }, "name": { "type": "string", "description": "Nom de l'actif (ex: Apple Inc.)" }, "asset_type": { "type": "string", "description": "Type d'actif: Stock, ETF, Bond, Cryptocurrency, etc." }, "currency": { "type": "string", "description": "Devise de cotation (USD, EUR, etc.)" }, "isin": { "type": "string", "description": "Code ISIN (optionnel)" }, "asset_class": { "type": "string", "description": "Classe d'actif (optionnel)" } }, "required": ["symbol", "name", "asset_type", "currency"] } ), Tool( name="execute_write_query", description="Exécute une requête SQL d'écriture (INSERT, UPDATE, DELETE). À utiliser avec précaution! Requiert confirmation explicite.", inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "Requête SQL à exécuter (INSERT, UPDATE, DELETE)" }, "confirm": { "type": "boolean", "description": "Confirmation explicite requise (doit être true)" } }, "required": ["sql", "confirm"] } ), Tool( name="fetch_and_update_quotes", description="Télécharge automatiquement les cotations depuis Yahoo Finance et met à jour la table quotes. Économise temps et tokens!", inputSchema={ "type": "object", "properties": { "symbols": { "type": "array", "items": {"type": "string"}, "description": "Liste des symboles à mettre à jour. Si vide, récupère tous les symboles du portefeuille." }, "start_date": { "type": "string", "description": "Date de début pour l'historique (format: YYYY-MM-DD). Par défaut: 30 derniers jours" }, "end_date": { "type": "string", "description": "Date de fin pour l'historique (format: YYYY-MM-DD). Par défaut: aujourd'hui" }, "interval": { "type": "string", "enum": ["1d", "1wk", "1mo"], "description": "Intervalle des données: 1d (quotidien), 1wk (hebdo), 1mo (mensuel). Par défaut: 1d" } }, "required": [] } ), Tool( name="get_real_estate_properties", description="Récupère la liste des propriétés immobilières avec leurs détails (adresse, prix d'achat, valeur actuelle, etc.)", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_real_estate_loans", description="Récupère les crédits immobiliers, optionnellement filtrés par propriété. Affiche les détails: montant, taux, échéances, etc.", inputSchema={ "type": "object", "properties": { "propertyId": { "type": "string", "description": "ID de la propriété pour filtrer les crédits (optionnel)" } }, "required": [] } ), Tool( name="get_real_estate_summary", description="Retourne une synthèse complète du patrimoine immobilier: nombre de propriétés, valeur totale, dettes, valeur nette", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_total_net_worth", description="Calcule le patrimoine net total en combinant les actifs financiers et le patrimoine immobilier net des dettes", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_loan_amortization", description="Calcule le tableau d'amortissement détaillé d'un crédit avec possibilité de simuler des remboursements anticipés", inputSchema={ "type": "object", "properties": { "loanId": { "type": "string", "description": "ID du crédit à analyser" }, "extraPayments": { "type": "array", "items": { "type": "object", "properties": { "date": { "type": "string", "description": "Date du remboursement anticipé (format: YYYY-MM-DD)" }, "amount": { "type": "number", "description": "Montant du remboursement anticipé" } }, "required": ["date", "amount"] }, "description": "Liste optionnelle de remboursements anticipés à simuler" } }, "required": ["loanId"] } ), Tool( name="get_real_estate_valuations", description="Récupère l'historique des valorisations des propriétés immobilières, optionnellement filtré par propriété", inputSchema={ "type": "object", "properties": { "propertyId": { "type": "string", "description": "ID de la propriété pour filtrer les valorisations (optionnel)" } }, "required": [] } ), Tool( name="get_accounts_summary", description="Retourne un résumé détaillé par groupe de comptes (Investissement, Retraite, etc.) avec la valorisation à la dernière date", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="get_holdings_detail", description="Liste détaillée de tous les produits financiers détenus avec quantités, prix, valorisation, type d'actif, classe, secteur. Essentiel pour faire des recommandations et analyser la diversification.", inputSchema={ "type": "object", "properties": { "include_cash": { "type": "boolean", "description": "Inclure les positions cash (défaut: false)" } }, "required": [] } ), Tool( name="get_all_holdings", description="Retourne un objet JSON structuré avec TOUS les actifs et passifs: financial_holdings (actions, ETF, etc.), real_estate (propriétés + totaux), loans (crédits), et summary (synthèse globale). Parfait pour analyse patrimoniale complète.", inputSchema={ "type": "object", "properties": { "include_cash": { "type": "boolean", "description": "Inclure les positions cash dans les holdings financiers (défaut: false)" } }, "required": [] } ) ] @app.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Exécute un outil.""" if name == "get_portfolio_summary": try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() # 1. Actifs financiers - Utiliser la dernière date uniquement c.execute(""" SELECT CAST(total_value AS REAL) as total_value, base_currency FROM daily_account_valuation WHERE account_id = 'TOTAL' AND valuation_date = (SELECT MAX(valuation_date) FROM daily_account_valuation WHERE account_id = 'TOTAL') """) result = c.fetchone() financial_assets = result['total_value'] if result else 0 currency = result['base_currency'] if result else 'EUR' # 2. Répartition par type d'actif financier c.execute(""" SELECT a.asset_type, SUM( CAST(json_extract(value, '$.quantity') AS REAL) * COALESCE(CAST((SELECT close FROM quotes WHERE symbol = json_extract(value, '$.assetId') ORDER BY timestamp DESC LIMIT 1) AS REAL), 0) ) as market_value FROM holdings_snapshots hs, json_each(hs.positions) AS positions JOIN assets a ON json_extract(positions.value, '$.assetId') = a.id WHERE hs.snapshot_date = (SELECT MAX(snapshot_date) FROM holdings_snapshots) AND json_extract(positions.value, '$.quantity') > 0 GROUP BY a.asset_type """) allocation = {row['asset_type']: row['market_value'] for row in c.fetchall() if row['market_value']} conn.close() # 3. Patrimoine immobilier (optionnel - peut ne pas exister) try: re_data = get_real_estate_data() properties = re_data.get('properties', []) loans = re_data.get('loans', []) real_estate_gross = sum(float(p.get('currentValue', 0)) for p in properties if p.get('currentValue')) real_estate_debt = sum(float(l.get('currentBalance', 0)) for l in loans if l.get('currentBalance')) real_estate_net = real_estate_gross - real_estate_debt except Exception: # Si erreur, continuer sans données immobilières real_estate_gross = 0 real_estate_debt = 0 real_estate_net = 0 # 4. Total combiné total_net_worth = financial_assets + real_estate_net has_real_estate = real_estate_gross > 0 or real_estate_debt > 0 # Formater le résultat result_text = f"💼 PATRIMOINE TOTAL: {total_net_worth:,.2f} {currency}\n" result_text += f"{'='*60}\n\n" result_text += f"Actifs financiers: {financial_assets:,.2f} {currency}\n" # N'afficher l'immobilier que s'il y en a if has_real_estate: result_text += f"Immobilier net: {real_estate_net:,.2f} {currency}\n" if real_estate_gross > 0: result_text += f" (Valeur brute: {real_estate_gross:,.2f} {currency}\n" result_text += f" Dette: -{real_estate_debt:,.2f} {currency})\n" result_text += f"\n{'─'*60}\n\n" # Répartition financier vs immobilier (seulement si immobilier présent) if has_real_estate and total_net_worth > 0: fin_pct = (financial_assets / total_net_worth) * 100 re_pct = (real_estate_net / total_net_worth) * 100 result_text += f"RÉPARTITION GLOBALE:\n" result_text += f" Financier: {fin_pct:.1f}%\n" result_text += f" Immobilier: {re_pct:.1f}%\n\n" # Répartition par type d'actif financier if allocation: result_text += f"ACTIFS FINANCIERS PAR TYPE:\n" for asset_type, value in sorted(allocation.items(), key=lambda x: x[1], reverse=True): percentage = (value / financial_assets * 100) if financial_assets > 0 else 0 result_text += f" {asset_type}: {value:,.2f} {currency} ({percentage:.1f}%)\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "run_sql_query": sql = arguments.get("sql", "").strip() if not sql.upper().startswith("SELECT"): return [TextContent(type="text", text="Erreur: Uniquement les requêtes SELECT sont autorisées.")] try: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(sql) rows = c.fetchall() columns = [desc[0] for desc in c.description] if c.description else [] conn.close() # Déterminer la limite basée sur le type de requête # Pour les cotations: 5000 lignes (300 jours/an × 10 ans × symboles multiples) # Pour les autres tables: 200 lignes sql_lower = sql.lower() if 'from quotes' in sql_lower or 'join quotes' in sql_lower: limit = 5000 else: limit = 200 total_rows = len(rows) rows = rows[:limit] if not rows: return [TextContent(type="text", text="Aucun résultat trouvé.")] # Formater les résultats result_text = f"Colonnes: {', '.join(columns)}\n\n" result_text += f"Résultats ({len(rows)} lignes" if total_rows > limit: result_text += f", {total_rows - limit} lignes tronquées" result_text += "):\n" for row in rows: result_text += " " + " | ".join(str(v) for v in row) + "\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur SQL: {str(e)}")] elif name == "create_activity": import uuid from datetime import datetime try: # Validation des paramètres account_id = arguments.get("account_id") asset_id = arguments.get("asset_id") activity_type = arguments.get("activity_type") activity_date = arguments.get("activity_date") quantity = arguments.get("quantity") unit_price = arguments.get("unit_price") currency = arguments.get("currency") fee = arguments.get("fee", 0) comment = arguments.get("comment", "") # Validation de la date try: datetime.strptime(activity_date, "%Y-%m-%d") except ValueError: return [TextContent(type="text", text="Erreur: Format de date invalide. Utilisez YYYY-MM-DD")] # Calcul du montant amount = float(quantity) * float(unit_price) if activity_type in ["SELL"]: amount = -amount # Connexion et insertion conn = sqlite3.connect(DB_PATH) c = conn.cursor() # Vérifier que le compte existe c.execute("SELECT id FROM accounts WHERE id = ?", (account_id,)) if not c.fetchone(): conn.close() return [TextContent(type="text", text=f"Erreur: Le compte '{account_id}' n'existe pas")] # Vérifier que l'actif existe c.execute("SELECT id FROM assets WHERE id = ?", (asset_id,)) if not c.fetchone(): conn.close() return [TextContent(type="text", text=f"Erreur: L'actif '{asset_id}' n'existe pas. Créez-le d'abord avec create_asset")] # Générer un ID unique activity_id = str(uuid.uuid4()) created_at = datetime.now().isoformat() # Insérer l'activité c.execute(""" INSERT INTO activities ( id, account_id, asset_id, activity_type, activity_date, quantity, unit_price, currency, fee, amount, is_draft, comment, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( activity_id, account_id, asset_id, activity_type, activity_date, str(quantity), str(unit_price), currency, str(fee), str(amount), 0, comment, created_at, created_at )) conn.commit() conn.close() result_text = f"Activité créée avec succès!\n\n" result_text += f"ID: {activity_id}\n" result_text += f"Type: {activity_type}\n" result_text += f"Actif: {asset_id}\n" result_text += f"Quantité: {quantity}\n" result_text += f"Prix: {unit_price} {currency}\n" result_text += f"Montant: {amount} {currency}\n" result_text += f"Date: {activity_date}\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur lors de la création de l'activité: {str(e)}")] elif name == "create_asset": import uuid from datetime import datetime try: symbol = arguments.get("symbol") name = arguments.get("name") asset_type = arguments.get("asset_type") currency = arguments.get("currency") isin = arguments.get("isin", "") asset_class = arguments.get("asset_class", "") conn = sqlite3.connect(DB_PATH) c = conn.cursor() # Vérifier si l'actif existe déjà c.execute("SELECT id FROM assets WHERE id = ? OR symbol = ?", (symbol, symbol)) existing = c.fetchone() if existing: conn.close() return [TextContent(type="text", text=f"Erreur: L'actif '{symbol}' existe déjà")] # Créer l'actif asset_id = symbol # Utiliser le symbole comme ID created_at = datetime.now().isoformat() c.execute(""" INSERT INTO assets ( id, symbol, name, asset_type, currency, isin, asset_class, data_source, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( asset_id, symbol, name, asset_type, currency, isin, asset_class, "MANUAL", created_at, created_at )) conn.commit() conn.close() result_text = f"Actif créé avec succès!\n\n" result_text += f"Symbole: {symbol}\n" result_text += f"Nom: {name}\n" result_text += f"Type: {asset_type}\n" result_text += f"Devise: {currency}\n" if isin: result_text += f"ISIN: {isin}\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur lors de la création de l'actif: {str(e)}")] elif name == "execute_write_query": sql = arguments.get("sql", "").strip() confirm = arguments.get("confirm", False) if not confirm: return [TextContent(type="text", text="Erreur: Confirmation requise. Passez 'confirm: true' pour exécuter cette requête.")] # Validation: interdire les DROP et TRUNCATE sql_upper = sql.upper() if "DROP" in sql_upper or "TRUNCATE" in sql_upper: return [TextContent(type="text", text="Erreur: Les commandes DROP et TRUNCATE sont interdites pour la sécurité.")] # S'assurer que c'est bien une requête d'écriture if not any(cmd in sql_upper for cmd in ["INSERT", "UPDATE", "DELETE"]): return [TextContent(type="text", text="Erreur: Seules les requêtes INSERT, UPDATE, DELETE sont autorisées.")] try: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute(sql) rows_affected = c.rowcount conn.commit() conn.close() result_text = f"Requête exécutée avec succès!\n" result_text += f"Lignes affectées: {rows_affected}\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur SQL: {str(e)}")] elif name == "fetch_and_update_quotes": import yfinance as yf from datetime import datetime, timedelta import uuid try: # Récupérer les paramètres symbols = arguments.get("symbols", []) start_date = arguments.get("start_date") end_date = arguments.get("end_date") interval = arguments.get("interval", "1d") # Si aucun symbole fourni, récupérer tous les symboles du portefeuille if not symbols: conn = sqlite3.connect(DB_PATH) c = conn.cursor() c.execute("SELECT DISTINCT symbol FROM assets WHERE symbol IS NOT NULL") symbols = [row[0] for row in c.fetchall()] conn.close() if not symbols: return [TextContent(type="text", text="Aucun symbole trouvé dans le portefeuille.")] # Définir les dates par défaut if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") if not start_date: # Par défaut: 30 jours avant end_date start_dt = datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=30) start_date = start_dt.strftime("%Y-%m-%d") # Télécharger les cotations result_text = f"Téléchargement des cotations pour {len(symbols)} symboles...\n" result_text += f"Période: {start_date} à {end_date}\n" result_text += f"Intervalle: {interval}\n\n" conn = sqlite3.connect(DB_PATH) c = conn.cursor() success_count = 0 error_count = 0 total_quotes = 0 errors = [] for symbol in symbols: try: # Télécharger les données ticker = yf.Ticker(symbol) hist = ticker.history(start=start_date, end=end_date, interval=interval) if hist.empty: error_count += 1 errors.append(f" [!] {symbol}: Aucune donnée disponible") continue # Insérer les données dans la base quotes_added = 0 for date, row in hist.iterrows(): quote_id = str(uuid.uuid4()) timestamp = date.strftime("%Y-%m-%dT%H:%M:%S.000Z") # Vérifier si la cotation existe déjà c.execute(""" SELECT id FROM quotes WHERE symbol = ? AND date(timestamp) = date(?) """, (symbol, timestamp)) if c.fetchone(): # Mise à jour c.execute(""" UPDATE quotes SET open = ?, high = ?, low = ?, close = ?, volume = ? WHERE symbol = ? AND date(timestamp) = date(?) """, ( str(row['Open']), str(row['High']), str(row['Low']), str(row['Close']), str(int(row['Volume'])), symbol, timestamp )) else: # Insertion c.execute(""" INSERT INTO quotes (id, symbol, timestamp, open, high, low, close, volume) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( quote_id, symbol, timestamp, str(row['Open']), str(row['High']), str(row['Low']), str(row['Close']), str(int(row['Volume'])) )) quotes_added += 1 conn.commit() success_count += 1 total_quotes += quotes_added result_text += f" [OK] {symbol}: {quotes_added} cotations\n" except Exception as e: error_count += 1 error_msg = str(e) # Limiter la longueur du message d'erreur if len(error_msg) > 100: error_msg = error_msg[:100] + "..." errors.append(f" [X] {symbol}: {error_msg}") conn.close() # Résumé result_text += f"\nRésumé:\n" result_text += f" - Symboles traités avec succès: {success_count}/{len(symbols)}\n" result_text += f" - Total cotations ajoutées/mises à jour: {total_quotes}\n" result_text += f" - Erreurs: {error_count}\n" if errors: result_text += f"\nDétails des erreurs:\n" # Limiter à 10 erreurs pour éviter un texte trop long for error in errors[:10]: result_text += error + "\n" if len(errors) > 10: result_text += f" ... et {len(errors) - 10} autres erreurs\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur lors du téléchargement des cotations: {str(e)}")] elif name == "get_real_estate_properties": try: data = get_real_estate_data() properties = data.get("properties", []) if not properties: return [TextContent(type="text", text="Aucune propriété immobilière trouvée.")] result_text = f"Propriétés immobilières ({len(properties)}):\n\n" for prop in properties: result_text += f"📍 {prop.get('name', 'Sans nom')}\n" result_text += f" ID: {prop.get('id')}\n" result_text += f" Type: {prop.get('type', 'N/A')}\n" result_text += f" Adresse: {prop.get('address', '')}, {prop.get('city', '')}, {prop.get('postalCode', '')}\n" result_text += f" Pays: {prop.get('country', 'N/A')}\n" result_text += f" Date d'achat: {prop.get('purchaseDate', 'N/A')}\n" result_text += f" Prix d'achat: {float(prop.get('purchasePrice', 0)):,.2f} {prop.get('currency', 'EUR')}\n" result_text += f" Valeur actuelle: {float(prop.get('currentValue', 0)):,.2f} {prop.get('currency', 'EUR')}\n" capital_gain = float(prop.get('currentValue', 0)) - float(prop.get('purchasePrice', 0)) result_text += f" Plus-value: {capital_gain:,.2f} {prop.get('currency', 'EUR')}\n" if prop.get('notes'): result_text += f" Notes: {prop.get('notes')}\n" result_text += "\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_real_estate_loans": try: property_id = arguments.get("propertyId") data = get_real_estate_data() loans = data.get("loans", []) # Filtrer par propertyId si fourni if property_id: loans = [loan for loan in loans if loan.get("propertyId") == property_id] if not loans: msg = f"Aucun crédit trouvé" if property_id: msg += f" pour la propriété {property_id}" return [TextContent(type="text", text=msg + ".")] result_text = f"Crédits immobiliers ({len(loans)}):\n\n" for loan in loans: result_text += f"💰 {loan.get('name', 'Sans nom')}\n" result_text += f" ID: {loan.get('id')}\n" result_text += f" Propriété: {loan.get('propertyId')}\n" result_text += f" Prêteur: {loan.get('lender', 'N/A')}\n" result_text += f" Type: {loan.get('type', 'N/A')}\n" result_text += f" Montant initial: {float(loan.get('originalAmount', 0)):,.2f} {loan.get('currency', 'EUR')}\n" result_text += f" Solde actuel: {float(loan.get('currentBalance', 0)):,.2f} {loan.get('currency', 'EUR')}\n" result_text += f" Taux d'intérêt: {float(loan.get('interestRate', 0)):.2f}%\n" result_text += f" Mensualité: {float(loan.get('monthlyPayment', 0)):,.2f} {loan.get('currency', 'EUR')}\n" result_text += f" Début: {loan.get('startDate', 'N/A')}\n" result_text += f" Fin: {loan.get('endDate', 'N/A')}\n" # Calculer le montant remboursé paid = float(loan.get('originalAmount', 0)) - float(loan.get('currentBalance', 0)) result_text += f" Déjà remboursé: {paid:,.2f} {loan.get('currency', 'EUR')}\n" if loan.get('notes'): result_text += f" Notes: {loan.get('notes')}\n" result_text += "\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_real_estate_summary": try: data = get_real_estate_data() properties = data.get("properties", []) loans = data.get("loans", []) # Calculs total_properties = len(properties) total_property_value = sum(float(p.get('currentValue', 0)) for p in properties) total_purchase_price = sum(float(p.get('purchasePrice', 0)) for p in properties) total_capital_gain = total_property_value - total_purchase_price total_loans = len(loans) total_debt = sum(float(l.get('currentBalance', 0)) for l in loans) total_original_debt = sum(float(l.get('originalAmount', 0)) for l in loans) total_monthly_payment = sum(float(l.get('monthlyPayment', 0)) for l in loans) net_real_estate_value = total_property_value - total_debt # Déterminer la devise (prendre la première propriété ou EUR par défaut) currency = properties[0].get('currency', 'EUR') if properties else 'EUR' result_text = "📊 SYNTHÈSE DU PATRIMOINE IMMOBILIER\n\n" result_text += "PROPRIÉTÉS:\n" result_text += f" Nombre de propriétés: {total_properties}\n" result_text += f" Valeur totale actuelle: {total_property_value:,.2f} {currency}\n" result_text += f" Prix d'achat total: {total_purchase_price:,.2f} {currency}\n" result_text += f" Plus-value totale: {total_capital_gain:,.2f} {currency}\n" if total_purchase_price > 0: gain_pct = (total_capital_gain / total_purchase_price) * 100 result_text += f" Rendement: {gain_pct:.2f}%\n" result_text += "\n" result_text += "CRÉDITS:\n" result_text += f" Nombre de crédits: {total_loans}\n" result_text += f" Dette totale: {total_debt:,.2f} {currency}\n" result_text += f" Montant initial total: {total_original_debt:,.2f} {currency}\n" result_text += f" Mensualités totales: {total_monthly_payment:,.2f} {currency}\n" if total_original_debt > 0: paid = total_original_debt - total_debt paid_pct = (paid / total_original_debt) * 100 result_text += f" Déjà remboursé: {paid:,.2f} {currency} ({paid_pct:.1f}%)\n" result_text += "\n" result_text += "BILAN:\n" result_text += f" Valeur nette immobilière: {net_real_estate_value:,.2f} {currency}\n" if total_property_value > 0: ltv = (total_debt / total_property_value) * 100 result_text += f" Taux d'endettement (LTV): {ltv:.1f}%\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_total_net_worth": from datetime import datetime try: # 1. Récupérer les actifs financiers conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute(""" SELECT CAST(total_value AS REAL) as total_value, base_currency FROM daily_account_valuation WHERE account_id = 'TOTAL' AND valuation_date = (SELECT MAX(valuation_date) FROM daily_account_valuation WHERE account_id = 'TOTAL') """) result = c.fetchone() conn.close() financial_assets = result['total_value'] if result else 0 currency = result['base_currency'] if result else 'EUR' # 2. Récupérer le patrimoine immobilier data = get_real_estate_data() properties = data.get("properties", []) loans = data.get("loans", []) real_estate_gross_value = sum(float(p.get('currentValue', 0)) for p in properties) total_debt = sum(float(l.get('currentBalance', 0)) for l in loans) real_estate_net_value = real_estate_gross_value - total_debt # 3. Calculer le patrimoine net total net_worth = financial_assets + real_estate_net_value # Formater le résultat result_text = "💎 PATRIMOINE NET TOTAL\n\n" result_text += f"Actifs financiers: {financial_assets:,.2f} {currency}\n" result_text += f"Immobilier (valeur brute): {real_estate_gross_value:,.2f} {currency}\n" result_text += f"Dettes immobilières: -{total_debt:,.2f} {currency}\n" result_text += f"Immobilier (valeur nette): {real_estate_net_value:,.2f} {currency}\n" result_text += f"\n{'='*50}\n" result_text += f"PATRIMOINE NET: {net_worth:,.2f} {currency}\n" result_text += f"{'='*50}\n\n" # Répartition en pourcentage if net_worth > 0: fin_pct = (financial_assets / net_worth) * 100 re_pct = (real_estate_net_value / net_worth) * 100 result_text += f"Répartition:\n" result_text += f" Financier: {fin_pct:.1f}%\n" result_text += f" Immobilier: {re_pct:.1f}%\n" result_text += f"\nCalculé le: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_loan_amortization": from datetime import datetime, timedelta try: loan_id = arguments.get("loanId") extra_payments = arguments.get("extraPayments", []) if not loan_id: return [TextContent(type="text", text="Erreur: loanId requis")] # Récupérer le crédit data = get_real_estate_data() loans = data.get("loans", []) if not loans: return [TextContent(type="text", text="Aucun crédit immobilier trouvé. L'addon Real Estate n'est peut-être pas installé.")] loan = next((l for l in loans if l.get('id') == loan_id), None) if not loan: return [TextContent(type="text", text=f"Erreur: Crédit {loan_id} introuvable")] # Valider les champs requis required_fields = ['originalAmount', 'currentBalance', 'interestRate', 'monthlyPayment', 'startDate', 'endDate'] missing_fields = [field for field in required_fields if not loan.get(field)] if missing_fields: return [TextContent(type="text", text=f"Erreur: Champs manquants dans les données du crédit: {', '.join(missing_fields)}")] # Paramètres du crédit avec validation try: original_amount = float(loan.get('originalAmount', 0)) current_balance = float(loan.get('currentBalance', 0)) annual_rate = float(loan.get('interestRate', 0)) / 100 monthly_rate = annual_rate / 12 monthly_payment = float(loan.get('monthlyPayment', 0)) start_date = datetime.strptime(loan.get('startDate'), '%Y-%m-%d') end_date = datetime.strptime(loan.get('endDate'), '%Y-%m-%d') currency = loan.get('currency', 'EUR') except (ValueError, TypeError) as e: return [TextContent(type="text", text=f"Erreur: Données invalides dans le crédit: {str(e)}")] # Validations if original_amount <= 0: return [TextContent(type="text", text="Erreur: Montant initial invalide")] if current_balance < 0: return [TextContent(type="text", text="Erreur: Solde actuel invalide")] if monthly_payment <= 0: return [TextContent(type="text", text="Erreur: Mensualité invalide")] if start_date >= end_date: return [TextContent(type="text", text="Erreur: Dates invalides (début >= fin)")] # Calculer les mois déjà écoulés depuis le début now = datetime.now() months_elapsed = (now.year - start_date.year) * 12 + (now.month - start_date.month) capital_paid = original_amount - current_balance # Calculer la durée totale du crédit total_months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month) months_remaining_contractual = total_months - months_elapsed # Calculer le tableau d'amortissement à partir du mois actuel schedule = [] balance = current_balance month = months_elapsed # Démarrer au bon numéro de mois # Jour de paiement (extraire du start_date) payment_day = start_date.day # Date de début pour le calcul (mois suivant) current_date = start_date # Avancer jusqu'au mois actuel for _ in range(months_elapsed): current_date = current_date.replace(day=1) + timedelta(days=32) current_date = current_date.replace(day=payment_day) cumulative_interest = 0 # Convertir extra_payments en dict par date extra_by_date = {} for ep in extra_payments: ep_date = datetime.strptime(ep['date'], '%Y-%m-%d') extra_by_date[ep_date.strftime('%Y-%m')] = float(ep['amount']) # Calculer l'amortissement pour les mois restants # Utiliser la durée contractuelle ou jusqu'à balance = 0 avec extra payments max_months = months_remaining_contractual if not extra_payments else 600 while month < total_months and balance > 0.01: month += 1 # Passer au mois suivant current_date = current_date.replace(day=1) + timedelta(days=32) current_date = current_date.replace(day=min(payment_day, 28)) # Gérer février # Intérêts du mois interest = balance * monthly_rate principal = min(monthly_payment - interest, balance) # Vérifier s'il y a un remboursement anticipé ce mois date_key = current_date.strftime('%Y-%m') extra = extra_by_date.get(date_key, 0) if extra > 0: principal += min(extra, balance - principal) balance -= principal cumulative_interest += interest schedule.append({ "month": month, "date": current_date.strftime('%Y-%m-%d'), "payment": monthly_payment + extra, "principal": principal, "interest": interest, "remainingBalance": max(0, balance), "cumulativeInterest": cumulative_interest }) if balance <= 0: break # Résumé total_payments_remaining = sum(s['payment'] for s in schedule) total_interest_remaining = cumulative_interest total_principal_remaining = current_balance months_remaining = months_remaining_contractual # Utiliser la durée contractuelle projected_end_date = end_date.strftime('%Y-%m-%d') # Utiliser la date contractuelle # Estimer les intérêts déjà payés estimated_interest_paid = (months_elapsed * monthly_payment) - capital_paid # Formater le résultat result_text = f"📅 TABLEAU D'AMORTISSEMENT - {loan.get('name', loan_id)}\n\n" result_text += f"INFORMATIONS DU CRÉDIT:\n" result_text += f" Montant initial: {original_amount:,.2f} {currency}\n" result_text += f" Date début: {start_date.strftime('%Y-%m-%d')}\n" result_text += f" Date fin prévue: {end_date.strftime('%Y-%m-%d')}\n" result_text += f" Taux annuel: {annual_rate*100:.2f}%\n" result_text += f" Mensualité: {monthly_payment:,.2f} {currency}\n" result_text += f"\n{'─'*60}\n" result_text += f"PAIEMENTS DÉJÀ EFFECTUÉS:\n" result_text += f" Mois écoulés: {months_elapsed}\n" result_text += f" Capital remboursé: {capital_paid:,.2f} {currency}\n" result_text += f" Intérêts payés (estimé): {estimated_interest_paid:,.2f} {currency}\n" result_text += f" Total payé: {months_elapsed * monthly_payment:,.2f} {currency}\n" result_text += f"\n{'─'*60}\n" result_text += f"SOLDE ACTUEL:\n" result_text += f" Reste à payer (capital): {current_balance:,.2f} {currency}\n" result_text += f" Mois restants: {months_remaining}\n" result_text += f" Date de fin: {projected_end_date}\n" if extra_payments: result_text += f"\n⚡ Remboursements anticipés simulés: {len(extra_payments)}\n" result_text += f"\n{'─'*60}\n" result_text += f"PROJECTION RESTANTE:\n" result_text += f" Total à payer: {total_payments_remaining:,.2f} {currency}\n" result_text += f" Dont capital: {total_principal_remaining:,.2f} {currency}\n" result_text += f" Dont intérêts (estimé): {total_interest_remaining:,.2f} {currency}\n" result_text += f"\n{'─'*60}\n" result_text += f"COÛT TOTAL DU CRÉDIT (ESTIMÉ):\n" total_cost = estimated_interest_paid + total_interest_remaining result_text += f" Intérêts totaux: {total_cost:,.2f} {currency}\n" result_text += f" Coût total: {original_amount + total_cost:,.2f} {currency}\n" # Afficher un échantillon du tableau (premiers et derniers mois) result_text += f"\n ÉCHÉANCIER (premiers et derniers mois):\n" result_text += f"{'Mois':<6} {'Date':<12} {'Paiement':<12} {'Capital':<12} {'Intérêts':<12} {'Solde':<12}\n" result_text += "-" * 72 + "\n" # Premiers 12 mois for s in schedule[:12]: result_text += f"{s['month']:<6} {s['date']:<12} {s['payment']:>11,.0f} {s['principal']:>11,.0f} {s['interest']:>11,.0f} {s['remainingBalance']:>11,.0f}\n" if len(schedule) > 24: result_text += "...\n" # Derniers 12 mois for s in schedule[-12:]: result_text += f"{s['month']:<6} {s['date']:<12} {s['payment']:>11,.0f} {s['principal']:>11,.0f} {s['interest']:>11,.0f} {s['remainingBalance']:>11,.0f}\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_real_estate_valuations": try: property_id = arguments.get("propertyId") data = get_real_estate_data() valuations = data.get("valuations", []) # Filtrer par propertyId si fourni if property_id: valuations = [v for v in valuations if v.get("propertyId") == property_id] if not valuations: msg = "Aucune valorisation trouvée" if property_id: msg += f" pour la propriété {property_id}" return [TextContent(type="text", text=msg + ".")] result_text = f"📈 HISTORIQUE DES VALORISATIONS ({len(valuations)}):\n\n" # Grouper par propriété by_property = {} for val in valuations: prop_id = val.get('propertyId') if prop_id not in by_property: by_property[prop_id] = [] by_property[prop_id].append(val) for prop_id, vals in by_property.items(): # Trier par date vals.sort(key=lambda x: x.get('valuationDate', '')) result_text += f"Propriété: {prop_id}\n" for val in vals: result_text += f" 📅 {val.get('valuationDate', 'N/A')}: " result_text += f"{float(val.get('value', 0)):,.2f} EUR" if val.get('method'): result_text += f" (Méthode: {val.get('method')})" if val.get('notes'): result_text += f"\n Notes: {val.get('notes')}" result_text += "\n" # Calculer l'évolution if len(vals) >= 2: first_val = float(vals[0].get('value', 0)) last_val = float(vals[-1].get('value', 0)) evolution = last_val - first_val evolution_pct = (evolution / first_val * 100) if first_val > 0 else 0 result_text += f" Évolution: {evolution:+,.2f} EUR ({evolution_pct:+.2f}%)\n" result_text += "\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_accounts_summary": try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() # Récupérer le résumé par groupe de comptes (seulement les comptes actifs) # Utilise la dernière date de valorisation pour CHAQUE compte c.execute(""" SELECT a."group" as groupe, COUNT(DISTINCT a.id) as nb_comptes, SUM(CAST(v.total_value AS REAL)) as valeur_totale FROM daily_account_valuation v JOIN accounts a ON v.account_id = a.id WHERE v.valuation_date = ( SELECT MAX(valuation_date) FROM daily_account_valuation v2 WHERE v2.account_id = v.account_id ) AND a.id != 'TOTAL' AND a.is_active = 1 GROUP BY a."group" ORDER BY valeur_totale DESC """) groups = c.fetchall() # Récupérer la date de valorisation c.execute(""" SELECT MAX(valuation_date) as date FROM daily_account_valuation """) valuation_date = c.fetchone()['date'] # Récupérer le détail des comptes (seulement les comptes actifs) # Utilise la dernière date de valorisation pour CHAQUE compte c.execute(""" SELECT a.name, a."group" as groupe, a.account_type, CAST(v.total_value AS REAL) as valeur, v.base_currency FROM daily_account_valuation v JOIN accounts a ON v.account_id = a.id WHERE v.valuation_date = ( SELECT MAX(valuation_date) FROM daily_account_valuation v2 WHERE v2.account_id = v.account_id ) AND a.id != 'TOTAL' AND a.is_active = 1 ORDER BY a."group", valeur DESC """) accounts = c.fetchall() conn.close() if not groups: return [TextContent(type="text", text="Aucun compte trouvé.")] # Calculer le total total_value = sum(g['valeur_totale'] for g in groups) currency = accounts[0]['base_currency'] if accounts else "EUR" # Formater le résultat result_text = f"📊 RÉSUMÉ PAR GROUPE DE COMPTES\n" result_text += f"Date de valorisation: {valuation_date}\n" result_text += f"{'='*60}\n\n" # Résumé par groupe result_text += f"PAR GROUPE:\n" for group in groups: valeur = group['valeur_totale'] pct = (valeur / total_value * 100) if total_value > 0 else 0 result_text += f"\n {group['groupe'] or 'Sans groupe'}:\n" result_text += f" Nombre de comptes: {group['nb_comptes']}\n" result_text += f" Valeur: {valeur:,.2f} {currency} ({pct:.1f}%)\n" # Total result_text += f"\n{'─'*60}\n" result_text += f"TOTAL: {total_value:,.2f} {currency}\n" # Détail des comptes par groupe result_text += f"\n{'─'*60}\n" result_text += f"\nDÉTAIL PAR COMPTE:\n" current_group = None for acc in accounts: # Nouveau groupe if acc['groupe'] != current_group: current_group = acc['groupe'] result_text += f"\n{current_group or 'Sans groupe'}:\n" pct = (acc['valeur'] / total_value * 100) if total_value > 0 else 0 result_text += f" • {acc['name']}: {acc['valeur']:,.2f} {currency} ({pct:.1f}%)\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_holdings_detail": try: include_cash = arguments.get("include_cash", False) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() # Récupérer tous les holdings avec leurs détails et le dernier prix c.execute(""" SELECT a.symbol, a.name, a.asset_type, a.asset_class, a.asset_sub_class, a.sectors, a.currency, a.isin, CAST(json_extract(positions.value, '$.quantity') AS REAL) as quantity, CAST(json_extract(positions.value, '$.averageCost') AS REAL) as avg_cost, CAST(json_extract(positions.value, '$.totalCostBasis') AS REAL) as book_value, COALESCE(q.close, json_extract(positions.value, '$.averageCost')) as market_price FROM holdings_snapshots hs, json_each(hs.positions) AS positions JOIN assets a ON json_extract(positions.value, '$.assetId') = a.id LEFT JOIN ( SELECT symbol, CAST(close AS REAL) as close FROM quotes q1 WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol) ) q ON a.symbol = q.symbol WHERE hs.snapshot_date = (SELECT MAX(snapshot_date) FROM holdings_snapshots) AND CAST(json_extract(positions.value, '$.quantity') AS REAL) > 0 ORDER BY (CAST(json_extract(positions.value, '$.quantity') AS REAL) * COALESCE(q.close, json_extract(positions.value, '$.averageCost'))) DESC """) holdings = c.fetchall() conn.close() if not holdings: return [TextContent(type="text", text="Aucune position trouvée.")] # Filtrer le cash si demandé if not include_cash: holdings = [h for h in holdings if h['asset_type'] not in ('CASH', 'Cash')] # Calculer le total (quantity * market_price) total_value = sum((float(h['quantity'] or 0) * float(h['market_price'] or 0)) for h in holdings) # Formater le résultat result_text = f"📊 DÉTAIL DES POSITIONS ({len(holdings)} produits)\n" result_text += f"Valeur totale: {total_value:,.2f} EUR\n" result_text += f"{'='*100}\n\n" # Grouper par type d'actif by_type = {} for h in holdings: asset_type = h['asset_type'] or 'Autre' if asset_type not in by_type: by_type[asset_type] = [] by_type[asset_type].append(h) # Afficher par catégorie for asset_type, positions in sorted(by_type.items(), key=lambda x: sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in x[1]), reverse=True): type_value = sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in positions) type_pct = (type_value / total_value * 100) if total_value > 0 else 0 result_text += f"\n{asset_type.upper()}: {type_value:,.2f} EUR ({type_pct:.1f}%)\n" result_text += f"{'-'*100}\n" for h in positions: quantity = float(h['quantity'] or 0) price = float(h['market_price'] or 0) value = quantity * price pct = (value / total_value * 100) if total_value > 0 else 0 # Nom du produit (limité à 40 caractères) name = (h['name'][:37] + '...') if len(h['name']) > 40 else h['name'] # Secteur/Classe sector_info = h['sectors'] or h['asset_class'] or 'N/A' if sector_info and len(sector_info) > 20: sector_info = sector_info[:17] + '...' result_text += f" {h['symbol'] or 'N/A':<12} {name:<40}\n" result_text += f" Quantité: {quantity:>10.2f} Prix: {price:>10.2f} EUR Valeur: {value:>12,.2f} EUR ({pct:.1f}%)\n" result_text += f" Classe: {h['asset_class'] or 'N/A':<20} Secteur: {sector_info:<20} ISIN: {h['isin'] or 'N/A'}\n" # Plus-value avg_cost = float(h['avg_cost'] or 0) if avg_cost > 0 and price > 0: gain = value - (quantity * avg_cost) gain_pct = ((price - avg_cost) / avg_cost * 100) gain_symbol = '+' if gain >= 0 else '' result_text += f" PRU: {avg_cost:.2f} EUR Plus-value: {gain_symbol}{gain:,.2f} EUR ({gain_symbol}{gain_pct:.1f}%)\n" result_text += "\n" # Résumé de diversification result_text += f"\n{'='*100}\n" result_text += f"ANALYSE DE DIVERSIFICATION:\n\n" # Par type d'actif result_text += f"Par type d'actif:\n" for asset_type, positions in sorted(by_type.items(), key=lambda x: sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in x[1]), reverse=True): type_value = sum((float(p['quantity'] or 0) * float(p['market_price'] or 0)) for p in positions) type_pct = (type_value / total_value * 100) if total_value > 0 else 0 result_text += f" {asset_type:<20}: {type_value:>12,.2f} EUR ({type_pct:>5.1f}%) - {len(positions)} produit(s)\n" # Par classe d'actif by_class = {} for h in holdings: asset_class = h['asset_class'] or 'Non classé' if asset_class not in by_class: by_class[asset_class] = 0 by_class[asset_class] += (float(h['quantity'] or 0) * float(h['market_price'] or 0)) result_text += f"\nPar classe d'actif:\n" for asset_class, value in sorted(by_class.items(), key=lambda x: x[1], reverse=True): pct = (value / total_value * 100) if total_value > 0 else 0 result_text += f" {asset_class:<20}: {value:>12,.2f} EUR ({pct:>5.1f}%)\n" # Nombre de lignes result_text += f"\nNombre total de lignes: {len(holdings)}\n" result_text += f"Valeur moyenne par ligne: {total_value / len(holdings):,.2f} EUR\n" return [TextContent(type="text", text=result_text)] except Exception as e: return [TextContent(type="text", text=f"Erreur: {str(e)}")] elif name == "get_all_holdings": try: include_cash = arguments.get("include_cash", False) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() # 1. HOLDINGS FINANCIERS # Calculate holdings from activities table to get accurate account attribution # Then join with TOTAL holdings to get current market prices c.execute(""" WITH account_holdings AS ( SELECT acc.id as account_id, acc.name as account_name, acc."group" as account_group, acc.account_type, a.id as asset_id, a.symbol, a.name, a.asset_type, a.asset_class, a.asset_sub_class, a.sectors, a.currency, a.isin, SUM(CAST(act.quantity AS REAL)) as quantity, AVG(CAST(act.unit_price AS REAL)) as avg_cost, SUM(CAST(act.quantity AS REAL) * CAST(act.unit_price AS REAL)) as book_value FROM activities act JOIN assets a ON act.asset_id = a.id JOIN accounts acc ON act.account_id = acc.id WHERE acc.is_active = 1 GROUP BY acc.id, acc.name, acc."group", acc.account_type, a.id, a.symbol, a.name, a.asset_type, a.asset_class, a.asset_sub_class, a.sectors, a.currency, a.isin HAVING SUM(CAST(act.quantity AS REAL)) > 0 ) SELECT ah.*, COALESCE(q.close, ah.avg_cost) as market_price, q.timestamp as last_quote_date, (SELECT MAX(snapshot_date) FROM holdings_snapshots) as snapshot_date FROM account_holdings ah LEFT JOIN ( SELECT symbol, CAST(close AS REAL) as close, timestamp FROM quotes q1 WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol) ) q ON ah.symbol = q.symbol ORDER BY (ah.quantity * COALESCE(q.close, ah.avg_cost)) DESC """) holdings = c.fetchall() conn.close() # Filtrer le cash si demandé if not include_cash: holdings = [h for h in holdings if h['asset_type'] not in ('CASH', 'Cash')] # Construire la liste des holdings financiers financial_holdings = [] total_financial = 0 for h in holdings: quantity = float(h['quantity'] or 0) price = float(h['market_price'] or 0) avg_cost = float(h['avg_cost'] or 0) book_value = float(h['book_value'] or 0) value = quantity * price total_financial += value gain = value - book_value if book_value > 0 else (value - (quantity * avg_cost) if avg_cost > 0 else 0) gain_pct = ((price - avg_cost) / avg_cost * 100) if avg_cost > 0 else 0 financial_holdings.append({ "account": { "id": h['account_id'], "name": h['account_name'], "group": h['account_group'], "type": h['account_type'] }, "asset": { "id": h['asset_id'], "symbol": h['symbol'], "name": h['name'], "type": h['asset_type'], "class": h['asset_class'], "sub_class": h['asset_sub_class'], "sectors": h['sectors'], "currency": h['currency'], "isin": h['isin'] }, "holding": { "quantity": quantity, "average_cost": avg_cost, "book_value": book_value, "market_price": price, "market_value": value, "gain_loss": gain, "gain_loss_pct": gain_pct, "last_quote_date": h['last_quote_date'], "snapshot_date": h['snapshot_date'] } }) # 2. IMMOBILIER re_data = get_real_estate_data() properties = re_data.get("properties", []) loans = re_data.get("loans", []) # Calculer les totaux immobiliers total_real_estate_gross = sum(float(p.get('currentValue', 0)) for p in properties) total_debt = sum(float(l.get('currentBalance', 0)) for l in loans) total_real_estate_net = total_real_estate_gross - total_debt # 3. CONSTRUIRE LA RÉPONSE JSON response_data = { "financial_holdings": financial_holdings, "real_estate": { "properties": properties, "total_value": total_real_estate_gross, "total_debt": total_debt, "net_value": total_real_estate_net }, "loans": loans, "summary": { "total_financial_assets": total_financial, "total_real_estate_gross": total_real_estate_gross, "total_debt": total_debt, "total_net_worth": total_financial + total_real_estate_net } } # Retourner le JSON formaté result_json = json.dumps(response_data, indent=2, ensure_ascii=False) return [TextContent(type="text", text=result_json)] except Exception as e: # En cas d'erreur, retourner au moins les holdings financiers si disponibles try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute(""" WITH account_holdings AS ( SELECT acc.id as account_id, acc.name as account_name, acc."group" as account_group, acc.account_type, a.id as asset_id, a.symbol, a.name, a.asset_type, a.asset_class, a.asset_sub_class, a.sectors, a.currency, a.isin, SUM(CAST(act.quantity AS REAL)) as quantity, AVG(CAST(act.unit_price AS REAL)) as avg_cost, SUM(CAST(act.quantity AS REAL) * CAST(act.unit_price AS REAL)) as book_value FROM activities act JOIN assets a ON act.asset_id = a.id JOIN accounts acc ON act.account_id = acc.id WHERE acc.is_active = 1 GROUP BY acc.id, acc.name, acc."group", acc.account_type, a.id, a.symbol, a.name, a.asset_type, a.asset_class, a.asset_sub_class, a.sectors, a.currency, a.isin HAVING SUM(CAST(act.quantity AS REAL)) > 0 ) SELECT ah.*, COALESCE(q.close, ah.avg_cost) as market_price, q.timestamp as last_quote_date, (SELECT MAX(snapshot_date) FROM holdings_snapshots) as snapshot_date FROM account_holdings ah LEFT JOIN ( SELECT symbol, CAST(close AS REAL) as close, timestamp FROM quotes q1 WHERE timestamp = (SELECT MAX(timestamp) FROM quotes q2 WHERE q2.symbol = q1.symbol) ) q ON ah.symbol = q.symbol """) holdings = c.fetchall() conn.close() financial_holdings = [] total_financial = 0 for h in holdings: quantity = float(h['quantity'] or 0) price = float(h['market_price'] or 0) avg_cost = float(h['avg_cost'] or 0) book_value = float(h['book_value'] or 0) value = quantity * price total_financial += value gain = value - book_value if book_value > 0 else (value - (quantity * avg_cost) if avg_cost > 0 else 0) gain_pct = ((price - avg_cost) / avg_cost * 100) if avg_cost > 0 else 0 financial_holdings.append({ "account": { "id": h['account_id'], "name": h['account_name'], "group": h['account_group'], "type": h['account_type'] }, "asset": { "id": h['asset_id'], "symbol": h['symbol'], "name": h['name'], "type": h['asset_type'], "class": h['asset_class'], "sub_class": h['asset_sub_class'], "sectors": h['sectors'], "currency": h['currency'], "isin": h['isin'] }, "holding": { "quantity": quantity, "average_cost": avg_cost, "book_value": book_value, "market_price": price, "market_value": value, "gain_loss": gain, "gain_loss_pct": gain_pct, "last_quote_date": h['last_quote_date'], "snapshot_date": h['snapshot_date'] } }) error_response = { "financial_holdings": financial_holdings, "real_estate": { "properties": [], "total_value": 0, "total_debt": 0, "net_value": 0 }, "loans": [], "summary": { "total_financial_assets": total_financial, "total_real_estate_gross": 0, "total_debt": 0, "total_net_worth": total_financial }, "error": f"Error loading real estate data: {str(e)}" } return [TextContent(type="text", text=json.dumps(error_response, indent=2, ensure_ascii=False))] except Exception as inner_e: return [TextContent(type="text", text=json.dumps({ "error": f"Critical error: {str(inner_e)}" }, indent=2))] else: return [TextContent(type="text", text=f"Outil inconnu: {name}")] async def main(): """Point d'entrée principal.""" async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/datamoc/WFmcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server