Skip to main content
Glama

Bluetooth MCP Server

bluetooth_service.py25 kB
""" Service principal pour la gestion des opérations Bluetooth. Ce service orchestre l'utilisation des scanners BLE, classique et Windows spécifique. """ import logging import platform import re import asyncio import concurrent.futures from typing import List, Optional, Dict, Any, Set from app.models.bluetooth import BluetoothDevice from app.services.ble_scanner import ble_scanner from app.services.windows_advanced_scanner import windows_advanced_scanner from app.services.classic_scanner import classic_scanner, CLASSIC_BT_AVAILABLE from app.utils.bluetooth_utils import merge_device_info, normalize_mac_address # Import conditionnel du scanner Windows IS_WINDOWS = platform.system() == "Windows" if IS_WINDOWS: from app.services.windows_scanner import windows_scanner # Configurer le logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Pool d'exécuteurs pour les opérations multithreads thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4) class BluetoothScanError(Exception): """Exception personnalisée pour les erreurs de scan Bluetooth""" pass class BluetoothService: """Service pour gérer les opérations Bluetooth (BLE et classique)""" async def scan_for_devices(self, duration: float = 5.0, filter_name: Optional[str] = None, connect_for_details: bool = False, include_classic: bool = True, extended_freebox_detection: bool = True, deduplicate_devices: bool = True, parallel_scans: bool = True) -> List[BluetoothDevice]: """ Effectue un scan des appareils Bluetooth à proximité (BLE et classique) avec une gestion améliorée des doublons et exécution parallèle Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils connect_for_details: Si True, se connecte à chaque appareil pour plus d'informations include_classic: Si True, inclut les appareils Bluetooth classiques extended_freebox_detection: Si True, utilise des méthodes spéciales pour détecter la Freebox deduplicate_devices: Si True, fusionne les appareils en double parallel_scans: Si True, exécute les différents scans en parallèle Returns: Liste des appareils Bluetooth détectés Raises: BluetoothScanError: En cas d'erreur pendant le scan """ try: logger.debug(f"Démarrage du scan Bluetooth complet (durée: {duration}s, filtre: {filter_name}, " f"connexion détaillée: {connect_for_details}, Bluetooth classique: {include_classic}, " f"détection Freebox: {extended_freebox_detection}, " f"déduplication: {deduplicate_devices}, " f"scans parallèles: {parallel_scans})") # Dictionnaire pour stocker tous les appareils découverts all_devices = {} if parallel_scans: # Exécution parallèle des différents scans tasks = [] # 1. Scan BLE avec Bleak (toujours actif) tasks.append(self._ble_scan_task(duration, filter_name, connect_for_details)) # 2. Scan Bluetooth classique (si demandé et disponible) if include_classic and CLASSIC_BT_AVAILABLE: tasks.append(self._classic_scan_task(duration, filter_name)) # 3. Sur Windows, scanner spécifique (si demandé) if IS_WINDOWS and extended_freebox_detection: tasks.append(self._windows_scan_task(duration, filter_name)) # 4. Sur Windows, scanner avancé pour détecter plus d'appareils (TV, Freebox, etc.) if IS_WINDOWS and extended_freebox_detection: tasks.append(self._windows_advanced_scan_task(duration, filter_name)) # Attendre que tous les scans se terminent scan_results = await asyncio.gather(*tasks, return_exceptions=True) # Traiter les résultats for result in scan_results: if isinstance(result, Exception): logger.error(f"Une méthode de scan a échoué: {str(result)}") continue # Fusionner les appareils détectés for device in result: device_id = device["id"] if deduplicate_devices and device_id in all_devices: all_devices[device_id] = merge_device_info(all_devices[device_id], device) else: all_devices[device_id] = device else: # Exécution séquentielle (ancien comportement) # 1. Scan BLE avec Bleak logger.debug("Démarrage du scan BLE...") ble_devices = await ble_scanner.scan(duration, filter_name, connect_for_details) # Ajouter les appareils BLE au dictionnaire for device in ble_devices: device_id = device["id"] device["source_id"] = device_id # Conserver l'ID d'origine device["detected_by"] = "ble_scanner" if deduplicate_devices: # Vérifier si cet appareil est déjà connu if device_id in all_devices: all_devices[device_id] = merge_device_info(all_devices[device_id], device) else: all_devices[device_id] = device else: # Sans déduplication all_devices[device_id] = device # 2. Scan Bluetooth classique (si demandé et disponible) if include_classic and CLASSIC_BT_AVAILABLE: logger.debug("Démarrage du scan Bluetooth classique...") classic_devices = classic_scanner.scan(duration, filter_name) # Traiter les appareils classiques for device in classic_devices: device_id = device["id"] device["source_id"] = device_id device["detected_by"] = "classic_scanner" if deduplicate_devices: # Vérifier si cet appareil est déjà connu if device_id in all_devices: all_devices[device_id] = merge_device_info(all_devices[device_id], device) else: all_devices[device_id] = device else: # Sans déduplication all_devices[device_id] = device elif include_classic and not CLASSIC_BT_AVAILABLE and not IS_WINDOWS: logger.warning("Le module Bluetooth classique n'est pas disponible. " "Installation recommandée: pip install pybluez2") # 3. Sur Windows, scanner spécifique (si demandé) if IS_WINDOWS and extended_freebox_detection: logger.debug("Démarrage du scan Windows spécifique...") try: # Doubler la durée pour le scanner Windows qui peut être plus lent windows_devices = windows_scanner.scan(duration * 1.5, filter_name) # Traiter les appareils Windows for device in windows_devices: device_id = device["id"] device["source_id"] = device_id device["detected_by"] = device.get("detected_by", "windows_scanner") if deduplicate_devices: # Vérifier si cet appareil est déjà connu if device_id in all_devices: all_devices[device_id] = merge_device_info(all_devices[device_id], device) elif self._find_matching_device(device, all_devices): # Appareil similaire trouvé, fusion match_id = self._find_matching_device(device, all_devices) all_devices[match_id] = merge_device_info(all_devices[match_id], device) else: # Nouvel appareil all_devices[device_id] = device else: # Sans déduplication all_devices[device_id] = device except Exception as e: logger.error(f"Erreur lors du scan Windows: {str(e)}") # 4. Sur Windows, scanner avancé pour détecter plus d'appareils (TV, Freebox, etc.) if IS_WINDOWS and extended_freebox_detection: logger.debug("Démarrage du scan Windows avancé...") try: # Ce scanner a besoin de plus de temps pour détecter les appareils spéciaux windows_advanced_devices = windows_advanced_scanner.scan(duration * 2, filter_name) # Traiter les appareils Windows avancés for device in windows_advanced_devices: device_id = device["id"] device["source_id"] = device_id device["detected_by"] = device.get("detected_by", "windows_advanced_scanner") if deduplicate_devices: # Vérifier si cet appareil est déjà connu if device_id in all_devices: all_devices[device_id] = merge_device_info(all_devices[device_id], device) elif self._find_matching_device(device, all_devices): # Appareil similaire trouvé, fusion match_id = self._find_matching_device(device, all_devices) all_devices[match_id] = merge_device_info(all_devices[match_id], device) else: # Nouvel appareil all_devices[device_id] = device else: # Sans déduplication all_devices[device_id] = device except Exception as e: logger.error(f"Erreur lors du scan Windows avancé: {str(e)}") # Déduplication finale basée sur les critères avancés si demandé if deduplicate_devices: all_devices = self._advanced_deduplication(all_devices) logger.debug(f"Scan total terminé. {len(all_devices)} appareil(s) unique(s) trouvé(s)") # Convertir les dictionnaires en modèles BluetoothDevice return [BluetoothDevice(**device) for device in all_devices.values()] except Exception as e: logger.error(f"Erreur lors du scan Bluetooth: {str(e)}", exc_info=True) raise BluetoothScanError(f"Erreur lors du scan Bluetooth: {str(e)}") async def _windows_advanced_scan_task(self, duration: float, filter_name: Optional[str]) -> List[Dict[str, Any]]: """Tâche asynchrone pour le scan Windows avancé""" try: logger.debug("Démarrage du scan Windows avancé asynchrone...") devices = await windows_advanced_scanner.scan_async(duration, filter_name) # Marquer les appareils avec leur source for device in devices: device["source_id"] = device["id"] if "detected_by" not in device: device["detected_by"] = "windows_advanced_scanner" logger.debug(f"Scan Windows avancé terminé. {len(devices)} appareil(s) trouvé(s)") return devices except Exception as e: logger.error(f"Erreur lors du scan Windows avancé: {str(e)}") return [] async def _ble_scan_task(self, duration: float, filter_name: Optional[str], connect_for_details: bool) -> List[Dict[str, Any]]: """Tâche asynchrone pour le scan BLE""" try: logger.debug("Démarrage du scan BLE asynchrone...") devices = await ble_scanner.scan(duration, filter_name, connect_for_details) # Marquer les appareils avec leur source for device in devices: device["source_id"] = device["id"] device["detected_by"] = "ble_scanner" logger.debug(f"Scan BLE terminé. {len(devices)} appareil(s) trouvé(s)") return devices except Exception as e: logger.error(f"Erreur lors du scan BLE: {str(e)}") return [] async def _classic_scan_task(self, duration: float, filter_name: Optional[str]) -> List[Dict[str, Any]]: """Tâche asynchrone pour le scan Bluetooth classique""" try: logger.debug("Démarrage du scan Bluetooth classique asynchrone...") # Comme classic_scanner.scan est synchrone, l'exécuter dans un thread séparé devices = await asyncio.to_thread(classic_scanner.scan, duration * 1.5, filter_name) # Marquer les appareils avec leur source for device in devices: device["source_id"] = device["id"] device["detected_by"] = "classic_scanner" logger.debug(f"Scan Bluetooth classique terminé. {len(devices)} appareil(s) trouvé(s)") return devices except Exception as e: logger.error(f"Erreur lors du scan Bluetooth classique: {str(e)}") return [] async def _windows_scan_task(self, duration: float, filter_name: Optional[str]) -> List[Dict[str, Any]]: """Tâche asynchrone pour le scan Windows spécifique""" try: logger.debug("Démarrage du scan Windows asynchrone...") # Comme windows_scanner.scan est synchrone, l'exécuter dans un thread séparé devices = await asyncio.to_thread(windows_scanner.scan, duration * 2, filter_name) # Marquer les appareils avec leur source for device in devices: device["source_id"] = device["id"] if "detected_by" not in device: device["detected_by"] = "windows_scanner" logger.debug(f"Scan Windows terminé. {len(devices)} appareil(s) trouvé(s)") return devices except Exception as e: logger.error(f"Erreur lors du scan Windows: {str(e)}") return [] def _find_matching_device(self, device: Dict[str, Any], device_dict: Dict[str, Dict[str, Any]]) -> Optional[str]: """ Recherche un appareil correspondant dans le dictionnaire d'appareils Args: device: L'appareil à rechercher device_dict: Dictionnaire des appareils existants Returns: ID de l'appareil correspondant, ou None si aucune correspondance """ for existing_id, existing_device in device_dict.items(): if self._device_matches(existing_device, device): return existing_id return None def _advanced_deduplication(self, devices: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: """ Effectue une déduplication avancée en fusionnant les appareils similaires Args: devices: Dictionnaire des appareils à dédupliquer Returns: Dictionnaire des appareils dédupliqués """ result = {} processed_ids = set() # Trier les appareils par RSSI (du plus fort au plus faible) sorted_devices = sorted( devices.items(), key=lambda x: x[1].get("rssi", -100) if x[1].get("rssi") is not None else -100, reverse=True ) # Parcourir les appareils triés for device_id, device in sorted_devices: if device_id in processed_ids: continue # Ajouter l'appareil au résultat result[device_id] = device processed_ids.add(device_id) # Rechercher des appareils similaires for other_id, other_device in devices.items(): if other_id in processed_ids: continue if self._device_matches(device, other_device): # Fusionner l'appareil similaire avec l'appareil actuel result[device_id] = merge_device_info(result[device_id], other_device) processed_ids.add(other_id) return result def _normalize_device_id(self, address: str) -> str: """ Normalise une adresse d'appareil pour l'utiliser comme clé de déduplication Args: address: L'adresse de l'appareil Returns: L'adresse normalisée pour la déduplication """ if not address: return "" # Pour les adresses MAC, les normaliser au format standard if self._is_mac_address(address): return normalize_mac_address(address) # Pour les autres formats, retourner simplement l'adresse return address def _is_mac_address(self, address: str) -> bool: """ Vérifie si une chaîne ressemble à une adresse MAC Args: address: La chaîne à vérifier Returns: True si la chaîne ressemble à une adresse MAC, False sinon """ if not address: return False # Supprimer tous les séparateurs courants et vérifier s'il reste des caractères hexadécimaux clean = address.upper().replace(':', '').replace('-', '').replace('.', '') # Vérifier si la chaîne contient au moins 12 caractères hexadécimaux return len(clean) >= 12 and all(c in '0123456789ABCDEF' for c in clean[:12]) def _names_match(self, name1: str, name2: str, threshold: float = 0.7) -> bool: """ Vérifie si deux noms d'appareils semblent correspondre en utilisant une comparaison basique Args: name1: Premier nom name2: Deuxième nom threshold: Seuil de correspondance (0.0-1.0) Returns: True si les noms semblent correspondre, False sinon """ if not name1 or not name2: return False # Normalisation des noms (minuscules) name1 = name1.lower() name2 = name2.lower() # Correspondance parfaite if name1 == name2: return True # L'un contient l'autre if name1 in name2 or name2 in name1: return True # Calcul d'une distance simple (caractères communs) common_chars = set(name1) & set(name2) if len(common_chars) / max(len(set(name1)), len(set(name2))) >= threshold: return True # Vérifier les sous-chaînes communes for length in range(3, min(len(name1), len(name2)) + 1): for i in range(len(name1) - length + 1): if name1[i:i+length] in name2: return True return False def _device_matches(self, device1: Dict[str, Any], device2: Dict[str, Any]) -> bool: """ Détermine si deux appareils correspondent probablement au même appareil physique Args: device1: Premier appareil device2: Deuxième appareil Returns: True si les appareils semblent correspondre, False sinon """ # Cas 1: Adresses MAC identiques (normalisées) if self._is_mac_address(device1.get("address", "")) and self._is_mac_address(device2.get("address", "")): norm_addr1 = self._normalize_device_id(device1["address"]) norm_addr2 = self._normalize_device_id(device2["address"]) if norm_addr1 and norm_addr2 and norm_addr1 == norm_addr2: return True # Cas 2: Mêmes noms significatifs if (device1.get("name") and device2.get("name") and device1["name"] != "Unknown" and device2["name"] != "Unknown" and self._names_match(device1["name"], device2["name"])): return True # Cas 2.5: Noms décodés qui correspondent if device1.get("name") and device2.get("name"): from app.utils.bluetooth_utils import decode_ascii_name decoded_name1 = decode_ascii_name(device1["name"]) decoded_name2 = decode_ascii_name(device2["name"]) if decoded_name1 != device1["name"] or decoded_name2 != device2["name"]: if decoded_name1 and decoded_name2 and self._names_match(decoded_name1, decoded_name2): return True # Cas 3: Nom convivial identique et non générique if (device1.get("friendly_name") and device2.get("friendly_name") and "Device" not in device1["friendly_name"] and "Device" not in device2["friendly_name"] and self._names_match(device1["friendly_name"], device2["friendly_name"])): return True # Cas 4: Source ID contenu dans merged_from if device1.get("source_id") and device2.get("merged_from") and device1["source_id"] in device2["merged_from"]: return True if device2.get("source_id") and device1.get("merged_from") and device2["source_id"] in device1["merged_from"]: return True # Cas 5: Services UUID identiques (pour les UUID significatifs) significant_uuids = [ # Services communs bien connus "0000180f-0000-1000-8000-00805f9b34fb", # Battery Service "00001800-0000-1000-8000-00805f9b34fb", # Generic Access "00001801-0000-1000-8000-00805f9b34fb", # Generic Attribute "0000180a-0000-1000-8000-00805f9b34fb", # Device Information "0000180d-0000-1000-8000-00805f9b34fb", # Heart Rate "00001803-0000-1000-8000-00805f9b34fb", # Link Loss "00001805-0000-1000-8000-00805f9b34fb", # Current Time "00001812-0000-1000-8000-00805f9b34fb", # Human Interface Device "00001813-0000-1000-8000-00805f9b34fb", # Scan Parameters "00001819-0000-1000-8000-00805f9b34fb", # Location and Navigation ] if device1.get("service_uuids") and device2.get("service_uuids"): # Vérifier si au moins un UUID significatif est présent dans les deux appareils common_uuids = set(device1["service_uuids"]) & set(device2["service_uuids"]) for uuid in significant_uuids: if uuid in common_uuids: return True # Cas 6: Identifiants de fabricant identiques (si disponibles) if device1.get("manufacturer_data") and device2.get("manufacturer_data"): # Si les deux appareils ont des données de fabricant, vérifier s'ils partagent au moins un ID common_manufacturers = set(device1["manufacturer_data"].keys()) & set(device2["manufacturer_data"].keys()) if common_manufacturers: return True return False # Création d'une instance du service pour pouvoir l'importer facilement bluetooth_service = BluetoothService()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Hypijump31/bluetooth-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server