Skip to main content
Glama

Bluetooth MCP Server

windows_scanner.py (29.9 kB)
""" Module spécialisé pour le scan Bluetooth sur Windows en utilisant les API natives. Ceci est une alternative au scanner classique qui nécessite pybluez2. """ import logging import platform import subprocess import re import time import os import asyncio from typing import Dict, List, Optional, Any from app.utils.bluetooth_utils import decode_ascii_name from app.utils.bluetooth_utils import get_friendly_device_name from app.data.mac_prefixes import get_device_info # Configurer le logging logger = logging.getLogger(__name__) # Vérifier si nous sommes sur Windows IS_WINDOWS = platform.system() == "Windows" class WindowsBTScanner: """Scanner Bluetooth spécifique à Windows utilisant les commandes système""" def scan(self, duration: float = 10.0, filter_name: Optional[str] = None) -> List[Dict[str, Any]]: """ Effectue un scan Bluetooth sur Windows via les commandes système. Args: duration: Durée du scan en secondes (utilisé comme timeout) filter_name: Filtre optionnel sur le nom des appareils Returns: Liste de dictionnaires contenant les informations des appareils Bluetooth """ if not IS_WINDOWS: logger.warning("Ce scanner est spécifique à Windows et ne fonctionnera pas sur d'autres systèmes") return [] # Dictionnaire pour stocker tous les appareils détectés (par adresse MAC ou ID unique) all_devices = {} try: # Exécution parallèle des différentes méthodes de scan Windows # pour accélérer le processus all_devices = self._run_parallel_scans(duration, filter_name) # Convertir le dictionnaire en liste devices = list(all_devices.values()) logger.debug(f"Scan Windows terminé. {len(devices)} appareil(s) unique(s) trouvé(s)") return devices except Exception as e: logger.error(f"Erreur lors du scan Bluetooth Windows: {str(e)}", exc_info=True) return [] async def scan_async(self, duration: float = 10.0, filter_name: Optional[str] = None) -> List[Dict[str, Any]]: """ Version asynchrone du scan Windows. 
Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Liste de dictionnaires contenant les informations des appareils Bluetooth """ # Exécuter le scan synchrone dans un thread pour ne pas bloquer la boucle d'événements return await asyncio.to_thread(self.scan, duration, filter_name) def _run_parallel_scans(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Exécute les différentes méthodes de scan Windows en parallèle. Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ import concurrent.futures all_devices = {} # Liste des méthodes de scan à exécuter en parallèle scan_methods = [ self._scan_pnp_devices, self._scan_bluetooth_adapter, self._scan_wmi_devices, self._scan_netsh_devices, self._scan_registry_devices ] # Exécuter les méthodes de scan en parallèle with concurrent.futures.ThreadPoolExecutor(max_workers=len(scan_methods)) as executor: # Démarrer toutes les tâches futures = {executor.submit(method, duration / len(scan_methods), filter_name): method.__name__ for method in scan_methods} # Collecter les résultats au fur et à mesure qu'ils sont disponibles for future in concurrent.futures.as_completed(futures): method_name = futures[future] try: devices = future.result() logger.debug(f"Méthode {method_name} terminée. {len(devices)} appareil(s) trouvé(s)") # Fusionner les résultats for device_id, device in devices.items(): if device_id in all_devices: # Fusionner les informations all_devices[device_id].update(device) else: # Nouvel appareil all_devices[device_id] = device except Exception as e: logger.error(f"Erreur lors de l'exécution de {method_name}: {str(e)}") return all_devices def _scan_pnp_devices(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Scan des appareils PnP via PowerShell Get-PnpDevice. 
Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ devices = {} # Commande PowerShell avec sortie encodée en UTF-8 powershell_cmd = [ 'powershell', '-NoProfile', '-Command', """ $OutputEncoding = [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 # Essayons d'abord via Get-PnpDevice try { $btDevices = Get-PnpDevice -Class Bluetooth | Where-Object { $_.Status -eq 'OK' } foreach ($dev in $btDevices) { Write-Output ("Device: " + $dev.FriendlyName + " | ID: " + $dev.DeviceID + " | Status: " + $dev.Status) } } catch { Write-Output "Error: $_" } """ ] # Exécuter la commande avec un timeout et encodage UTF-8 try: ps_result = subprocess.run( powershell_cmd, capture_output=True, text=True, timeout=duration, encoding='utf-8' ) logger.debug(f"Résultat Get-PnpDevice: {len(ps_result.stdout.splitlines())} lignes") # Analyse des résultats device_pattern = r'Device: (.*) \| ID: (.*) \| Status: (.*)' for line in ps_result.stdout.splitlines(): match = re.match(device_pattern, line) if match: name = match.group(1).strip() device_id = match.group(2).strip() status = match.group(3).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue if "freebox" in name.lower() or "free" in name.lower(): logger.info(f"Freebox trouvée via Get-PnpDevice: {name}") # Créer un ID unique pour l'appareil device_id_clean = device_id.replace('&', '-').replace('\\', '-') unique_id = f"WIN-PNP-{device_id_clean}" # Ajouter l'appareil au dictionnaire devices[unique_id] = { "id": unique_id, "address": unique_id[:17] if len(unique_id) > 17 else unique_id, "name": name, "rssi": -60, # Valeur fictive "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Unknown (Windows)", "device_type": "Windows-PnP", "friendly_name": name, "detected_by": "windows_pnp", "raw_info": f"ID: 
{device_id}, Status: {status}" } except (subprocess.SubprocessError, UnicodeDecodeError) as e: logger.error(f"Erreur avec Get-PnpDevice: {str(e)}") return devices def _scan_bluetooth_adapter(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Scan via PowerShell BluetoothAdapter. Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ devices = {} bt_adapter_cmd = [ 'powershell', '-NoProfile', '-Command', """ $OutputEncoding = [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 try { Add-Type -AssemblyName System.Runtime.WindowsRuntime $asTaskGeneric = ([System.WindowsRuntimeSystemExtensions].GetMethods() | Where-Object { $_.Name -eq 'AsTask' -and $_.GetParameters().Count -eq 1 -and $_.GetParameters()[0].ParameterType.Name -eq 'IAsyncOperation`1' })[0] Function Await($WinRtTask, $ResultType) { $asTask = $asTaskGeneric.MakeGenericMethod($ResultType) $netTask = $asTask.Invoke($null, @($WinRtTask)) $netTask.Wait() $netTask.Result } [Windows.Devices.Enumeration.DeviceInformation,Windows.Devices.Enumeration,ContentType=WindowsRuntime] | Out-Null [Windows.Devices.Bluetooth.BluetoothAdapter,Windows.Devices.Bluetooth,ContentType=WindowsRuntime] | Out-Null $bluetooth = [Windows.Devices.Bluetooth.BluetoothAdapter]::GetDefaultAsync() $adapter = Await $bluetooth ([Windows.Devices.Bluetooth.BluetoothAdapter]) if ($adapter) { Write-Output "Bluetooth Adapter Info:" Write-Output "--------------------" Write-Output ("Name: " + $adapter.Name) Write-Output ("ID: " + $adapter.BluetoothAddress) Write-Output ("Status: " + $adapter.ConnectionStatus) # Get paired devices $devices = [Windows.Devices.Enumeration.DeviceInformation]::FindAllAsync([Windows.Devices.Bluetooth.BluetoothDevice]::GetDeviceSelector()) $btDevices = Await $devices ([System.Collections.Generic.IReadOnlyList[Windows.Devices.Enumeration.DeviceInformation]]) Write-Output "" Write-Output ("Found " 
+ $btDevices.Count + " Bluetooth devices:") Write-Output "--------------------" foreach ($device in $btDevices) { Write-Output ("BT-DEVICE: " + $device.Name + " | ID: " + $device.Id + " | Status: " + $device.Pairing.CanPair) } } else { Write-Output "No Bluetooth adapter found" } } catch { Write-Output "Error: $_" } """ ] try: bt_result = subprocess.run( bt_adapter_cmd, capture_output=True, text=True, timeout=duration, encoding='utf-8' ) # Analyse des résultats bt_device_pattern = r'BT-DEVICE: (.*) \| ID: (.*) \| Status: (.*)' device_count = 0 for line in bt_result.stdout.splitlines(): match = re.match(bt_device_pattern, line) if match: device_count += 1 name = match.group(1).strip() device_id = match.group(2).strip() status = match.group(3).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue if "freebox" in name.lower() or "free" in name.lower(): logger.info(f"Freebox trouvée via BluetoothAdapter: {name}") # Créer un ID unique pour l'appareil device_id_clean = device_id.replace('#', '-') device_id_clean = device_id_clean[-17:] if len(device_id_clean) > 17 else device_id_clean unique_id = f"WIN-BT-{device_id_clean}" # Extraire l'adresse MAC potentielle du device_id mac_match = re.search(r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})', device_id) address = mac_match.group(0) if mac_match else unique_id[:17] # Ajouter l'appareil au dictionnaire devices[unique_id] = { "id": unique_id, "address": address, "name": name, "rssi": -55, # Valeur fictive "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Unknown (Windows)", "device_type": "Windows-BT", "friendly_name": name, "detected_by": "windows_bluetooth_adapter", "raw_info": f"ID: {device_id}, Status: {status}" } logger.info(f"Trouvé {device_count} périphériques via BluetoothAdapter") except (subprocess.SubprocessError, UnicodeDecodeError) as e: logger.error(f"Erreur avec 
BluetoothAdapter: {str(e)}") return devices def _scan_wmi_devices(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Scan via WMI. Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ devices = {} wmi_cmd = [ 'powershell', '-NoProfile', '-Command', """ $OutputEncoding = [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 try { $wmiDevices = Get-WmiObject -Query "SELECT * FROM Win32_PnPEntity WHERE PNPClass = 'Bluetooth'" | Select-Object Name, DeviceID, Status, Description if ($wmiDevices) { foreach ($device in $wmiDevices) { Write-Output ("WMI-BT: " + $device.Name + " | ID: " + $device.DeviceID + " | Status: " + $device.Status) } } else { Write-Output "No WMI Bluetooth devices found" } } catch { Write-Output "Error: $_" } """ ] try: wmi_result = subprocess.run( wmi_cmd, capture_output=True, text=True, timeout=duration, encoding='utf-8' ) # Analyse des résultats wmi_device_pattern = r'WMI-BT: (.*) \| ID: (.*) \| Status: (.*)' for line in wmi_result.stdout.splitlines(): match = re.match(wmi_device_pattern, line) if match: name = match.group(1).strip() device_id = match.group(2).strip() status = match.group(3).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue if "freebox" in name.lower() or "free" in name.lower(): logger.info(f"Freebox trouvée via WMI: {name}") # Créer un ID unique pour l'appareil device_id_clean = device_id.replace('&', '-').replace('\\', '-') unique_id = f"WIN-WMI-{device_id_clean}" # Ajouter l'appareil au dictionnaire s'il n'existe pas déjà devices[unique_id] = { "id": unique_id, "address": unique_id[:17] if len(unique_id) > 17 else unique_id, "name": name, "rssi": -65, # Valeur fictive "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Unknown (Windows)", "device_type": 
"Windows-WMI", "friendly_name": name, "detected_by": "windows_wmi", "raw_info": f"ID: {device_id}, Status: {status}" } except (subprocess.SubprocessError, UnicodeDecodeError) as e: logger.error(f"Erreur avec WMI: {str(e)}") return devices def _scan_netsh_devices(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Scan via netsh. Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ devices = {} netsh_cmd = [ 'netsh', 'bluetooth', 'show', 'devices' ] try: netsh_result = subprocess.run( netsh_cmd, capture_output=True, text=True, timeout=duration, encoding='utf-8' ) # Analyse des résultats # Exemple de sortie: # Device 1 # Device Name: DEV-1234 # Bluetooth Address: xx:xx:xx:xx:xx:xx device_block_pattern = r'Device \d+\s+Device Name: (.*)\s+Bluetooth Address: ([0-9a-fA-F:]{17})' for match in re.finditer(device_block_pattern, netsh_result.stdout, re.DOTALL): name = match.group(1).strip() address = match.group(2).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue if "freebox" in name.lower() or "free" in name.lower(): logger.info(f"Freebox trouvée via netsh: {name}") # Créer un ID unique pour l'appareil unique_id = f"WIN-NETSH-{address}" # Ajouter l'appareil au dictionnaire s'il n'existe pas déjà devices[unique_id] = { "id": address, "address": address, "name": name, "rssi": -55, # Valeur fictive "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Unknown (Windows)", "device_type": "Windows-NETSH", "friendly_name": name, "detected_by": "windows_netsh", "raw_info": f"Netsh Device: {name}" } except subprocess.SubprocessError as e: logger.error(f"Erreur avec netsh: {str(e)}") return devices def _scan_registry_devices(self, duration: float, filter_name: Optional[str]) -> Dict[str, Dict[str, Any]]: """ Recherche 
spécifique dans le registre Windows. Args: duration: Durée du scan en secondes filter_name: Filtre optionnel sur le nom des appareils Returns: Dictionnaire des appareils détectés """ devices = {} registry_cmd = [ 'powershell', '-NoProfile', '-Command', """ $OutputEncoding = [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 try { # Rechercher des clés de registre contenant des appareils Bluetooth $regKeys = @( "HKLM:\\SYSTEM\\CurrentControlSet\\Services\\BTHPORT\\Parameters\\Devices", "HKLM:\\SYSTEM\\CurrentControlSet\\Services\\BTH\\Parameters\\Devices", "HKLM:\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Bluetooth\\Devices" ) foreach ($regKey in $regKeys) { if (Test-Path $regKey) { $devices = Get-ChildItem $regKey -ErrorAction SilentlyContinue foreach ($device in $devices) { $props = Get-ItemProperty -Path $device.PSPath -ErrorAction SilentlyContinue $name = "" # Essayer différentes propriétés qui pourraient contenir un nom if ($props.Name) { $name = $props.Name } elseif ($props.FriendlyName) { $name = $props.FriendlyName } elseif ($props.DeviceName) { $name = $props.DeviceName } elseif ($props.DeviceDesc) { $name = $props.DeviceDesc } # Si le nom contient "free" ou "freebox", c'est potentiellement une Freebox if ($name -match "free" -or $name -match "freebox") { Write-Output ("FREEBOX-REG: " + $name + " | ID: " + $device.PSChildName) } # Sinon, afficher quand même tous les appareils Bluetooth elseif ($name) { Write-Output ("BT-REG: " + $name + " | ID: " + $device.PSChildName) } } } } } catch { Write-Output "Error: $_" } """ ] try: registry_result = subprocess.run( registry_cmd, capture_output=True, text=True, timeout=duration, encoding='utf-8' ) # Analyse des résultats freebox_reg_pattern = r'FREEBOX-REG: (.*) \| ID: (.*)' bt_reg_pattern = r'BT-REG: (.*) \| ID: (.*)' # D'abord, chercher les entrées Freebox spécifiques for line in registry_result.stdout.splitlines(): match = re.match(freebox_reg_pattern, line) if match: name = match.group(1).strip() 
reg_id = match.group(2).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue logger.info(f"Freebox trouvée dans le registre: {name}") # Construire une adresse MAC à partir de l'ID du registre si possible address = None if len(reg_id) >= 12: # Convertir l'ID du registre en format MAC address = ':'.join([reg_id[i:i+2] for i in range(0, min(12, len(reg_id)), 2)]) else: address = f"FB:FX:{reg_id[-6:] if len(reg_id) >= 6 else '000000'}" # Ajouter la Freebox au dictionnaire devices[f"FREEBOX-REG-{reg_id}"] = { "id": address, "address": address, "name": "Freebox (" + name + ")", "rssi": -50, # Valeur fictive, priorité plus élevée "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Freebox SA", "device_type": "Windows-Registry", "friendly_name": "Freebox Player", "detected_by": "windows_registry_specific", "raw_info": f"Registry ID: {reg_id}, Name: {name}" } # Ensuite, traiter les entrées Bluetooth génériques for line in registry_result.stdout.splitlines(): match = re.match(bt_reg_pattern, line) if match: name = match.group(1).strip() reg_id = match.group(2).strip() # Appliquer le filtre si nécessaire if filter_name is not None and filter_name.lower() not in name.lower(): continue # Tenter de décoder le nom si c'est une séquence de chiffres séparés par des espaces decoded_name = decode_ascii_name(name) if decoded_name != name: logger.debug(f"Nom décodé de {name} en {decoded_name}") name = decoded_name # Si le nom contient "free" ou "freebox", c'est potentiellement une Freebox if "free" in name.lower() or "freebox" in name.lower(): logger.info(f"Freebox trouvée dans le registre (nom générique): {name}") # Construire une adresse MAC address = None if len(reg_id) >= 12: address = ':'.join([reg_id[i:i+2] for i in range(0, min(12, len(reg_id)), 2)]) else: address = f"FB:FX:{reg_id[-6:] if len(reg_id) >= 6 else '000000'}" # Ajouter la 
Freebox au dictionnaire devices[f"FREEBOX-REG-{reg_id}"] = { "id": address, "address": address, "name": "Freebox (" + name + ")", "rssi": -50, "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Freebox SA", "device_type": "Windows-Registry", "friendly_name": "Freebox Player", "detected_by": "windows_registry_specific", "raw_info": f"Registry ID: {reg_id}, Name: {name}" } else: # Créer un ID unique pour l'appareil unique_id = f"WIN-REG-{reg_id}" # Ajouter l'appareil au dictionnaire s'il n'existe pas déjà devices[unique_id] = { "id": unique_id, "address": unique_id[:17] if len(unique_id) > 17 else unique_id, "name": name, "rssi": -70, # Valeur fictive "manufacturer_data": {}, "service_uuids": [], "service_data": {}, "tx_power": None, "appearance": None, "company_name": "Unknown (Windows)", "device_type": "Windows-Registry", "friendly_name": name, # Utiliser le nom décodé comme nom convivial "detected_by": "windows_registry", "raw_info": f"Registry ID: {reg_id}" } except (subprocess.SubprocessError, UnicodeDecodeError) as e: logger.error(f"Erreur avec le registre: {str(e)}") return devices # Instance singleton pour faciliter l'importation windows_scanner = WindowsBTScanner()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Hypijump31/bluetooth-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.