Skip to main content
Glama
pure_mcp_server.py22.9 kB
""" Serveur MCP pur pour Kodi Implémente exactement le protocole MCP standard : JSON-RPC 2.0 via SSE Compatible avec le node MCP Client de n8n """ import json import logging import asyncio import time from typing import Any, Dict, List, Optional from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from starlette.responses import Response from .config import get_settings from .kodi_client import KodiClient # Configuration du logger logger = logging.getLogger(__name__) # Instance globale des settings settings = get_settings() # Client Kodi global kodi = KodiClient() # Gestionnaire des connexions SSE MCP class MCPSSEManager: def __init__(self): self.clients: List[asyncio.Queue] = [] self.request_handlers = {} async def add_client(self) -> asyncio.Queue: client_queue = asyncio.Queue() self.clients.append(client_queue) logger.info(f"Client MCP connecté, total: {len(self.clients)}") return client_queue def remove_client(self, client_queue: asyncio.Queue): if client_queue in self.clients: self.clients.remove(client_queue) logger.info(f"Client MCP déconnecté, total: {len(self.clients)}") async def send_to_client(self, client_queue: asyncio.Queue, message: dict): """Envoie un message JSON-RPC à un client spécifique""" try: await client_queue.put(json.dumps(message)) except Exception as e: logger.error(f"Erreur envoi message MCP: {e}") mcp_manager = MCPSSEManager() # Spécification des tools MCP selon le standard MCP_TOOLS_SPEC = [ { "name": "getnowplaying", "description": "Récupère ce qui joue actuellement sur Kodi (titre, durée, temps écoulé, etc.)", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "playerplaypause", "description": "Toggle play/pause sur le player actif de Kodi", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "playerstop", "description": "Arrêter la lecture sur Kodi", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "setvolume", "description": "Régler le volume de Kodi (0-100)", "inputSchema": { "type": "object", "properties": { "level": { "type": "integer", "title": "Level", "description": "Niveau de volume (0-100)", "minimum": 0, "maximum": 100 } }, "required": ["level"] } }, { "name": "navigatemenu", "description": "Navigation dans l'interface de Kodi", "inputSchema": { "type": "object", "properties": { "direction": { "type": "string", "title": "Direction", "description": "Direction de navigation", "enum": ["up", "down", "left", "right", "select", "back"] } }, "required": ["direction"] } }, { "name": "searchmovies", "description": "Chercher des films dans la bibliothèque Kodi", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "title": "Query", "description": "Terme de recherche" } }, "required": ["query"] } }, { "name": "listrecentmovies", "description": "Liste les films récemment ajoutés à Kodi", "inputSchema": { "type": "object", "properties": { "limit": { "type": "integer", "title": "Limit", "description": "Nombre de films à retourner (défaut: 20)", "minimum": 1, "maximum": 100, "default": 20 } }, "required": [] } }, { "name": "listtvshows", "description": "Liste toutes les séries TV dans Kodi", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "playmovie", "description": "Lancer un film par son ID dans Kodi", "inputSchema": { "type": "object", "properties": { "movieid": { "type": "integer", "title": "Movie ID", "description": "ID du film dans la bibliothèque Kodi", "minimum": 1 } }, "required": ["movieid"] } }, { "name": "playepisode", "description": "Lancer un épisode de série dans Kodi", "inputSchema": { "type": "object", "properties": { "tvshowid": { "type": "integer", "title": "TV Show ID", "description": "ID de la série", "minimum": 1 }, "season": { "type": "integer", "title": "Season", "description": "Numéro de saison", "minimum": 1 }, "episode": { "type": "integer", "title": "Episode", "description": "Numéro d'épisode", "minimum": 1 } }, "required": ["tvshowid", "season", "episode"] } }, { "name": "getlibrarystats", "description": "Récupérer les statistiques de la bibliothèque Kodi (films, séries, épisodes, musique)", "inputSchema": { "type": "object", "properties": {}, "required": [] } }, { "name": "scanlibrary", "description": "Lancer un scan de la bibliothèque Kodi", "inputSchema": { "type": "object", "properties": { "librarytype": { "type": "string", "title": "Library Type", "description": "Type de bibliothèque à scanner", "enum": ["video", "audio"], "default": "video" } }, "required": [] } }, { "name": "listdownloads", "description": "Liste tous les fichiers vidéo dans le dossier downloads (récursif)", "inputSchema": { "type": "object", "properties": { "limit": { "type": "integer", "title": "Limit", "description": "Nombre de fichiers à retourner (défaut: 50)", "minimum": 1, "maximum": 100, "default": 50 } }, "required": [] } }, { "name": "playfile", "description": "Lance un fichier vidéo par son chemin complet", "inputSchema": { "type": "object", "properties": { "filepath": { "type": "string", "title": "File Path", "description": "Chemin complet du fichier vidéo à lancer" } }, "required": ["filepath"] } }, { "name": "searchdownloads", "description": "Cherche des fichiers dans le dossier downloads par nom (insensible à la casse)", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "title": "Query", "description": "Terme de recherche pour filtrer les fichiers" } }, "required": ["query"] } } ] def list_downloads_files(limit: int = 50): """Liste les fichiers dans le dossier downloads en utilisant les méthodes du client Kodi""" try: # Utilise la nouvelle méthode du client Kodi return kodi.list_directory(settings.kodi_downloads_path, limit) except Exception as e: logger.error(f"Erreur lors de la liste des downloads: {e}") from .kodi_client import KodiResponse return KodiResponse( success=False, error=f"Erreur lors de la liste des downloads: {e}", error_code="DOWNLOAD_LIST_ERROR" ) def search_downloads_files(query: str): """Recherche des fichiers dans le dossier downloads""" try: # Utilise la nouvelle méthode de recherche du client Kodi return kodi.search_in_directory(settings.kodi_downloads_path, query) except Exception as e: logger.error(f"Erreur lors de la recherche dans downloads: {e}") from .kodi_client import KodiResponse return KodiResponse( success=False, error=f"Erreur lors de la recherche dans downloads: {e}", error_code="DOWNLOAD_SEARCH_ERROR" ) def execute_kodi_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Exécute un tool Kodi et retourne le résultat MCP""" logger.info(f"Exécution tool MCP: {name} avec arguments: {arguments}") try: if name == "getnowplaying": res = kodi.get_now_playing() elif name == "playerplaypause": res = kodi.player_play_pause() elif name == "playerstop": res = kodi.player_stop() elif name == "setvolume": level = arguments.get("level") if level is None: return { "success": False, "error": "Paramètre 'level' manquant" } res = kodi.set_volume(int(level)) elif name == "navigatemenu": direction = arguments.get("direction") if direction is None: return { "success": False, "error": "Paramètre 'direction' manquant" } res = kodi.navigate_menu(str(direction)) elif name == "searchmovies": query = arguments.get("query") if query is None: return { "success": False, "error": "Paramètre 'query' manquant" } res = kodi.search_movies(str(query)) elif name == "listrecentmovies": limit = arguments.get("limit", 20) res = kodi.list_recent_movies(int(limit)) elif name == "listtvshows": res = kodi.list_tv_shows() elif name == "playmovie": movie_id = arguments.get("movieid") if movie_id is None: return { "success": False, "error": "Paramètre 'movieid' manquant" } res = kodi.play_movie(int(movie_id)) elif name == "playepisode": tvshow_id = arguments.get("tvshowid") season = arguments.get("season") episode = arguments.get("episode") if any(x is None for x in [tvshow_id, season, episode]): return { "success": False, "error": "Paramètres 'tvshowid', 'season', 'episode' requis" } res = kodi.play_episode(int(tvshow_id), int(season), int(episode)) elif name == "getlibrarystats": res = kodi.get_library_stats() elif name == "scanlibrary": library_type = arguments.get("librarytype", "video") res = kodi.scan_library(str(library_type)) elif name == "listdownloads": limit = arguments.get("limit", 50) res = list_downloads_files(int(limit)) elif name == "playfile": file_path = arguments.get("filepath") if file_path is None: return { "success": False, "error": "Paramètre 'filepath' manquant" } res = kodi.play_file(str(file_path)) elif name == "searchdownloads": query = arguments.get("query") if query is None: return { "success": False, "error": "Paramètre 'query' manquant" } res = search_downloads_files(str(query)) else: return { "success": False, "error": f"Tool inconnu: {name}" } # Formatage du résultat selon MCP if res.success: return { "success": True, "data": res.data, "tool": name } else: return { "success": False, "error": res.error, "error_code": res.error_code, "tool": name } except Exception as e: logger.exception(f"Erreur lors de l'exécution du tool {name}") return { "success": False, "error": str(e), "tool": name } def handle_jsonrpc_request(message: dict) -> dict: """Traite une requête JSON-RPC MCP et retourne la réponse""" method = message.get("method") msg_id = message.get("id") params = message.get("params", {}) # Initialize - Initialisation du serveur MCP if method == "initialize": return { "jsonrpc": "2.0", "id": msg_id, "result": { "protocolVersion": "2024-11-05", "capabilities": { "tools": { "listChanged": True } }, "serverInfo": { "name": "kodi-controller", "version": "1.0.0" } } } # Tools/list - Liste des tools disponibles elif method == "tools/list": # Validation des tools avant envoi validated_tools = [] for tool in MCP_TOOLS_SPEC: # Vérifier que toutes les propriétés requises sont présentes if not tool.get("name"): logger.error(f"Tool sans nom détecté: {tool}") continue if not tool.get("description"): logger.error(f"Tool {tool.get('name')} sans description") continue if not tool.get("inputSchema"): logger.error(f"Tool {tool.get('name')} sans inputSchema") continue # Nettoyer le tool de toute propriété None/undefined clean_tool = { "name": str(tool["name"]), "description": str(tool["description"]), "inputSchema": tool["inputSchema"] } validated_tools.append(clean_tool) # Debug pour Langfuse logger.error('=== DEBUG MCP TOOLS ===') for idx, tool in enumerate(validated_tools): logger.error(f"Tool {idx}: {json.dumps(tool, indent=2, ensure_ascii=False)}") logger.error('=======================') logger.info(f"Retour de {len(validated_tools)} tools validés") return { "jsonrpc": "2.0", "id": msg_id, "result": { "tools": validated_tools } } # Tools/call - Exécution d'un tool elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) if not tool_name: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32602, "message": "Paramètre 'name' manquant" } } # Vérifier que le tool existe if not any(tool["name"] == tool_name for tool in MCP_TOOLS_SPEC): return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Tool non trouvé: {tool_name}" } } # Exécuter le tool result = execute_kodi_tool(tool_name, arguments) if result.get("success"): # Succès - format MCP content content_text = json.dumps(result, indent=2) return { "jsonrpc": "2.0", "id": msg_id, "result": { "content": [ { "type": "text", "text": content_text } ] } } else: # Erreur tool return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32603, "message": result.get("error", "Erreur lors de l'exécution du tool"), "data": result } } # Méthode non supportée else: return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": -32601, "message": f"Méthode non supportée: {method}" } } # Lifespan context manager @asynccontextmanager async def lifespan(app: FastAPI): logger.info("🎬 Démarrage du serveur MCP pur...") logger.info(f"Configuration Kodi: {settings.kodi_host}:{settings.kodi_port}") # Test de connexion Kodi if kodi.test_connection(): logger.info("✅ Connexion Kodi: OK") else: logger.warning("⚠️ Connexion Kodi: ÉCHEC") yield logger.info("🛑 Arrêt du serveur MCP pur") # Application FastAPI app = FastAPI( title="Kodi MCP Server (Standard)", version="1.0.0", description="Serveur MCP pur pour Kodi - JSON-RPC 2.0 via SSE", lifespan=lifespan ) # CORS app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.post("/") async def mcp_endpoint(request: Request): """ Endpoint principal MCP - JSON-RPC 2.0 via HTTP POST Compatible avec les clients MCP standard """ try: # Parse de la requête JSON-RPC body = await request.json() # Traitement de la requête response = handle_jsonrpc_request(body) # Retour de la réponse JSON-RPC return response except json.JSONDecodeError: return { "jsonrpc": "2.0", "id": None, "error": { "code": -32700, "message": "Erreur de parsing JSON" } } except Exception as e: logger.exception("Erreur endpoint MCP") return { "jsonrpc": "2.0", "id": None, "error": { "code": -32603, "message": f"Erreur interne: {str(e)}" } } @app.get("/sse") async def mcp_sse_endpoint(request: Request): """ Endpoint SSE pour le protocole MCP JSON-RPC 2.0 via Server-Sent Events Compatible avec le node MCP Client de n8n """ client_queue = await mcp_manager.add_client() async def event_generator(): try: # Message d'initialisation automatique init_message = { "jsonrpc": "2.0", "method": "notifications/initialized", "params": { "protocolVersion": "2024-11-05", "capabilities": { "tools": { "listChanged": True } }, "serverInfo": { "name": "kodi-controller", "version": "1.0.0" } } } # Envoi du message d'initialisation yield f"data: {json.dumps(init_message)}\n\n" # Boucle principale SSE while True: try: # Attendre un message ou timeout pour heartbeat message = await asyncio.wait_for(client_queue.get(), timeout=30.0) yield f"data: {message}\n\n" except asyncio.TimeoutError: # Heartbeat MCP standard heartbeat = { "jsonrpc": "2.0", "method": "notifications/ping", "params": { "timestamp": int(time.time()) } } yield f"data: {json.dumps(heartbeat)}\n\n" except asyncio.CancelledError: mcp_manager.remove_client(client_queue) raise except Exception as e: logger.error(f"Erreur SSE MCP: {e}") mcp_manager.remove_client(client_queue) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "Cache-Control" } ) @app.get("/health") async def health(): """Health check simple""" kodi_ok = kodi.test_connection() return { "status": "ok" if kodi_ok else "degraded", "kodi": "ok" if kodi_ok else "down", "mcp_version": "2024-11-05", "server": "kodi-controller" } # Point d'entrée pour uvicorn if __name__ == "__main__": import uvicorn uvicorn.run( "src.pure_mcp_server:app", host=settings.server_host, port=settings.server_port, reload=False, log_level=settings.log_level.lower(), )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/laHeud/kodi-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server