"""Browser controller for opening YouTube URLs across different operating systems."""
import os
import platform
import shutil
import signal
import subprocess
import tempfile
import time
import webbrowser
from typing import Any

from .constants import OS_WINDOWS, OS_MACOS, OS_LINUX, OS_WSL, BROWSER_SUCCESS_MESSAGE
from .validators import validate_youtube_url


class BrowserController:
    """Open YouTube URLs in a browser window on Windows, macOS, Linux and WSL.

    The controller remembers the process ID of the browser it launched last
    (when one is known) and makes a best-effort attempt to terminate that
    process before opening the next URL.
    """

    # OS identifiers, taken from the package constants.
    WINDOWS = OS_WINDOWS
    MACOS = OS_MACOS
    LINUX = OS_LINUX
    WSL = OS_WSL
    VALID_SYSTEMS = {WINDOWS, MACOS, LINUX, WSL}

    # Windows executables tried, in order, when no default browser is detected.
    _WINDOWS_BROWSERS = ("chrome.exe", "msedge.exe", "firefox.exe")

    # Chromium switches shared by the Linux and macOS app-mode launchers.
    _CHROMIUM_APP_FLAGS = (
        "--no-first-run",
        "--no-default-browser-check",
        "--disable-logging",
        "--autoplay-policy=no-user-gesture-required",
    )

    def __init__(self) -> None:
        """Initialize the controller with no tracked browser and no profile."""
        # PID of the most recently launched browser, if known.
        self.current_process_id: int | None = None
        # Native path (Linux-side on WSL) of the temporary profile directory.
        self._temp_profile_dir: str | None = None

    # ------------------------------------------------------------------
    # Closing the previous browser
    # ------------------------------------------------------------------

    def _close_previous_browser(self) -> None:
        """Terminate the previously launched browser and drop its temp profile.

        Both steps are best-effort: failures are ignored so that a stale PID
        or a locked profile directory never prevents opening the next URL.
        """
        pid = self.current_process_id
        if pid is not None:
            try:
                system = self.get_system()
                if system == self.WINDOWS:
                    self._kill_process_windows(pid)
                elif system == self.WSL:
                    self._kill_process_wsl(pid)
                else:
                    self._kill_process_unix(pid)
            except Exception:
                # Deliberate best-effort boundary: the process may already be
                # gone or not ours to kill; opening the next URL must go on.
                pass
            finally:
                self.current_process_id = None
        self._remove_temp_profile()

    def _remove_temp_profile(self) -> None:
        """Delete the temporary profile directory, if any, and forget it."""
        if self._temp_profile_dir:
            try:
                # ignore_errors keeps this best-effort, e.g. when files are
                # still locked by a browser that is shutting down.
                shutil.rmtree(self._temp_profile_dir, ignore_errors=True)
            except OSError:
                pass
        self._temp_profile_dir = None

    @staticmethod
    def _run_quietly(command: list[str], timeout: float = 5) -> bool:
        """Run *command* with its output discarded.

        Args:
            command: Argument vector passed to ``subprocess.run``.
            timeout: Seconds to wait before giving up.

        Returns:
            False if the executable was not found or the timeout expired,
            True otherwise (the command's exit status is not inspected).
        """
        try:
            subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout,
                check=False,
            )
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False
        return True

    @staticmethod
    def _kill_process_windows(pid: int) -> None:
        """Kill *pid* on Windows with taskkill, falling back to PowerShell.

        The PowerShell fallback is used only when taskkill is missing or
        times out.
        """
        # /T also terminates child processes, /F forces termination.
        if BrowserController._run_quietly(
            ["taskkill", "/PID", str(pid), "/F", "/T"]
        ):
            return
        BrowserController._run_quietly(
            [
                "powershell.exe",
                "-NoProfile",
                "-Command",
                f"Stop-Process -Id {pid} -Force -ErrorAction SilentlyContinue",
            ]
        )

    @staticmethod
    def _kill_process_wsl(pid: int) -> None:
        """Kill a Windows-side browser from WSL using several strategies.

        All strategies are always attempted; none of them reports whether it
        actually terminated anything.
        """
        run = BrowserController._run_quietly

        # Strategy 1: taskkill by PID through cmd.exe.
        if run(["cmd.exe", "/c", f"taskkill /PID {pid} /F /T"]):
            time.sleep(0.5)

        # Strategy 2: PowerShell Stop-Process by PID.
        if run(
            [
                "powershell.exe",
                "-NoProfile",
                "-Command",
                f"Stop-Process -Id {pid} -Force -ErrorAction SilentlyContinue",
            ]
        ):
            time.sleep(0.5)

        # Strategy 3: kill by image name.
        # NOTE(review): this terminates *every* Chrome/Edge/Firefox process on
        # the Windows host, including browser sessions this controller did not
        # start. Kept because the WSL launch path appears to rely on it, but it
        # should be narrowed (e.g. to the browser family that was launched).
        for image_name in BrowserController._WINDOWS_BROWSERS:
            run(["cmd.exe", "/c", f"taskkill /IM {image_name} /F /T"])
            process_name = image_name.replace(".exe", "")
            run(
                [
                    "powershell.exe",
                    "-NoProfile",
                    "-Command",
                    f"Get-Process {process_name} -ErrorAction SilentlyContinue"
                    " | Stop-Process -Force",
                ]
            )

    @staticmethod
    def _kill_process_unix(pid: int) -> None:
        """Kill *pid* on Linux/macOS: SIGTERM first, SIGKILL after 0.5 s."""
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # Already gone. Do not follow up with SIGKILL: by then the PID
            # could belong to an unrelated, newly started process.
            return
        except OSError:
            # e.g. permission denied; the SIGKILL attempt below stays
            # best-effort, as before.
            pass

        # Give the process a moment to exit cleanly before forcing it.
        time.sleep(0.5)
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            pass

    # ------------------------------------------------------------------
    # Platform detection
    # ------------------------------------------------------------------

    @staticmethod
    def get_system() -> str:
        """Return the identifier of the current operating system.

        Returns:
            One of the class constants ``WINDOWS``, ``MACOS``, ``WSL`` or
            ``LINUX``. Unknown platforms are reported as ``LINUX``.
        """
        system = platform.system()
        if system == "Windows":
            return BrowserController.WINDOWS
        if system == "Darwin":
            return BrowserController.MACOS
        if system == "Linux":
            # WSL kernels identify themselves in /proc/version.
            try:
                with open("/proc/version", "r", encoding="utf-8") as version_file:
                    kernel_info = version_file.read().lower()
            except OSError:
                kernel_info = ""
            if "microsoft" in kernel_info or "wsl" in kernel_info:
                return BrowserController.WSL
        return BrowserController.LINUX

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def open_youtube_url(self, video_url: str) -> dict[str, Any]:
        """Open a YouTube URL in a new browser window.

        Any browser previously opened by this controller is closed first.
        The launch strategy depends on the detected operating system.

        Args:
            video_url: Full YouTube URL to open.

        Returns:
            A dictionary with ``"success"`` and ``"url"`` keys. On success it
            also carries ``"message"`` and ``"system"``; on failure it carries
            ``"error"`` (and ``"system"`` unless URL validation failed).
            Invalid URLs and launch errors are reported through this
            dictionary rather than raised. Note that ``"success": True`` means
            a launch was attempted without error, not that a window is
            confirmed to be showing.
        """
        is_valid, message = validate_youtube_url(video_url)
        if not is_valid:
            return {
                "success": False,
                "error": f"Invalid URL: {message}",
                "url": video_url,
            }

        system = self.get_system()
        try:
            self._close_previous_browser()
            process = self._launch_for_system(system, video_url)
        except (OSError, subprocess.TimeoutExpired, ValueError) as exc:
            return {
                "success": False,
                "error": str(exc),
                "system": system,
                "url": video_url,
            }

        # Only track a process that is still running: the PID of a helper that
        # has already exited is useless for closing the window and could later
        # be reused by an unrelated process.
        if process is not None and process.poll() is None:
            self.current_process_id = process.pid

        return {
            "success": True,
            "message": BROWSER_SUCCESS_MESSAGE,
            "system": system,
            "url": video_url,
        }

    def _launch_for_system(
        self, system: str, video_url: str
    ) -> subprocess.Popen | None:
        """Launch a browser for *video_url* using the strategy for *system*."""
        if system == self.WSL:
            # The WSL launcher records the browser PID itself.
            return self._open_in_app_mode_wsl(video_url)

        if system == self.WINDOWS:
            process = self._open_in_app_mode_windows_or_wsl(video_url)
            if process is None:
                # Use the default URL handler. Building a `cmd /c start <url>`
                # string would let cmd.exe treat '&' in the query string as a
                # command separator and truncate the URL.
                if not webbrowser.open(video_url, new=1):
                    raise OSError("No browser could be launched for the URL")
            return process

        if system == self.MACOS:
            process = self._open_in_app_mode_macos(video_url)
            if process is None:
                process = subprocess.Popen(
                    ["open", "-n", video_url],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            return process

        process = self._open_in_app_mode_linux(video_url)
        if process is None:
            process = self._open_in_new_window_linux(video_url)
        return process

    # ------------------------------------------------------------------
    # Linux
    # ------------------------------------------------------------------

    @staticmethod
    def _open_in_new_window_linux(video_url: str) -> subprocess.Popen | None:
        """Open *video_url* in a new window of the first available browser.

        Args:
            video_url: URL to open.

        Returns:
            The launched process, or None when only the ``webbrowser`` module
            fallback could be used (it does not expose a process).
        """
        browsers = (
            "google-chrome",
            "chromium",
            "chromium-browser",
            "firefox",
            "firefox-esr",
            "opera",
        )
        for browser in browsers:
            try:
                return subprocess.Popen(
                    [browser, "--new-window", video_url],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except FileNotFoundError:
                continue

        # No known browser binary found: fall back to xdg-open.
        try:
            return subprocess.Popen(
                ["xdg-open", video_url],
                # Discard output like every other launch so the child never
                # writes to this process's stdout/stderr.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            # Last option: let the webbrowser module pick something.
            webbrowser.open(video_url)
            return None

    def _open_in_app_mode_linux(self, video_url: str) -> subprocess.Popen | None:
        """Open *video_url* in Chromium app mode with an isolated profile.

        Returns:
            The launched process, or None if no Chromium-based browser is
            installed.
        """
        candidates = (
            "google-chrome",
            "chromium",
            "chromium-browser",
            "microsoft-edge",
            "microsoft-edge-stable",
        )
        profile_dir: str | None = None
        for binary in candidates:
            if shutil.which(binary) is None:
                continue
            if profile_dir is None:
                # Created lazily so no directory is left behind when no
                # candidate browser exists.
                profile_dir = self._create_temp_profile_dir()
            try:
                return subprocess.Popen(
                    [
                        binary,
                        f"--app={video_url}",
                        f"--user-data-dir={profile_dir}",
                        *self._CHROMIUM_APP_FLAGS,
                    ],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except FileNotFoundError:
                continue
        return None

    # ------------------------------------------------------------------
    # Temporary profile handling
    # ------------------------------------------------------------------

    def _get_or_create_profile_dir(self) -> str:
        """Return a fresh temporary browser profile directory.

        On WSL the returned path is in Windows format, because
        ``_create_temp_profile_dir`` already performs that conversion; it must
        not be converted a second time here.
        """
        return self._create_temp_profile_dir()

    @staticmethod
    def _mkdtemp_in(base_dir: str | None, prefix: str) -> str | None:
        """Create a temp directory under *base_dir*; return None on failure.

        ``base_dir=None`` uses the system temporary directory.
        """
        try:
            if base_dir is not None:
                os.makedirs(base_dir, exist_ok=True)
            return tempfile.mkdtemp(prefix=prefix, dir=base_dir)
        except OSError:
            return None

    def _create_temp_profile_dir(self) -> str:
        """Create a temporary browser profile directory for an isolated session.

        Any previous temporary profile is removed first. Locations are tried
        in order: ``~/.cache/youtube_mcp_profiles``, the system temp
        directory, ``./.youtube_mcp_profiles`` and finally a fixed
        ``./youtube_mcp_profile_temp`` directory.

        Returns:
            The profile path. On WSL it is converted to Windows format for use
            by Windows executables; elsewhere it is the native path.

        Raises:
            OSError: If not even the last-resort directory can be created.
        """
        self._remove_temp_profile()

        home_cache = os.path.join(
            os.path.expanduser("~"), ".cache", "youtube_mcp_profiles"
        )
        native_path = self._mkdtemp_in(home_cache, "profile_")
        if native_path is None:
            native_path = self._mkdtemp_in(None, "youtube_mcp_profile_")
        if native_path is None:
            current_dir = os.getcwd()
            native_path = self._mkdtemp_in(
                os.path.join(current_dir, ".youtube_mcp_profiles"), "profile_"
            )
            if native_path is None:
                # Last resort: a fixed directory name in the working directory.
                native_path = os.path.join(current_dir, "youtube_mcp_profile_temp")
                os.makedirs(native_path, exist_ok=True)

        # Always remember the native path: that is what cleanup must delete.
        self._temp_profile_dir = native_path

        if self.get_system() == self.WSL:
            return self._linux_to_windows_path(native_path)
        return native_path

    @staticmethod
    def _linux_to_windows_path(linux_path: str) -> str:
        """Convert a Linux path to Windows format for use from WSL.

        ``wslpath -w`` is preferred. If it is unavailable or fails, a
        heuristic mapping is used.

        NOTE(review): the heuristic maps ``/home/<user>`` to
        ``C:\\Users\\<user>`` and ``/tmp`` to ``C:\\Temp``; these are guesses
        and need not refer to the same directory as the Linux path.
        """
        try:
            result = subprocess.run(
                ["wslpath", "-w", linux_path],
                capture_output=True,
                text=True,
                timeout=2,
                check=False,
            )
        except (FileNotFoundError, subprocess.TimeoutExpired):
            result = None
        if result is not None and result.returncode == 0:
            converted = result.stdout.strip()
            if converted:
                return converted

        def with_backslashes(path: str) -> str:
            return path.replace("/", "\\")

        # /home/<user>/... -> C:\Users\<user>\...
        if linux_path.startswith("/home/"):
            parts = linux_path.split("/")  # ['', 'home', '<user>', ...]
            windows_base = f"C:\\Users\\{parts[2]}"
            rest = "/".join(parts[3:])
            if rest:
                return f"{windows_base}\\{with_backslashes(rest)}"
            return windows_base

        # /tmp/... -> C:\Temp\... (match the directory, not e.g. "/tmpfoo").
        if linux_path == "/tmp" or linux_path.startswith("/tmp/"):
            rest = linux_path[4:].lstrip("/")
            if rest:
                return f"C:\\Temp\\{with_backslashes(rest)}"
            return "C:\\Temp"

        # Paths under a home directory that is not below /home (e.g. /root).
        home_dir = os.path.expanduser("~")
        if linux_path == home_dir or linux_path.startswith(
            home_dir.rstrip("/") + "/"
        ):
            relative = linux_path[len(home_dir):].lstrip("/")
            username = os.getenv("USER", "user")
            if relative:
                return f"C:\\Users\\{username}\\{with_backslashes(relative)}"
            return f"C:\\Users\\{username}"

        # Last resort: only swap the separators.
        return with_backslashes(linux_path)

    # ------------------------------------------------------------------
    # macOS
    # ------------------------------------------------------------------

    @staticmethod
    def _launch_macos_app(
        app_name: str, app_args: list[str]
    ) -> subprocess.Popen | None:
        """Start *app_name* through ``open -na`` and confirm ``open`` succeeded.

        ``open`` itself is always present on macOS, so a missing application
        does not raise ``FileNotFoundError``; it makes ``open`` exit with a
        non-zero status instead. That status is checked here so callers can
        move on to the next candidate.

        Returns:
            The ``open`` helper process on success, None if ``open`` is
            missing or reported failure.
        """
        try:
            process = subprocess.Popen(
                ["open", "-na", app_name, "--args", *app_args],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            return None
        try:
            exit_code = process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Still busy launching; treat it as started rather than block.
            return process
        return process if exit_code == 0 else None

    def _open_in_app_mode_macos(self, video_url: str) -> subprocess.Popen | None:
        """Open *video_url* in Chrome/Edge/Chromium app mode on macOS.

        Chromium-based browsers get an isolated temporary profile. Firefox,
        which has no app mode, is the final fallback and is started in kiosk
        mode.

        Returns:
            The ``open`` helper process for the first application that could
            be started, or None if none could.
        """
        profile_dir = self._create_temp_profile_dir()
        chromium_args = [
            f"--app={video_url}",
            f"--user-data-dir={profile_dir}",
            *self._CHROMIUM_APP_FLAGS,
        ]
        for app_name in ("Google Chrome", "Microsoft Edge", "Chromium"):
            process = self._launch_macos_app(app_name, chromium_args)
            if process is not None:
                return process
        return self._launch_macos_app("Firefox", ["--kiosk", video_url])

    # ------------------------------------------------------------------
    # Windows / WSL
    # ------------------------------------------------------------------

    @staticmethod
    def _get_default_browser() -> str | None:
        """Detect the default (or any installed) browser on Windows via PowerShell.

        Returns:
            ``'chrome.exe'``, ``'msedge.exe'`` or ``'firefox.exe'``, or None
            when nothing could be detected (including when PowerShell is not
            available).
        """
        # Errors tolerated while probing: missing/failed PowerShell, timeouts,
        # and undecodable output (UnicodeDecodeError is a ValueError).
        probe_errors = (OSError, subprocess.SubprocessError, ValueError)

        # First choice: the ProgId registered for .html files.
        ps_command = (
            "Get-ItemProperty 'HKCU:\\Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\FileExts\\.html\\UserChoice' "
            "-ErrorAction SilentlyContinue | Select-Object -ExpandProperty 'ProgId' | "
            "ForEach-Object { if ($_ -match 'chrome') { 'chrome.exe' } elseif ($_ -match 'edge|msedge') { 'msedge.exe' } elseif ($_ -match 'firefox') { 'firefox.exe' } }"
        )
        try:
            result = subprocess.run(
                ["powershell.exe", "-NoProfile", "-Command", ps_command],
                capture_output=True,
                text=True,
                timeout=2,
                check=False,
            )
            detected = result.stdout.strip()
            if result.returncode == 0 and detected:
                return detected
        except probe_errors:
            pass

        # Fallback: the first known browser that PowerShell can resolve.
        for browser in BrowserController._WINDOWS_BROWSERS:
            command_name = browser.replace(".exe", "")
            check_cmd = f"Get-Command {command_name} -ErrorAction SilentlyContinue"
            try:
                result = subprocess.run(
                    ["powershell.exe", "-NoProfile", "-Command", check_cmd],
                    capture_output=True,
                    text=True,
                    timeout=1,
                    check=False,
                )
            except probe_errors:
                continue
            if result.returncode == 0 and result.stdout.strip():
                return browser
        return None

    @staticmethod
    def _windows_browser_args(browser: str, video_url: str) -> list[str]:
        """Return the command-line arguments that open *video_url* in *browser*.

        Chromium-based browsers are started in app mode. ``--app`` is a
        Chromium switch, so Firefox is asked for a new window instead.
        """
        if browser.lower().startswith("firefox"):
            return ["--new-window", video_url]
        return [f"--app={video_url}"]

    @staticmethod
    def _start_windows_browser_from_wsl(browser: str, video_url: str) -> int | None:
        """Start *browser* on the Windows host via PowerShell.

        Returns:
            The Windows PID printed by ``Start-Process -PassThru``, or None if
            PowerShell is unavailable, timed out, failed, or printed something
            that is not an integer.
        """
        # Single quotes are doubled so an argument can never terminate the
        # PowerShell string literal it is embedded in.
        quoted_args = ",".join(
            "'" + arg.replace("'", "''") + "'"
            for arg in BrowserController._windows_browser_args(browser, video_url)
        )
        ps_command = (
            f"$proc = Start-Process '{browser}' -ArgumentList {quoted_args} "
            f"-PassThru; $proc.Id"
        )
        try:
            result = subprocess.run(
                ["powershell.exe", "-NoProfile", "-Command", ps_command],
                capture_output=True,
                text=True,
                timeout=5,
                check=False,
            )
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return None
        if result.returncode != 0:
            return None
        try:
            return int(result.stdout.strip())
        except ValueError:
            return None

    def _open_in_app_mode_wsl(self, video_url: str) -> subprocess.Popen | None:
        """Open *video_url* in a Windows browser from inside WSL.

        The detected default browser is used with its default profile. Only
        when no default can be detected are the known browsers tried in
        order, followed by ``wslview``.

        When the browser is started through PowerShell there is no local
        process object: its Windows PID is stored in ``current_process_id``
        and None is returned. (Creating a placeholder ``Popen`` with
        ``creationflags`` is not possible here, since that argument raises
        ``ValueError`` on POSIX, which is what Python runs on under WSL.)

        Args:
            video_url: YouTube URL to open.

        Returns:
            The ``wslview`` process if that fallback was used, otherwise None.
        """
        default_browser = self._get_default_browser()
        if default_browser:
            candidates = [default_browser]
        else:
            candidates = list(self._WINDOWS_BROWSERS)

        for browser in candidates:
            browser_pid = self._start_windows_browser_from_wsl(browser, video_url)
            if browser_pid is not None:
                # Stop at the first successful launch so only one window opens.
                self.current_process_id = browser_pid
                return None

        # A detected default browser is the only thing tried; no other
        # fallbacks are attempted when it fails.
        if default_browser:
            return None

        # Last resort: wslview opens the URL with the system default handler.
        try:
            process = subprocess.Popen(
                ["wslview", video_url],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            return None
        # NOTE(review): this is a Linux-side PID, yet the WSL close path hands
        # the tracked PID to Windows tools (taskkill / Stop-Process). Kept for
        # compatibility with the existing close behaviour; confirm intent.
        self.current_process_id = process.pid
        return process

    def _open_in_app_mode_windows_or_wsl(
        self, video_url: str
    ) -> subprocess.Popen | None:
        """Open *video_url* with the default browser on native Windows.

        The browser's default profile is used (no isolated profile). Only the
        detected default browser is tried; the known browsers are tried in
        order only when no default could be detected. The launched PID is
        stored in ``current_process_id``.

        Returns:
            The launched process, or None if no candidate executable could be
            started.
        """
        default_browser = self._get_default_browser()
        if default_browser:
            candidates = [default_browser]
        else:
            candidates = list(self._WINDOWS_BROWSERS)

        for browser in candidates:
            try:
                process = subprocess.Popen(
                    [browser, *self._windows_browser_args(browser, video_url)],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except FileNotFoundError:
                continue
            # Remember the PID so the window can be closed later.
            self.current_process_id = process.pid
            return process
        return None