Skip to main content
Glama
kodi_client.py (30.1 kB)
"""
Kodi client for the JSON-RPC API wrapper, with retry logic and robust
error handling.
"""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum

import aiohttp
import requests
from requests.auth import HTTPBasicAuth
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from .config import settings

# Module-level logger
logger = logging.getLogger(__name__)

# File extensions treated as video files by the directory helpers.
# Kept as a tuple so it can be handed directly to ``str.endswith``.
_VIDEO_EXTENSIONS = (".mkv", ".mp4", ".avi", ".mov", ".wmv", ".m4v", ".flv", ".webm")


class KodiError(Exception):
    """Base exception for Kodi errors."""


class KodiConnectionError(KodiError):
    """Raised when Kodi cannot be reached (network failure, timeout, HTTP error)."""


class KodiAPIError(KodiError):
    """Raised when Kodi answers with something the client cannot interpret."""


class NavigationDirection(Enum):
    """Navigation directions accepted by the Kodi interface."""

    UP = "up"
    DOWN = "down"
    LEFT = "left"
    RIGHT = "right"
    SELECT = "select"
    BACK = "back"


@dataclass
class KodiResponse:
    """Normalized response returned by every public client method."""

    # True when the operation succeeded.
    success: bool
    # Payload of the operation (JSON-RPC ``result`` or a dict built by the client).
    data: Any = None
    # Human-readable error message when ``success`` is False.
    error: Optional[str] = None
    # Either the numeric JSON-RPC error code or one of the client's own
    # string codes (e.g. "DIRECTORY_ACCESS_ERROR").
    error_code: Optional[Union[int, str]] = None


class KodiClient:
    """
    Client for the Kodi JSON-RPC API with retry logic and error handling.
    """

    # JSON-RPC method invoked for each navigation direction.
    _NAVIGATION_METHODS = {
        NavigationDirection.UP: "Input.Up",
        NavigationDirection.DOWN: "Input.Down",
        NavigationDirection.LEFT: "Input.Left",
        NavigationDirection.RIGHT: "Input.Right",
        NavigationDirection.SELECT: "Input.Select",
        NavigationDirection.BACK: "Input.Back",
    }

    def __init__(self):
        """Read the connection parameters from ``settings``."""
        self.base_url = settings.kodi_url
        self.auth = None
        self.timeout = settings.kodi_timeout
        # NOTE(review): retry_attempts / retry_delay are stored but the retry
        # policy on _make_request is hard-coded — confirm whether they should
        # drive it.
        self.retry_attempts = settings.kodi_retry_attempts
        self.retry_delay = settings.kodi_retry_delay

        # Configure HTTP basic authentication when credentials are provided
        if settings.kodi_auth:
            self.auth = HTTPBasicAuth(*settings.kodi_auth)

        logger.info(f"Client Kodi initialisé pour {settings.kodi_host}:{settings.kodi_port}")

    # NOTE(review): tenacity raises RetryError once the attempts are exhausted
    # unless reraise=True is passed — confirm which exception callers expect.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((requests.RequestException, KodiConnectionError)),
    )
    def _make_request(self, method: str, params: Optional[Dict] = None) -> KodiResponse:
        """
        Send a JSON-RPC request to Kodi, retrying on connection failures.

        Args:
            method: JSON-RPC method to call
            params: Parameters of the method (omitted from the payload when empty)

        Returns:
            KodiResponse holding the result, or the JSON-RPC error reported by Kodi

        Raises:
            KodiConnectionError: on timeout, connection failure or non-200 HTTP status
            KodiAPIError: on an invalid JSON body or any other unexpected failure
        """
        payload: Dict[str, Any] = {"jsonrpc": "2.0", "method": method, "id": 1}
        if params:
            payload["params"] = params

        try:
            logger.debug(f"Requête Kodi: {method} avec params: {params}")

            response = requests.post(
                self.base_url,
                json=payload,
                auth=self.auth,
                timeout=self.timeout,
            )

            if response.status_code != 200:
                error_msg = f"Erreur HTTP {response.status_code}: {response.text}"
                logger.error(error_msg)
                raise KodiConnectionError(error_msg)

            response_data = response.json()

            # JSON-RPC level errors are reported, not raised
            if "error" in response_data:
                error = response_data["error"]
                error_msg = f"Erreur API Kodi: {error.get('message', 'Erreur inconnue')}"
                logger.error(error_msg)
                return KodiResponse(
                    success=False,
                    error=error.get('message'),
                    error_code=error.get('code'),
                )

            # Success
            result = response_data.get("result")
            logger.debug(f"Réponse Kodi réussie pour {method}: {result}")
            return KodiResponse(success=True, data=result)

        except KodiError:
            # Our own exceptions (e.g. the HTTP status error above) must reach
            # the retry decorator untouched; the generic handler below would
            # otherwise re-wrap them as KodiAPIError and disable the retry.
            raise
        except requests.exceptions.Timeout as e:
            error_msg = f"Timeout lors de la requête {method} (>{self.timeout}s)"
            logger.error(error_msg)
            raise KodiConnectionError(error_msg) from e
        except requests.exceptions.ConnectionError as e:
            error_msg = f"Impossible de se connecter à Kodi: {str(e)}"
            logger.error(error_msg)
            raise KodiConnectionError(error_msg) from e
        except json.JSONDecodeError as e:
            error_msg = f"Réponse JSON invalide de Kodi: {str(e)}"
            logger.error(error_msg)
            raise KodiAPIError(error_msg) from e
        except Exception as e:
            error_msg = f"Erreur inattendue lors de la requête {method}: {str(e)}"
            logger.error(error_msg)
            raise KodiAPIError(error_msg) from e

    def _get_active_player(self) -> Optional[Dict]:
        """Return the first entry of Player.GetActivePlayers, or None if there is none."""
        response = self._make_request("Player.GetActivePlayers")
        if not response.success or not response.data:
            return None
        return response.data[0]

    def _count_library_items(self, method: str) -> int:
        """Return the ``limits.total`` reported by a library listing method (0 on failure)."""
        response = self._make_request(method, {"limits": {"end": 0}})
        if response.success and response.data:
            return response.data.get("limits", {}).get("total", 0)
        return 0

    def ping(self) -> KodiResponse:
        """Check the connection to Kodi via JSONRPC.Ping."""
        return self._make_request("JSONRPC.Ping")

    def get_now_playing(self) -> KodiResponse:
        """
        Retrieve information about the media currently playing.

        Returns:
            KodiResponse whose data is ``{"status": "nothing_playing"}`` when no
            player is active (or the player lookup failed), otherwise a dict
            with the player id/type plus the item and properties that could
            be fetched.
        """
        player = self._get_active_player()
        if player is None:
            return KodiResponse(success=True, data={"status": "nothing_playing"})

        player_id = player["playerid"]

        # Basic properties compatible with every media type
        basic_properties = ["title", "duration", "file", "thumbnail"]

        # Try with the basic properties first
        item_response = self._make_request(
            "Player.GetItem",
            {"playerid": player_id, "properties": basic_properties},
        )
        if not item_response.success:
            logger.warning(f"Échec Player.GetItem avec propriétés de base: {item_response.error}")
            # Fall back to a request without properties
            item_response = self._make_request("Player.GetItem", {"playerid": player_id})

        # Player properties (more reliable than the item for timing data)
        player_props = ["time", "totaltime", "percentage", "speed"]
        props_response = self._make_request(
            "Player.GetProperties",
            {"playerid": player_id, "properties": player_props},
        )
        if not props_response.success:
            logger.warning(f"Échec Player.GetProperties: {props_response.error}")
            # Fall back to fewer properties
            props_response = self._make_request(
                "Player.GetProperties",
                {"playerid": player_id, "properties": ["time", "totaltime"]},
            )

        # Combine whatever data is available
        result = {
            "status": "playing",
            "player_id": player_id,
            "player_type": player.get("type", "unknown"),
        }
        if item_response.success and item_response.data:
            result["item"] = item_response.data.get("item", {})
        if props_response.success and props_response.data:
            result["properties"] = props_response.data

        return KodiResponse(success=True, data=result)

    def player_play_pause(self) -> KodiResponse:
        """Toggle play/pause on the active player."""
        player = self._get_active_player()
        if player is None:
            return KodiResponse(success=False, error="Aucun player actif")
        return self._make_request("Player.PlayPause", {"playerid": player["playerid"]})

    def player_stop(self) -> KodiResponse:
        """Stop playback on the active player."""
        player = self._get_active_player()
        if player is None:
            return KodiResponse(success=False, error="Aucun player actif")
        return self._make_request("Player.Stop", {"playerid": player["playerid"]})

    def set_volume(self, level: int) -> KodiResponse:
        """
        Set the volume.

        Args:
            level: Volume level (0-100)
        """
        if not 0 <= level <= 100:
            return KodiResponse(success=False, error="Le volume doit être entre 0 et 100")
        return self._make_request("Application.SetVolume", {"volume": level})

    def navigate_menu(self, direction: Union[str, NavigationDirection]) -> KodiResponse:
        """
        Navigate in the Kodi interface.

        Args:
            direction: Direction (up/down/left/right/select/back), case-insensitive,
                or a NavigationDirection member

        Returns:
            KodiResponse of the Input.* call, or a failed response when the
            direction is not recognized.
        """
        try:
            if isinstance(direction, NavigationDirection):
                nav_direction = direction
            else:
                nav_direction = NavigationDirection(direction.lower())
        except (ValueError, AttributeError):
            # AttributeError covers non-string input (e.g. None) without crashing
            valid_directions = [d.value for d in NavigationDirection]
            return KodiResponse(
                success=False,
                error=f"Direction invalide. Doit être un de: {valid_directions}",
            )

        return self._make_request(self._NAVIGATION_METHODS[nav_direction])

    def search_movies(self, query: str) -> KodiResponse:
        """
        Search movies in the library by title.

        Args:
            query: Search term
        """
        if not query or not query.strip():
            return KodiResponse(success=False, error="La requête de recherche ne peut pas être vide")

        properties = [
            "title", "year", "rating", "runtime", "plot", "director",
            "genre", "thumbnail", "fanart", "file",
        ]
        return self._make_request(
            "VideoLibrary.GetMovies",
            {
                "filter": {"operator": "contains", "field": "title", "value": query},
                "properties": properties,
                "sort": {"order": "ascending", "method": "title"},
            },
        )

    def list_recent_movies(self, limit: int = 20) -> KodiResponse:
        """
        List recently added movies.

        Args:
            limit: Number of movies to return (default: 20)
        """
        properties = [
            "title", "year", "rating", "runtime", "plot", "director",
            "genre", "thumbnail", "fanart", "dateadded", "file",
        ]
        return self._make_request(
            "VideoLibrary.GetRecentlyAddedMovies",
            {"properties": properties, "limits": {"end": limit}},
        )

    def list_tv_shows(self) -> KodiResponse:
        """List every TV show, sorted by title."""
        properties = [
            "title", "year", "rating", "plot", "genre", "thumbnail",
            "fanart", "premiered", "studio", "mpaa", "file",
        ]
        return self._make_request(
            "VideoLibrary.GetTVShows",
            {
                "properties": properties,
                "sort": {"order": "ascending", "method": "title"},
            },
        )

    def play_movie(self, movie_id: int) -> KodiResponse:
        """
        Start a movie.

        Args:
            movie_id: ID of the movie in the Kodi library
        """
        return self._make_request("Player.Open", {"item": {"movieid": movie_id}})

    def play_episode(self, tvshow_id: int, season: int, episode: int) -> KodiResponse:
        """
        Start a TV show episode.

        Args:
            tvshow_id: ID of the TV show
            season: Season number
            episode: Episode number
        """
        # First resolve the episode ID from the season listing
        episodes_response = self._make_request(
            "VideoLibrary.GetEpisodes",
            {"tvshowid": tvshow_id, "season": season, "properties": ["episode"]},
        )
        if not episodes_response.success or not episodes_response.data:
            return KodiResponse(success=False, error="Épisode introuvable")

        # Pick the first entry whose episode number matches
        target_episode = next(
            (
                ep
                for ep in episodes_response.data.get("episodes", [])
                if ep.get("episode") == episode
            ),
            None,
        )
        if not target_episode:
            return KodiResponse(success=False, error=f"Épisode {season}x{episode:02d} introuvable")

        return self._make_request(
            "Player.Open",
            {"item": {"episodeid": target_episode["episodeid"]}},
        )

    def get_library_stats(self) -> KodiResponse:
        """
        Retrieve library statistics.

        Returns:
            KodiResponse whose data holds the movie, TV show, episode and song
            counts plus ``total_video_items`` (movies + episodes).
        """
        try:
            movies_count = self._count_library_items("VideoLibrary.GetMovies")
            shows_count = self._count_library_items("VideoLibrary.GetTVShows")
            episodes_count = self._count_library_items("VideoLibrary.GetEpisodes")
            songs_count = self._count_library_items("AudioLibrary.GetSongs")

            stats = {
                "movies": movies_count,
                "tv_shows": shows_count,
                "episodes": episodes_count,
                "songs": songs_count,
                "total_video_items": movies_count + episodes_count,
            }
            return KodiResponse(success=True, data=stats)

        except Exception as e:
            logger.error(f"Erreur lors de la récupération des statistiques: {e}")
            return KodiResponse(success=False, error=str(e))

    def scan_library(self, library_type: str = "video") -> KodiResponse:
        """
        Start a library scan.

        Args:
            library_type: Library type ("video" or "audio"), case-insensitive
        """
        scan_methods = {"video": "VideoLibrary.Scan", "audio": "AudioLibrary.Scan"}
        method = scan_methods.get(library_type.lower())
        if method is None:
            return KodiResponse(
                success=False,
                error="Type de bibliothèque invalide. Doit être 'video' ou 'audio'",
            )
        return self._make_request(method)

    def get_volume(self) -> KodiResponse:
        """Retrieve the current volume level and mute state."""
        return self._make_request("Application.GetProperties", {"properties": ["volume", "muted"]})

    def format_file_size(self, size_bytes: int) -> str:
        """
        Format a size in bytes as a human-readable string (1024-based units).

        Args:
            size_bytes: Size in bytes

        Returns:
            Formatted string (e.g. "1.5 GB", "245 B")
        """
        if size_bytes == 0:
            return "0 B"

        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024:
                # Bytes are shown as-is; larger units get one decimal
                if unit == 'B':
                    return f"{size_bytes} {unit}"
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} PB"

    def list_directory(self, path: str, limit: int = 50) -> KodiResponse:
        """
        List the video files of a directory via the Kodi Files.GetDirectory API.

        Only entries whose ``filetype`` is "file" and whose name ends with a
        known video extension are returned; sub-directories are ignored.

        Args:
            path: Path of the directory to list
            limit: Maximum number of files to return

        Returns:
            KodiResponse with the list of video files
        """
        try:
            # NOTE(review): no "properties" are requested, so "size" and
            # "datemodified" below may be absent from Kodi's answer and fall
            # back to their defaults — verify against the Files.GetDirectory docs.
            response = self._make_request("Files.GetDirectory", {"directory": path})

            if not response.success:
                return KodiResponse(
                    success=False,
                    error=f"Impossible d'accéder au dossier: {response.error}",
                    error_code="DIRECTORY_ACCESS_ERROR",
                )

            files = []
            if response.data and "files" in response.data:
                for file_info in response.data["files"]:
                    if len(files) >= limit:
                        break

                    # Only plain files are handled (no directories for now)
                    if file_info.get("filetype") != "file":
                        continue

                    file_path = file_info.get("file", "")
                    if not file_path.lower().endswith(_VIDEO_EXTENSIONS):
                        continue

                    # Guard against an explicit null size in the answer
                    file_size = file_info.get("size", 0) or 0
                    files.append({
                        "name": file_path.split("/")[-1],
                        "path": file_path,
                        "size": file_size,
                        "size_formatted": self.format_file_size(file_size),
                        "modified": file_info.get("datemodified", ""),
                        "type": "video",
                    })

            return KodiResponse(
                success=True,
                data={
                    "files": files,
                    "total": len(files),
                    "path": path,
                    "limit": limit,
                },
            )

        except Exception as e:
            logger.error(f"Erreur lors du listing du répertoire {path}: {e}")
            return KodiResponse(
                success=False,
                error=f"Erreur lors du listing: {str(e)}",
                error_code="DIRECTORY_LIST_ERROR",
            )

    def play_file(self, file_path: str) -> KodiResponse:
        """
        Start playback of a file by path via Player.Open.

        Args:
            file_path: Full path of the file to play (surrounding whitespace is ignored)

        Returns:
            KodiResponse with the outcome of the operation
        """
        if not file_path or not file_path.strip():
            return KodiResponse(success=False, error="Le chemin du fichier ne peut pas être vide")

        # Use the same cleaned path for the request and for the report
        clean_path = file_path.strip()

        try:
            response = self._make_request("Player.Open", {"item": {"file": clean_path}})

            if response.success:
                return KodiResponse(
                    success=True,
                    data={
                        "message": f"Lecture lancée: {clean_path.split('/')[-1]}",
                        "file_path": clean_path,
                        "status": "playing",
                    },
                )
            return KodiResponse(
                success=False,
                error=f"Impossible de lancer le fichier: {response.error}",
                error_code="FILE_PLAY_ERROR",
            )

        except Exception as e:
            logger.error(f"Erreur lors de la lecture du fichier {file_path}: {e}")
            return KodiResponse(
                success=False,
                error=f"Erreur lors de la lecture: {str(e)}",
                error_code="FILE_PLAY_EXCEPTION",
            )

    def search_in_directory(self, path: str, query: str) -> KodiResponse:
        """
        Search video files in a directory by name (case-insensitive).

        The search only covers the first 100 video files reported by
        ``list_directory``.

        Args:
            path: Path of the directory to search
            query: Search term

        Returns:
            KodiResponse with the matching files
        """
        if not query or not query.strip():
            return KodiResponse(success=False, error="Le terme de recherche ne peut pas être vide")

        try:
            # List the directory first (larger limit than the default for searching)
            list_response = self.list_directory(path, limit=100)

            if not list_response.success:
                return KodiResponse(
                    success=False,
                    error=f"Impossible de lister le répertoire: {list_response.error}",
                    error_code="SEARCH_DIRECTORY_ERROR",
                )

            # Match against both the file name and the full path
            query_lower = query.strip().lower()
            filtered_files = [
                file_info
                for file_info in list_response.data.get("files", [])
                if query_lower in file_info.get("name", "").lower()
                or query_lower in file_info.get("path", "").lower()
            ]

            return KodiResponse(
                success=True,
                data={
                    "files": filtered_files,
                    "total": len(filtered_files),
                    "query": query,
                    "path": path,
                },
            )

        except Exception as e:
            logger.error(f"Erreur lors de la recherche dans {path} avec '{query}': {e}")
            return KodiResponse(
                success=False,
                error=f"Erreur lors de la recherche: {str(e)}",
                error_code="SEARCH_EXCEPTION",
            )

    def calculate_match_score(self, filename: str, query: str) -> float:
        """
        Compute a relevance score of a file name for a query.

        Args:
            filename: File name
            query: Search term

        Returns:
            Relevance score (higher = more relevant); 0.0 for an empty query
        """
        filename_lower = filename.lower()
        query_lower = query.lower().strip()

        if not query_lower:
            return 0.0

        score = 0.0

        # Base score when the whole term appears in the name
        if query_lower in filename_lower:
            score += 10.0

        # Bonus when the name starts with the term
        if filename_lower.startswith(query_lower):
            score += 20.0

        # Split the name on the usual separators to compare word by word
        query_words = query_lower.split()
        filename_words = filename_lower.replace(".", " ").replace("_", " ").replace("-", " ").split()

        for query_word in query_words:
            # Bonus for a whole-word match
            if query_word in filename_words:
                score += 15.0

            # Bonus for every file-name word containing the query word
            for filename_word in filename_words:
                if query_word in filename_word:
                    score += 5.0

        # Penalty for names that look like extras (sample, trailer, etc.)
        suspicious_keywords = ["sample", "trailer", "preview", "demo"]
        for keyword in suspicious_keywords:
            if keyword in filename_lower:
                score -= 10.0

        # Bonus for quality markers (1080p, 720p, etc.)
        quality_keywords = ["1080p", "720p", "4k", "hd", "bluray", "webrip"]
        for keyword in quality_keywords:
            if keyword in filename_lower:
                score += 2.0

        return score

    def find_best_match_and_play(self, path: str, query: str, auto_play: bool = True) -> KodiResponse:
        """
        Search a directory, rank the matches and optionally play the best one.

        Args:
            path: Path of the directory to search
            query: Search term (may be partial)
            auto_play: When True, the best result is played automatically

        Returns:
            KodiResponse with the ranked results and the action performed
        """
        if not query or not query.strip():
            return KodiResponse(success=False, error="Le terme de recherche ne peut pas être vide")

        try:
            # Collect every matching file
            search_response = self.search_in_directory(path, query)

            if not search_response.success:
                return KodiResponse(
                    success=False,
                    error=f"Erreur de recherche: {search_response.error}",
                    error_code="SMART_SEARCH_ERROR",
                )

            files = search_response.data.get("files", [])
            if not files:
                return KodiResponse(
                    success=False,
                    error=f"Aucun fichier trouvé pour '{query}' dans {path}",
                    error_code="NO_MATCH_FOUND",
                )

            # Attach a relevance score to each file
            scored_files = [
                {
                    **file_info,
                    "relevance_score": self.calculate_match_score(file_info.get("name", ""), query),
                }
                for file_info in files
            ]

            # Highest score first; `files` is non-empty so a best match exists
            scored_files.sort(key=lambda x: x.get("relevance_score", 0), reverse=True)
            best_match = scored_files[0]

            result_data = {
                "query": query,
                "total_found": len(files),
                "best_match": best_match,
                "all_matches": scored_files[:5],  # Top 5, for information
                "path": path,
                "auto_played": False,
            }

            # Play automatically when requested
            if auto_play:
                file_path = best_match.get("path")
                if file_path:
                    play_response = self.play_file(file_path)
                    if play_response.success:
                        result_data["auto_played"] = True
                        result_data["play_status"] = "success"
                        result_data["message"] = f"Lecture lancée: {best_match.get('name')}"
                    else:
                        result_data["play_status"] = "failed"
                        result_data["play_error"] = play_response.error

            return KodiResponse(success=True, data=result_data)

        except Exception as e:
            logger.error(f"Erreur lors de la recherche intelligente pour '{query}': {e}")
            return KodiResponse(
                success=False,
                error=f"Erreur lors de la recherche intelligente: {str(e)}",
                error_code="SMART_SEARCH_EXCEPTION",
            )

    def test_connection(self) -> bool:
        """
        Simple connection test against Kodi.

        Returns:
            True if Kodi answers the ping with "pong", False otherwise
        """
        try:
            response = self.ping()
            return response.success and response.data == "pong"
        except Exception as e:
            logger.error(f"Test de connexion échoué: {e}")
            return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/laHeud/kodi-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.