Skip to main content
Glama

Collective Brain MCP Server

by bparpette
main.py40.5 kB
""" MCP Collective Brain Server - Version multi-tenant optimisée pour Lambda Système de mémoire collective avec isolation par équipe """ import os import hashlib import json from datetime import datetime from typing import List, Dict, Optional # Import paresseux pour optimiser le démarrage Lambda requests = None urllib = None def ensure_requests(): """Import paresseux de requests""" global requests if requests is None: try: import requests if not IS_LAMBDA: print("✅ Module requests importé") except ImportError: requests = None if not IS_LAMBDA: print("❌ Module requests non disponible") def ensure_urllib(): """Import paresseux de urllib""" global urllib if urllib is None: try: import urllib.request import urllib.parse import urllib.error import json as json_module urllib = { 'request': urllib.request, 'parse': urllib.parse, 'error': urllib.error, 'json': json_module } if not IS_LAMBDA: print("✅ Module urllib disponible comme fallback") except ImportError: urllib = None if not IS_LAMBDA: print("❌ Module urllib non disponible") QDRANT_AVAILABLE = False QdrantClient = None Distance = None VectorParams = None PointStruct = None # Charger les variables d'environnement depuis config.env.example si .env n'existe pas if not os.path.exists('.env') and os.path.exists('config.env.example'): with open('config.env.example', 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) if key not in os.environ: os.environ[key] = value # Configuration Supabase SUPABASE_URL = os.getenv("SUPABASE_URL", "https://hzoggayzniyxlbwxchcx.supabase.co") SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Configuration Qdrant - optimisée pour démarrage rapide QDRANT_URL = os.getenv("QDRANT_URL") QDRANT_API_KEY = os.getenv("QDRANT_API_KEY") QDRANT_ENABLED = os.getenv("QDRANT_ENABLED", "false").lower() == "true" # Détection environnement Lambda IS_LAMBDA = ( os.getenv("AWS_LAMBDA_FUNCTION_NAME") is not None or os.getenv("AWS_EXECUTION_ENV") is not None or os.getenv("LAMBDA_TASK_ROOT") is not None ) # En Lambda, mode paresseux pour éviter les timeouts if IS_LAMBDA: # En Lambda, on active Qdrant si les credentials sont disponibles USE_QDRANT = bool(QDRANT_URL and QDRANT_API_KEY) if USE_QDRANT: print("🚀 Mode Lambda - Qdrant activé (paresseux)") else: print("🚀 Mode Lambda - Qdrant désactivé (pas de credentials)") else: USE_QDRANT = bool(QDRANT_URL and QDRANT_API_KEY and QDRANT_ENABLED) # Debug minimal - seulement en local if not IS_LAMBDA: print(f"🔧 Qdrant: {'Activé' if USE_QDRANT else 'Désactivé'}") print(f"🔧 Supabase: {'Activé' if SUPABASE_SERVICE_KEY else 'Désactivé'}") # Import paresseux de FastMCP def get_mcp(): """Import paresseux de FastMCP - optimisé pour Lambda""" try: from mcp.server.fastmcp import FastMCP # Configuration optimisée pour Lambda - pas d'imports FastAPI au démarrage return FastMCP( "Collective Brain Server", port=3000, stateless_http=True, debug=False ) except ImportError: if not IS_LAMBDA: print("❌ FastMCP non disponible") return None # Variable globale pour stocker les headers de la requête courante current_request_headers = {} def set_request_headers(headers: dict): """Définir les headers de la requête courante""" global current_request_headers current_request_headers = headers def get_request_headers() -> dict: """Récupérer les headers de la requête courante""" return current_request_headers def get_current_user_token(token: str = None): """Récupérer le token utilisateur (simplifié pour Lambda)""" try: if not token: print("⚠️ Token non fourni") return "" print(f"🔍 Token reçu: {token[:10]}...") # Nettoyer le token si nécessaire if token.startswith("Bearer "): token = token[7:] print(f"🔍 Token nettoyé: {token[:10]}...") return token except Exception as e: print(f"❌ Erreur récupération token: {e}") return "" def extract_token_from_context(ctx): """Extraire le token Bearer depuis le contexte FastMCP""" try: if not ctx: print("⚠️ Contexte non fourni") return "" print("🔍 Contexte FastMCP disponible") # Accéder aux headers via le contexte de requête if hasattr(ctx, 'request_context') and ctx.request_context: print("🔍 request_context disponible") if hasattr(ctx.request_context, 'request') and ctx.request_context.request: request = ctx.request_context.request print(f"🔍 Objet request: {type(request)}") # Essayer d'accéder aux headers if hasattr(request, 'headers'): headers = request.headers print(f"🔍 Headers via request: {headers}") if 'authorization' in headers: auth_header = headers['authorization'] if auth_header.startswith("Bearer "): token = auth_header[7:] print(f"✅ Token trouvé via contexte: {token[:10]}...") return token # Essayer d'accéder aux métadonnées if hasattr(ctx.request_context, 'meta') and ctx.request_context.meta: meta = ctx.request_context.meta print(f"🔍 Métadonnées: {meta}") # Essayer d'accéder aux headers via l'app FastAPI if hasattr(ctx, 'fastmcp') and hasattr(ctx.fastmcp, '_app'): app = ctx.fastmcp._app print(f"🔍 App FastAPI: {type(app)}") print("❌ Token non trouvé dans le contexte") return "" except Exception as e: print(f"❌ Erreur extraction token depuis contexte: {e}") return "" def extract_token_from_headers(): """Extraire le token Bearer depuis les headers HTTP (fallback)""" try: # 1. Essayer de récupérer depuis les headers de la requête courante request_headers = get_request_headers() print(f"🔍 Headers de la requête: {request_headers}") if "authorization" in request_headers: auth_header = request_headers["authorization"] if auth_header.startswith("Bearer "): print(f"✅ Token trouvé dans les headers de requête: {auth_header[7:]}") return auth_header[7:] # Enlever "Bearer " # 2. Debug: afficher tous les headers disponibles dans l'environnement print("=== DEBUG HEADERS ENV ===") for key, value in os.environ.items(): if "AUTH" in key.upper() or "HEADER" in key.upper() or "HTTP" in key.upper(): print(f"{key}: {value}") print("========================") # 3. En Lambda, les headers sont disponibles via les variables d'environnement auth_header = os.getenv("HTTP_AUTHORIZATION", "") if auth_header.startswith("Bearer "): print(f"✅ Token trouvé dans HTTP_AUTHORIZATION: {auth_header[7:]}") return auth_header[7:] # Enlever "Bearer " # 4. Fallback: chercher dans d'autres variables d'environnement for key, value in os.environ.items(): if "AUTHORIZATION" in key.upper() and value.startswith("Bearer "): print(f"✅ Token trouvé dans {key}: {value[7:]}") return value[7:] print("❌ Aucun token Bearer trouvé") return "" except Exception as e: print(f"❌ Erreur extraction token: {e}") return "" # Modèle de données enrichi pour le cerveau collectif class Memory: def __init__(self, content: str, user_id: str = "", team_id: str = "", timestamp: str = "", tags: List[str] = [], category: str = "general", visibility: str = "team", confidence: float = 0.5): self.content = content self.user_id = user_id self.team_id = team_id self.timestamp = timestamp self.tags = tags self.category = category self.visibility = visibility self.confidence = confidence # Stockage en mémoire simple (fallback) memories: Dict[str, Memory] = {} # Import paresseux de Qdrant QDRANT_AVAILABLE = False QdrantClient = None Distance = None VectorParams = None PointStruct = None def ensure_qdrant_import(): """Import paresseux de Qdrant - optimisé pour Lambda""" global QDRANT_AVAILABLE, QdrantClient, Distance, VectorParams, PointStruct if not QDRANT_AVAILABLE and USE_QDRANT: try: from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct QDRANT_AVAILABLE = True # Pas de print en Lambda pour éviter les logs if not IS_LAMBDA: print("✅ Qdrant importé avec succès") except ImportError: QDRANT_AVAILABLE = False if not IS_LAMBDA: print("❌ Qdrant non disponible") def calculate_similarity(text1: str, text2: str) -> float: """Calcule la similarité entre deux textes""" words1 = set(text1.lower().split()) words2 = set(text2.lower().split()) intersection = len(words1.intersection(words2)) union = len(words1.union(words2)) return intersection / union if union > 0 else 0.0 def generate_embedding(text: str) -> List[float]: """Génère un embedding simple basé sur le hash du texte""" hash_obj = hashlib.md5(text.encode()) hash_bytes = hash_obj.digest() vector = [] for i in range(384): # Dimension standard vector.append((hash_bytes[i % 16] - 128) / 128.0) return vector def verify_user_token(user_token: str) -> Optional[Dict]: """Vérifier un token utilisateur via Supabase (obligatoire)""" if not IS_LAMBDA: print(f"🔍 Début vérification token: {user_token[:10]}...") if not SUPABASE_SERVICE_KEY: if not IS_LAMBDA: print("❌ Supabase non configuré - authentification obligatoire") return None try: # Si c'est un token Bearer, enlever le préfixe if user_token.startswith("Bearer "): user_token = user_token[7:] if not IS_LAMBDA: print(f"🔍 Token nettoyé: {user_token[:10]}...") if not IS_LAMBDA: print(f"🔍 Appel Supabase: {SUPABASE_URL}/rest/v1/rpc/verify_user_token") # Utiliser requests si disponible, sinon urllib ensure_requests() if requests is not None: if not IS_LAMBDA: print("🔍 Utilisation de requests") response = requests.post( f"{SUPABASE_URL}/rest/v1/rpc/verify_user_token", headers={ "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}", "Content-Type": "application/json", "apikey": SUPABASE_SERVICE_KEY }, json={"token": user_token}, timeout=3 ) status_code = response.status_code response_text = response.text else: if not IS_LAMBDA: print("🔍 Utilisation de urllib (fallback)") ensure_urllib() if urllib is not None: # Utiliser urllib comme fallback data = json.dumps({"token": user_token}).encode('utf-8') req = urllib['request'].Request( f"{SUPABASE_URL}/rest/v1/rpc/verify_user_token", data=data, headers={ "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}", "Content-Type": "application/json", "apikey": SUPABASE_SERVICE_KEY } ) response = urllib['request'].urlopen(req, timeout=3) status_code = response.getcode() response_text = response.read().decode('utf-8') else: if not IS_LAMBDA: print("❌ Aucun module HTTP disponible") return None if not IS_LAMBDA: print(f"🔍 Réponse Supabase: {status_code}") print(f"🔍 Contenu de la réponse: {response_text}") if status_code == 200: data = json.loads(response_text) if not IS_LAMBDA: print(f"🔍 Données reçues: {data}") if data and len(data) > 0: if not IS_LAMBDA: print(f"✅ Token valide pour utilisateur: {data[0]}") return data[0] if not IS_LAMBDA: print(f"❌ Token invalide: {user_token[:10]}... (status: {status_code})") return None except Exception as e: print(f"❌ Erreur vérification token: {e}") return None class QdrantStorage: """Gestionnaire de stockage Qdrant multi-tenant""" def __init__(self): self.client = None self._initialized = False self._init_attempted = False def _ensure_connected(self): """Connexion paresseuse avec timeout court""" if not self._initialized and not self._init_attempted: self._init_attempted = True # En Lambda, activer Qdrant si les credentials sont disponibles if IS_LAMBDA and QDRANT_URL and QDRANT_API_KEY: print("🔧 Activation de Qdrant en Lambda...") # Activer Qdrant dynamiquement global USE_QDRANT USE_QDRANT = True # Import paresseux ensure_qdrant_import() if not QDRANT_AVAILABLE: raise Exception("Qdrant non disponible") try: print("🔄 Connexion Qdrant...") self.client = QdrantClient( url=QDRANT_URL, api_key=QDRANT_API_KEY, timeout=3 # Timeout court pour Lambda ) self._initialized = True print("✅ Qdrant connecté") except Exception as e: print(f"❌ Erreur Qdrant: {e}") self.client = None self._initialized = False raise Exception(f"Connexion Qdrant échouée: {e}") if not self._initialized: raise Exception("Qdrant non disponible") def _get_collection_name(self, team_id: str) -> str: """Générer le nom de collection pour une équipe""" # Nettoyer l'ID d'équipe pour créer un nom de collection valide clean_team_id = team_id.replace("-", "_").replace(" ", "_") return f"team_memories_{clean_team_id}" def _ensure_collection_exists(self, team_id: str): """S'assurer que la collection de l'équipe existe (paresseux)""" collection_name = self._get_collection_name(team_id) try: collections = self.client.get_collections() collection_names = [c.name for c in collections.collections] if collection_name not in collection_names: self.client.create_collection( collection_name=collection_name, vectors_config=VectorParams(size=384, distance=Distance.COSINE) ) print(f"✅ Collection '{collection_name}' créée pour l'équipe {team_id}") else: print(f"✅ Collection '{collection_name}' existe pour l'équipe {team_id}") except Exception as e: print(f"❌ Erreur collection: {e}") raise def store_memory(self, memory: Memory, memory_id: str, team_id: str) -> str: """Stocker une mémoire avec isolation par équipe""" try: self._ensure_connected() self._ensure_collection_exists(team_id) collection_name = self._get_collection_name(team_id) embedding = generate_embedding(memory.content) point = PointStruct( id=memory_id, vector=embedding, payload={ "content": memory.content, "timestamp": memory.timestamp, "tags": memory.tags, "user_id": memory.user_id, "team_id": memory.team_id, "category": memory.category, "confidence": memory.confidence } ) self.client.upsert( collection_name=collection_name, points=[point] ) return memory_id except Exception as e: print(f"❌ Erreur stockage: {e}") raise def search_memories(self, query: str, team_id: str, limit: int = 5) -> List[Dict]: """Recherche avec isolation par équipe""" try: self._ensure_connected() collection_name = self._get_collection_name(team_id) # Vérifier que la collection existe collections = self.client.get_collections() collection_names = [c.name for c in collections.collections] if collection_name not in collection_names: print(f"⚠️ Collection {collection_name} n'existe pas encore") return [] query_embedding = generate_embedding(query) search_results = self.client.search( collection_name=collection_name, query_vector=query_embedding, limit=limit ) results = [] for result in search_results: results.append({ "memory_id": result.id, "content": result.payload["content"], "tags": result.payload["tags"], "timestamp": result.payload["timestamp"], "user_id": result.payload.get("user_id", "unknown"), "category": result.payload.get("category", "general"), "confidence": result.payload.get("confidence", 0.5), "similarity_score": round(result.score, 3) }) return results except Exception as e: print(f"❌ Erreur recherche: {e}") return [] def delete_memory(self, memory_id: str, team_id: str) -> bool: """Suppression avec isolation par équipe""" try: self._ensure_connected() collection_name = self._get_collection_name(team_id) self.client.delete( collection_name=collection_name, points_selector=[memory_id] ) return True except Exception as e: print(f"❌ Erreur suppression: {e}") return False def list_memories(self, team_id: str) -> List[Dict]: """Listage avec isolation par équipe""" try: self._ensure_connected() collection_name = self._get_collection_name(team_id) # Vérifier que la collection existe collections = self.client.get_collections() collection_names = [c.name for c in collections.collections] if collection_name not in collection_names: return [] points = self.client.scroll( collection_name=collection_name, limit=1000 )[0] results = [] for point in points: results.append({ "memory_id": point.id, "content": point.payload["content"], "tags": point.payload["tags"], "timestamp": point.payload["timestamp"], "user_id": point.payload.get("user_id", "unknown"), "category": point.payload.get("category", "general"), "confidence": point.payload.get("confidence", 0.5) }) return results except Exception as e: print(f"❌ Erreur listage: {e}") return [] # Initialisation paresseuse du stockage storage = None def get_storage(): """Obtenir l'instance de stockage avec initialisation paresseuse""" global storage if storage is None: # En Lambda, activer Qdrant à la demande si les credentials sont disponibles if IS_LAMBDA and QDRANT_URL and QDRANT_API_KEY: try: print("🔧 Activation de Qdrant en Lambda...") storage = QdrantStorage() print("✅ Qdrant activé en Lambda") except Exception as e: print(f"⚠️ Erreur activation Qdrant en Lambda: {e}") storage = None elif USE_QDRANT: storage = QdrantStorage() else: storage = None return storage # Initialisation paresseuse de MCP mcp = None def get_mcp_instance(): """Obtenir l'instance MCP avec initialisation paresseuse""" global mcp if mcp is None: mcp = get_mcp() return mcp # Outils MCP avec authentification via OAuth2PasswordBearer def add_memory( content: str, tags: str = "", category: str = "general", visibility: str = "team", token: str = None ) -> str: """Ajouter une mémoire au cerveau collectif avec authentification""" # Récupérer le token via OAuth2PasswordBearer if not IS_LAMBDA: print("🔍 Récupération du token via OAuth2PasswordBearer...") user_token = get_current_user_token(token) # Fallback vers l'ancienne méthode if not user_token: if not IS_LAMBDA: print("🔍 Fallback vers extract_token_from_headers...") user_token = extract_token_from_headers() # MODE TEST: Si aucun token trouvé, utiliser le token de test if not user_token: if not IS_LAMBDA: print("🧪 MODE TEST: Utilisation du token de test") user_token = "user_d8a7996df3c777e9ac2914ef16d5b501" if not user_token: return json.dumps({ "status": "error", "message": "Token utilisateur requis. Veuillez configurer l'authentification Bearer dans Le Chat." }) # Vérifier le token utilisateur if not IS_LAMBDA: print(f"🔍 Vérification du token: {user_token[:10]}...") user_info = verify_user_token(user_token) if not user_info: if not IS_LAMBDA: print(f"❌ Token invalide ou erreur de vérification: {user_token[:10]}...") return json.dumps({ "status": "error", "message": f"Token utilisateur invalide: {user_token[:10]}..." }) user_id = user_info["user_id"] team_id = user_info["team_id"] user_name = user_info["user_name"] # Générer un ID unique memory_id = hashlib.md5(f"{content}{datetime.now().isoformat()}{user_id}".encode()).hexdigest() # Parser les tags tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()] if tags else [] # Détection automatique de l'importance confidence = 0.5 if any(word in content.lower() for word in ["décision", "important", "critique", "urgent", "bug", "erreur"]): confidence = 0.8 elif any(word in content.lower() for word in ["solution", "résolu", "fix", "correction"]): confidence = 0.7 # Détection automatique de catégorie if category == "general": if any(word in content.lower() for word in ["bug", "erreur", "problème", "issue"]): category = "bug" elif any(word in content.lower() for word in ["décision", "choix", "stratégie"]): category = "decision" elif any(word in content.lower() for word in ["feature", "fonctionnalité", "nouveau"]): category = "feature" elif any(word in content.lower() for word in ["réunion", "meeting", "call"]): category = "meeting" # Créer la mémoire enrichie memory = Memory( content=content, user_id=user_id, team_id=team_id, timestamp=datetime.now().isoformat(), tags=tag_list, category=category, visibility=visibility, confidence=confidence ) # Stocker via le système de stockage storage = get_storage() if storage: try: storage.store_memory(memory, memory_id, team_id) message = f"Mémoire ajoutée au cerveau collectif de l'équipe (Qdrant)" except Exception as e: print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}") memories[memory_id] = memory message = f"Mémoire ajoutée au cerveau collectif (mémoire - fallback)" else: memories[memory_id] = memory message = f"Mémoire ajoutée au cerveau collectif (mémoire)" return json.dumps({ "status": "success", "memory_id": memory_id, "message": message, "user": user_name, "team": team_id }) def search_memories( query: str, limit: int = 5, token: str = None ) -> str: """Rechercher dans le cerveau collectif avec authentification""" # Récupérer le token via OAuth2PasswordBearer user_token = get_current_user_token(token) # Fallback vers l'ancienne méthode if not user_token: user_token = extract_token_from_headers() # MODE TEST: Si aucun token trouvé, utiliser le token de test if not user_token: print("🧪 MODE TEST: Utilisation du token de test") user_token = "user_d8a7996df3c777e9ac2914ef16d5b501" if not user_token: return json.dumps({ "status": "error", "message": "Token utilisateur requis. Veuillez configurer l'authentification Bearer dans Le Chat." }) # Vérifier le token utilisateur user_info = verify_user_token(user_token) if not user_info: return json.dumps({ "status": "error", "message": f"Token utilisateur invalide: {user_token[:10]}..." }) team_id = user_info["team_id"] user_name = user_info["user_name"] storage = get_storage() if storage: try: results = storage.search_memories(query, team_id, limit) except Exception as e: print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}") results = [] else: results = [] # Si pas de résultats de Qdrant, utiliser le stockage en mémoire if not results: scored_memories = [] for memory_id, memory in memories.items(): if memory.team_id == team_id: # Isolation par équipe similarity = calculate_similarity(query, memory.content) scored_memories.append((similarity, memory_id, memory)) scored_memories.sort(key=lambda x: x[0], reverse=True) for similarity, memory_id, memory in scored_memories[:limit]: if similarity > 0: results.append({ "memory_id": memory_id, "content": memory.content, "tags": memory.tags, "timestamp": memory.timestamp, "user_id": memory.user_id, "category": memory.category, "confidence": memory.confidence, "similarity_score": round(similarity, 3) }) return json.dumps({ "status": "success", "query": query, "results": results, "total_found": len(results), "user": user_name, "team": team_id }) # TEMPORAIREMENT DÉSACTIVÉ POUR LA PRODUCTION # def delete_memory(memory_id: str, token: str = None) -> str: # """Supprimer une mémoire du cerveau collectif avec authentification""" # # # Récupérer le token via OAuth2PasswordBearer # user_token = get_current_user_token(token) # # # Fallback vers l'ancienne méthode # if not user_token: # user_token = extract_token_from_headers() # # # MODE TEST: Si aucun token trouvé, utiliser le token de test # if not user_token: # print("🧪 MODE TEST: Utilisation du token de test") # user_token = "user_d8a7996df3c777e9ac2914ef16d5b501" # # if not user_token: # return json.dumps({ # "status": "error", # "message": "Token utilisateur requis. Veuillez configurer l'authentification Bearer dans Le Chat." # }) # # # Vérifier le token utilisateur # user_info = verify_user_token(user_token) # if not user_info: # return json.dumps({ # "status": "error", # "message": f"Token utilisateur invalide: {user_token[:10]}..." # }) # # team_id = user_info["team_id"] # user_name = user_info["user_name"] # # storage = get_storage() # if storage: # try: # success = storage.delete_memory(memory_id, team_id) # if success: # return json.dumps({ # "status": "success", # "message": f"Mémoire {memory_id} supprimée du cerveau collectif (Qdrant)" # }) # else: # return json.dumps({ # "status": "error", # "message": "Erreur lors de la suppression" # }) # except Exception as e: # print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}") # # # Utiliser le stockage en mémoire (fallback ou par défaut) # if memory_id not in memories: # return json.dumps({ # "status": "error", # "message": "Mémoire non trouvée" # }) # # # Vérifier que la mémoire appartient à l'équipe de l'utilisateur # memory = memories[memory_id] # if memory.team_id != team_id: # return json.dumps({ # "status": "error", # "message": "Accès non autorisé à cette mémoire" # }) # # del memories[memory_id] # # return json.dumps({ # "status": "success", # "message": f"Mémoire {memory_id} supprimée du cerveau collectif (mémoire)" # }) def list_memories(token: str = None) -> str: """Lister toutes les mémoires du cerveau collectif avec authentification""" # Récupérer le token via OAuth2PasswordBearer user_token = get_current_user_token(token) # Fallback vers l'ancienne méthode if not user_token: user_token = extract_token_from_headers() # MODE TEST: Si aucun token trouvé, utiliser le token de test if not user_token: print("🧪 MODE TEST: Utilisation du token de test") user_token = "user_d8a7996df3c777e9ac2914ef16d5b501" if not user_token: return json.dumps({ "status": "error", "message": "Token utilisateur requis. Veuillez configurer l'authentification Bearer dans Le Chat." }) # Vérifier le token utilisateur user_info = verify_user_token(user_token) if not user_info: return json.dumps({ "status": "error", "message": f"Token utilisateur invalide: {user_token[:10]}..." }) team_id = user_info["team_id"] user_name = user_info["user_name"] storage = get_storage() if storage: try: all_memories = storage.list_memories(team_id) except Exception as e: print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}") all_memories = [] else: all_memories = [] # Si pas de résultats de Qdrant, utiliser le stockage en mémoire if not all_memories: if not memories: return json.dumps({ "status": "success", "message": "Aucune mémoire dans le cerveau collectif", "total": 0, "memories": [], "user": user_name, "team": team_id }) for memory_id, memory in memories.items(): if memory.team_id == team_id: # Isolation par équipe all_memories.append({ "memory_id": memory_id, "content": memory.content, "tags": memory.tags, "timestamp": memory.timestamp, "user_id": memory.user_id, "category": memory.category, "confidence": memory.confidence }) return json.dumps({ "status": "success", "total": len(all_memories), "memories": all_memories, "user": user_name, "team": team_id }) def get_team_insights(token: str = None) -> str: """Obtenir des insights sur l'activité de l'équipe avec authentification""" # Récupérer le token via OAuth2PasswordBearer user_token = get_current_user_token(token) # Fallback vers l'ancienne méthode if not user_token: user_token = extract_token_from_headers() # MODE TEST: Si aucun token trouvé, utiliser le token de test if not user_token: print("🧪 MODE TEST: Utilisation du token de test") user_token = "user_d8a7996df3c777e9ac2914ef16d5b501" if not user_token: return json.dumps({ "status": "error", "message": "Token utilisateur requis. Veuillez configurer l'authentification Bearer dans Le Chat." }) # Vérifier le token utilisateur user_info = verify_user_token(user_token) if not user_info: return json.dumps({ "status": "error", "message": f"Token utilisateur invalide: {user_token[:10]}..." }) team_id = user_info["team_id"] user_name = user_info["user_name"] # Récupérer toutes les mémoires de l'équipe all_memories = [] storage = get_storage() if storage: try: all_memories = storage.list_memories(team_id) except Exception as e: print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}") # Fallback vers mémoire locale if not all_memories: for memory_id, memory in memories.items(): if memory.team_id == team_id: all_memories.append({ "memory_id": memory_id, "content": memory.content, "category": getattr(memory, 'category', 'general'), "tags": memory.tags, "timestamp": memory.timestamp, "user_id": getattr(memory, 'user_id', 'unknown'), "confidence": getattr(memory, 'confidence', 0.5) }) # Analyser les patterns categories = {} contributors = {} tags_count = {} high_confidence = 0 for memory in all_memories: # Compter les catégories category = memory.get('category', 'general') categories[category] = categories.get(category, 0) + 1 # Compter les contributeurs user = memory.get('user_id', 'unknown') contributors[user] = contributors.get(user, 0) + 1 # Compter les tags for tag in memory.get('tags', []): tags_count[tag] = tags_count.get(tag, 0) + 1 # Compter les mémoires importantes if memory.get('confidence', 0) > 0.7: high_confidence += 1 # Top 5 de chaque catégorie top_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True)[:5] top_contributors = sorted(contributors.items(), key=lambda x: x[1], reverse=True)[:5] top_tags = sorted(tags_count.items(), key=lambda x: x[1], reverse=True)[:5] insights = { "team_id": team_id, "total_memories": len(all_memories), "high_confidence_memories": high_confidence, "top_categories": [{"category": cat, "count": count} for cat, count in top_categories], "top_contributors": [{"user": user, "contributions": count} for user, count in top_contributors], "trending_tags": [{"tag": tag, "count": count} for tag, count in top_tags], "knowledge_health": { "coverage": len(categories), "activity_level": "high" if len(all_memories) > 20 else "medium" if len(all_memories) > 5 else "low", "collaboration_score": len(contributors) / max(len(all_memories), 1) } } return json.dumps({ "status": "success", "insights": insights, "user": user_name, "team": team_id }) # Middleware pour capturer les headers def capture_headers_middleware(request, call_next): """Middleware pour capturer les headers HTTP""" try: # Capturer les headers de la requête headers = dict(request.headers) set_request_headers(headers) print(f"🔍 Headers capturés: {headers}") except Exception as e: print(f"❌ Erreur capture headers: {e}") response = call_next(request) return response # Initialisation paresseuse de MCP def initialize_mcp(): """Initialiser MCP de manière paresseuse""" global mcp if mcp is None: mcp = get_mcp() if mcp: # Enregistrer les outils avec OAuth2PasswordBearer mcp.tool( title="Add Memory", description="Ajouter une mémoire au cerveau collectif de l'équipe", )(add_memory) mcp.tool( title="Search Memories", description="Rechercher dans le cerveau collectif de l'équipe", )(search_memories) # TEMPORAIREMENT DÉSACTIVÉ POUR LA PRODUCTION # mcp.tool( # title="Delete Memory", # description="Supprimer une mémoire du cerveau collectif", # )(delete_memory) mcp.tool( title="List All Memories", description="Lister toutes les mémoires du cerveau collectif", )(list_memories) mcp.tool( title="Get Team Insights", description="Obtenir des insights sur l'activité de l'équipe", )(get_team_insights) if not IS_LAMBDA: print("✅ MCP initialisé avec succès") else: if not IS_LAMBDA: print("❌ Impossible d'initialiser MCP") return mcp if __name__ == "__main__": if not IS_LAMBDA: print("🎯 Démarrage du serveur MCP Collective Brain...") # Initialisation paresseuse mcp = initialize_mcp() if mcp: if not IS_LAMBDA: print("🚀 Serveur MCP Collective Brain démarré - prêt à recevoir des requêtes") mcp.run(transport="streamable-http") else: if not IS_LAMBDA: print("❌ Impossible de démarrer le serveur MCP")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bparpette/MistralHackathonMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server