main_auth.py•25.9 kB
"""
MCP Collective Brain Server - Version avec authentification Supabase
Système de mémoire collective multi-tenant avec isolation par équipe
"""
import os
import hashlib
import json
import requests
from datetime import datetime
from typing import List, Dict, Optional
# Configuration Supabase
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://hzoggayzniyxlbwxchcx.supabase.co")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Configuration Qdrant - optimisée pour démarrage rapide
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
QDRANT_ENABLED = os.getenv("QDRANT_ENABLED", "false").lower() == "true"
# Détection environnement Lambda
IS_LAMBDA = (
os.getenv("AWS_LAMBDA_FUNCTION_NAME") is not None or
os.getenv("AWS_EXECUTION_ENV") is not None or
os.getenv("LAMBDA_TASK_ROOT") is not None
)
# En Lambda, mode paresseux pour éviter les timeouts
if IS_LAMBDA and QDRANT_ENABLED:
print("🚀 Mode Lambda - Qdrant en mode paresseux")
USE_QDRANT = True
else:
USE_QDRANT = bool(QDRANT_URL and QDRANT_API_KEY and QDRANT_ENABLED)
# Debug minimal
print(f"🔧 Qdrant: {'Activé' if USE_QDRANT else 'Désactivé'}")
print(f"🔧 Supabase: {'Activé' if SUPABASE_SERVICE_KEY else 'Désactivé'}")
# Import paresseux de FastMCP
def get_mcp():
"""Import paresseux de FastMCP"""
try:
from mcp.server.fastmcp import FastMCP
return FastMCP("Collective Brain Server", port=3000, stateless_http=True, debug=False)
except ImportError:
print("❌ FastMCP non disponible")
return None
# Modèle de données enrichi pour le cerveau collectif
class Memory:
def __init__(self, content: str, user_id: str = "", team_id: str = "",
timestamp: str = "", tags: List[str] = [], category: str = "general",
visibility: str = "team", confidence: float = 0.5):
self.content = content
self.user_id = user_id
self.team_id = team_id
self.timestamp = timestamp
self.tags = tags
self.category = category
self.visibility = visibility
self.confidence = confidence
# Stockage en mémoire simple (fallback)
memories: Dict[str, Memory] = {}
# Import paresseux de Qdrant
QDRANT_AVAILABLE = False
QdrantClient = None
Distance = None
VectorParams = None
PointStruct = None
def ensure_qdrant_import():
"""Import paresseux de Qdrant"""
global QDRANT_AVAILABLE, QdrantClient, Distance, VectorParams, PointStruct
if not QDRANT_AVAILABLE and USE_QDRANT:
try:
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
QDRANT_AVAILABLE = True
print("✅ Qdrant importé avec succès")
except ImportError:
QDRANT_AVAILABLE = False
print("❌ Qdrant non disponible")
def calculate_similarity(text1: str, text2: str) -> float:
"""Calcule la similarité entre deux textes"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def generate_embedding(text: str) -> List[float]:
"""Génère un embedding simple basé sur le hash du texte"""
hash_obj = hashlib.md5(text.encode())
hash_bytes = hash_obj.digest()
vector = []
for i in range(384): # Dimension standard
vector.append((hash_bytes[i % 16] - 128) / 128.0)
return vector
def verify_user_token(user_token: str) -> Optional[Dict]:
"""Vérifier un token utilisateur via Supabase"""
if not SUPABASE_SERVICE_KEY:
print("⚠️ Supabase non configuré, mode anonyme")
return {
"user_id": "anonymous",
"team_id": "default_team",
"team_token": "default_team_token",
"user_name": "Utilisateur Anonyme",
"user_role": "member"
}
try:
# Appeler l'API Supabase pour vérifier le token
response = requests.post(
f"{SUPABASE_URL}/rest/v1/rpc/verify_user_token",
headers={
"Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
"Content-Type": "application/json",
"apikey": SUPABASE_SERVICE_KEY
},
json={"token": user_token},
timeout=5
)
if response.status_code == 200:
data = response.json()
if data and len(data) > 0:
return data[0]
print(f"❌ Token invalide: {user_token}")
return None
except Exception as e:
print(f"❌ Erreur vérification token: {e}")
return None
class QdrantStorage:
"""Gestionnaire de stockage Qdrant multi-tenant"""
def __init__(self):
self.client = None
self._initialized = False
self._init_attempted = False
def _ensure_connected(self):
"""Connexion paresseuse avec timeout court"""
if not self._initialized and not self._init_attempted:
self._init_attempted = True
# Import paresseux
ensure_qdrant_import()
if not QDRANT_AVAILABLE:
raise Exception("Qdrant non disponible")
try:
print("🔄 Connexion Qdrant...")
self.client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
timeout=3 # Timeout très court pour Lambda
)
self._initialized = True
print("✅ Qdrant connecté")
except Exception as e:
print(f"❌ Erreur Qdrant: {e}")
self.client = None
self._initialized = False
raise Exception(f"Connexion Qdrant échouée: {e}")
if not self._initialized:
raise Exception("Qdrant non disponible")
def _get_collection_name(self, team_id: str) -> str:
"""Générer le nom de collection pour une équipe"""
# Nettoyer l'ID d'équipe pour créer un nom de collection valide
clean_team_id = team_id.replace("-", "_").replace(" ", "_")
return f"team_memories_{clean_team_id}"
def _ensure_collection_exists(self, team_id: str):
"""S'assurer que la collection de l'équipe existe"""
collection_name = self._get_collection_name(team_id)
try:
collections = self.client.get_collections()
collection_names = [c.name for c in collections.collections]
if collection_name not in collection_names:
self.client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
print(f"✅ Collection '{collection_name}' créée pour l'équipe {team_id}")
else:
print(f"✅ Collection '{collection_name}' existe pour l'équipe {team_id}")
except Exception as e:
print(f"❌ Erreur collection: {e}")
raise
def store_memory(self, memory: Memory, memory_id: str, team_id: str) -> str:
"""Stocker une mémoire avec isolation par équipe"""
try:
self._ensure_connected()
self._ensure_collection_exists(team_id)
collection_name = self._get_collection_name(team_id)
embedding = generate_embedding(memory.content)
point = PointStruct(
id=memory_id,
vector=embedding,
payload={
"content": memory.content,
"timestamp": memory.timestamp,
"tags": memory.tags,
"user_id": memory.user_id,
"team_id": memory.team_id,
"category": memory.category,
"confidence": memory.confidence
}
)
self.client.upsert(
collection_name=collection_name,
points=[point]
)
return memory_id
except Exception as e:
print(f"❌ Erreur stockage: {e}")
raise
def search_memories(self, query: str, team_id: str, limit: int = 5) -> List[Dict]:
"""Recherche avec isolation par équipe"""
try:
self._ensure_connected()
collection_name = self._get_collection_name(team_id)
# Vérifier que la collection existe
collections = self.client.get_collections()
collection_names = [c.name for c in collections.collections]
if collection_name not in collection_names:
print(f"⚠️ Collection {collection_name} n'existe pas encore")
return []
query_embedding = generate_embedding(query)
search_results = self.client.search(
collection_name=collection_name,
query_vector=query_embedding,
limit=limit
)
results = []
for result in search_results:
results.append({
"memory_id": result.id,
"content": result.payload["content"],
"tags": result.payload["tags"],
"timestamp": result.payload["timestamp"],
"user_id": result.payload.get("user_id", "unknown"),
"category": result.payload.get("category", "general"),
"confidence": result.payload.get("confidence", 0.5),
"similarity_score": round(result.score, 3)
})
return results
except Exception as e:
print(f"❌ Erreur recherche: {e}")
return []
def delete_memory(self, memory_id: str, team_id: str) -> bool:
"""Suppression avec isolation par équipe"""
try:
self._ensure_connected()
collection_name = self._get_collection_name(team_id)
self.client.delete(
collection_name=collection_name,
points_selector=[memory_id]
)
return True
except Exception as e:
print(f"❌ Erreur suppression: {e}")
return False
def list_memories(self, team_id: str) -> List[Dict]:
"""Listage avec isolation par équipe"""
try:
self._ensure_connected()
collection_name = self._get_collection_name(team_id)
# Vérifier que la collection existe
collections = self.client.get_collections()
collection_names = [c.name for c in collections.collections]
if collection_name not in collection_names:
return []
points = self.client.scroll(
collection_name=collection_name,
limit=1000
)[0]
results = []
for point in points:
results.append({
"memory_id": point.id,
"content": point.payload["content"],
"tags": point.payload["tags"],
"timestamp": point.payload["timestamp"],
"user_id": point.payload.get("user_id", "unknown"),
"category": point.payload.get("category", "general"),
"confidence": point.payload.get("confidence", 0.5)
})
return results
except Exception as e:
print(f"❌ Erreur listage: {e}")
return []
# Initialisation paresseuse du stockage
storage = None
def get_storage():
"""Obtenir l'instance de stockage avec initialisation paresseuse"""
global storage
if storage is None:
if USE_QDRANT:
storage = QdrantStorage()
else:
storage = None
return storage
# Initialisation paresseuse de MCP
mcp = None
def get_mcp_instance():
"""Obtenir l'instance MCP avec initialisation paresseuse"""
global mcp
if mcp is None:
mcp = get_mcp()
return mcp
# Outils MCP avec authentification
def add_memory(
content: str,
user_token: str,
tags: str = "",
category: str = "general",
visibility: str = "team"
) -> str:
"""Ajouter une mémoire au bucket partagé avec authentification"""
# Vérifier le token utilisateur
user_info = verify_user_token(user_token)
if not user_info:
return json.dumps({
"status": "error",
"message": "Token utilisateur invalide"
})
user_id = user_info["user_id"]
team_id = user_info["team_id"]
user_name = user_info["user_name"]
# Générer un ID unique
memory_id = hashlib.md5(f"{content}{datetime.now().isoformat()}{user_id}".encode()).hexdigest()
# Parser les tags
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()] if tags else []
# Détection automatique de l'importance
confidence = 0.5
if any(word in content.lower() for word in ["décision", "important", "critique", "urgent", "bug", "erreur"]):
confidence = 0.8
elif any(word in content.lower() for word in ["solution", "résolu", "fix", "correction"]):
confidence = 0.7
# Détection automatique de catégorie
if category == "general":
if any(word in content.lower() for word in ["bug", "erreur", "problème", "issue"]):
category = "bug"
elif any(word in content.lower() for word in ["décision", "choix", "stratégie"]):
category = "decision"
elif any(word in content.lower() for word in ["feature", "fonctionnalité", "nouveau"]):
category = "feature"
elif any(word in content.lower() for word in ["réunion", "meeting", "call"]):
category = "meeting"
# Créer la mémoire enrichie
memory = Memory(
content=content,
user_id=user_id,
team_id=team_id,
timestamp=datetime.now().isoformat(),
tags=tag_list,
category=category,
visibility=visibility,
confidence=confidence
)
# Stocker via le système de stockage
storage = get_storage()
if storage:
try:
storage.store_memory(memory, memory_id, team_id)
message = f"Mémoire ajoutée au cerveau collectif de l'équipe (Qdrant)"
except Exception as e:
print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}")
memories[memory_id] = memory
message = f"Mémoire ajoutée au cerveau collectif (mémoire - fallback)"
else:
memories[memory_id] = memory
message = f"Mémoire ajoutée au cerveau collectif (mémoire)"
return json.dumps({
"status": "success",
"memory_id": memory_id,
"message": message,
"user": user_name,
"team": team_id
})
def search_memories(
query: str,
user_token: str,
limit: int = 5
) -> str:
"""Rechercher dans le bucket de mémoires partagé avec authentification"""
# Vérifier le token utilisateur
user_info = verify_user_token(user_token)
if not user_info:
return json.dumps({
"status": "error",
"message": "Token utilisateur invalide"
})
team_id = user_info["team_id"]
user_name = user_info["user_name"]
storage = get_storage()
if storage:
try:
results = storage.search_memories(query, team_id, limit)
except Exception as e:
print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}")
results = []
else:
results = []
# Si pas de résultats de Qdrant, utiliser le stockage en mémoire
if not results:
scored_memories = []
for memory_id, memory in memories.items():
if memory.team_id == team_id: # Isolation par équipe
similarity = calculate_similarity(query, memory.content)
scored_memories.append((similarity, memory_id, memory))
scored_memories.sort(key=lambda x: x[0], reverse=True)
for similarity, memory_id, memory in scored_memories[:limit]:
if similarity > 0:
results.append({
"memory_id": memory_id,
"content": memory.content,
"tags": memory.tags,
"timestamp": memory.timestamp,
"user_id": memory.user_id,
"category": memory.category,
"confidence": memory.confidence,
"similarity_score": round(similarity, 3)
})
return json.dumps({
"status": "success",
"query": query,
"results": results,
"total_found": len(results),
"user": user_name,
"team": team_id
})
def delete_memory(memory_id: str, user_token: str) -> str:
"""Supprimer une mémoire du bucket partagé avec authentification"""
# Vérifier le token utilisateur
user_info = verify_user_token(user_token)
if not user_info:
return json.dumps({
"status": "error",
"message": "Token utilisateur invalide"
})
team_id = user_info["team_id"]
user_name = user_info["user_name"]
storage = get_storage()
if storage:
try:
success = storage.delete_memory(memory_id, team_id)
if success:
return json.dumps({
"status": "success",
"message": f"Mémoire {memory_id} supprimée du cerveau collectif (Qdrant)"
})
else:
return json.dumps({
"status": "error",
"message": "Erreur lors de la suppression"
})
except Exception as e:
print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}")
# Utiliser le stockage en mémoire (fallback ou par défaut)
if memory_id not in memories:
return json.dumps({
"status": "error",
"message": "Mémoire non trouvée"
})
# Vérifier que la mémoire appartient à l'équipe de l'utilisateur
memory = memories[memory_id]
if memory.team_id != team_id:
return json.dumps({
"status": "error",
"message": "Accès non autorisé à cette mémoire"
})
del memories[memory_id]
return json.dumps({
"status": "success",
"message": f"Mémoire {memory_id} supprimée du cerveau collectif (mémoire)"
})
def list_memories(user_token: str) -> str:
"""Lister toutes les mémoires du bucket partagé avec authentification"""
# Vérifier le token utilisateur
user_info = verify_user_token(user_token)
if not user_info:
return json.dumps({
"status": "error",
"message": "Token utilisateur invalide"
})
team_id = user_info["team_id"]
user_name = user_info["user_name"]
storage = get_storage()
if storage:
try:
all_memories = storage.list_memories(team_id)
except Exception as e:
print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}")
all_memories = []
else:
all_memories = []
# Si pas de résultats de Qdrant, utiliser le stockage en mémoire
if not all_memories:
if not memories:
return json.dumps({
"status": "success",
"message": "Aucune mémoire dans le cerveau collectif",
"total": 0,
"memories": [],
"user": user_name,
"team": team_id
})
for memory_id, memory in memories.items():
if memory.team_id == team_id: # Isolation par équipe
all_memories.append({
"memory_id": memory_id,
"content": memory.content,
"tags": memory.tags,
"timestamp": memory.timestamp,
"user_id": memory.user_id,
"category": memory.category,
"confidence": memory.confidence
})
return json.dumps({
"status": "success",
"total": len(all_memories),
"memories": all_memories,
"user": user_name,
"team": team_id
})
def get_team_insights(user_token: str) -> str:
"""Obtenir des insights sur l'activité de l'équipe avec authentification"""
# Vérifier le token utilisateur
user_info = verify_user_token(user_token)
if not user_info:
return json.dumps({
"status": "error",
"message": "Token utilisateur invalide"
})
team_id = user_info["team_id"]
user_name = user_info["user_name"]
# Récupérer toutes les mémoires de l'équipe
all_memories = []
storage = get_storage()
if storage:
try:
all_memories = storage.list_memories(team_id)
except Exception as e:
print(f"⚠️ Erreur Qdrant, fallback vers mémoire: {e}")
# Fallback vers mémoire locale
if not all_memories:
for memory_id, memory in memories.items():
if memory.team_id == team_id:
all_memories.append({
"memory_id": memory_id,
"content": memory.content,
"category": getattr(memory, 'category', 'general'),
"tags": memory.tags,
"timestamp": memory.timestamp,
"user_id": getattr(memory, 'user_id', 'unknown'),
"confidence": getattr(memory, 'confidence', 0.5)
})
# Analyser les patterns
categories = {}
contributors = {}
tags_count = {}
high_confidence = 0
for memory in all_memories:
# Compter les catégories
category = memory.get('category', 'general')
categories[category] = categories.get(category, 0) + 1
# Compter les contributeurs
user = memory.get('user_id', 'unknown')
contributors[user] = contributors.get(user, 0) + 1
# Compter les tags
for tag in memory.get('tags', []):
tags_count[tag] = tags_count.get(tag, 0) + 1
# Compter les mémoires importantes
if memory.get('confidence', 0) > 0.7:
high_confidence += 1
# Top 5 de chaque catégorie
top_categories = sorted(categories.items(), key=lambda x: x[1], reverse=True)[:5]
top_contributors = sorted(contributors.items(), key=lambda x: x[1], reverse=True)[:5]
top_tags = sorted(tags_count.items(), key=lambda x: x[1], reverse=True)[:5]
insights = {
"team_id": team_id,
"total_memories": len(all_memories),
"high_confidence_memories": high_confidence,
"top_categories": [{"category": cat, "count": count} for cat, count in top_categories],
"top_contributors": [{"user": user, "contributions": count} for user, count in top_contributors],
"trending_tags": [{"tag": tag, "count": count} for tag, count in top_tags],
"knowledge_health": {
"coverage": len(categories),
"activity_level": "high" if len(all_memories) > 20 else "medium" if len(all_memories) > 5 else "low",
"collaboration_score": len(contributors) / max(len(all_memories), 1)
}
}
return json.dumps({
"status": "success",
"insights": insights,
"user": user_name,
"team": team_id
})
# Initialisation paresseuse de MCP
def initialize_mcp():
"""Initialiser MCP de manière paresseuse"""
global mcp
if mcp is None:
mcp = get_mcp()
if mcp:
# Enregistrer les outils
mcp.tool(
title="Add Memory",
description="Ajouter une mémoire au cerveau collectif de l'équipe",
)(add_memory)
mcp.tool(
title="Search Memories",
description="Rechercher dans le cerveau collectif de l'équipe",
)(search_memories)
mcp.tool(
title="Delete Memory",
description="Supprimer une mémoire du cerveau collectif",
)(delete_memory)
mcp.tool(
title="List All Memories",
description="Lister toutes les mémoires du cerveau collectif",
)(list_memories)
mcp.tool(
title="Get Team Insights",
description="Obtenir des insights sur l'activité de l'équipe",
)(get_team_insights)
print("✅ MCP initialisé avec succès")
else:
print("❌ Impossible d'initialiser MCP")
return mcp
if __name__ == "__main__":
print("🎯 Démarrage du serveur MCP Collective Brain...")
# Initialisation paresseuse
mcp = initialize_mcp()
if mcp:
print("🚀 Serveur MCP Collective Brain démarré - prêt à recevoir des requêtes")
mcp.run(transport="streamable-http")
else:
print("❌ Impossible de démarrer le serveur MCP")