Skip to main content
Glama
nazufel
by nazufel
console_monitor.py (36.4 kB)
"""
Console monitor for capturing Xcode debug output and runtime logs.
"""

import json
import queue
import re
import subprocess
import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Dict, List, NamedTuple, Optional


class ConsoleLog(NamedTuple):
    """Represents a console log entry."""

    timestamp: datetime  # naive; the UTC offset printed by `log` is dropped
    level: str  # 'debug', 'info', 'warning', 'error', 'fault'
    subsystem: str
    category: str
    message: str
    process: str


class XcodeConsoleMonitor:
    """Monitors Xcode console output and debug logs using macOS log system.

    A monitor runs at most one background stream at a time (see the
    ``start_*`` methods). Parsed entries are pushed onto ``log_queue`` and
    handed to every registered callback.
    """

    # Base command shared by every live `log stream` invocation.
    _LOG_STREAM_CMD = ('log', 'stream', '--style', 'syslog', '--level', 'debug')

    # Build tools watched by start_build_monitoring().
    _BUILD_PROCESSES = ('Xcode', 'xcodebuild', 'swift', 'clang', 'ld')

    # Processes involved in an Xcode on-device debug session.
    _DEBUG_PROCESSES = (
        'Xcode',
        'debugserver',
        'lldb',
        'DTDeviceServiceBase',
        'DTServiceHub',
        'com.apple.dt.DeviceKit',
        'com.apple.dt.IDE.IDEiOSSupportCore',
    )

    # Message keywords that mark a log line as debug-session related.
    _DEBUG_MESSAGE_KEYWORDS = (
        'device', 'debug', 'console', 'log', 'print', 'NSLog', 'os_log',
        'debugger', 'breakpoint', 'exception', 'crash',
    )

    # USB product IDs treated as iOS devices by the system_profiler probe.
    _USB_IOS_PRODUCT_IDS = (
        '0x12a8', '0x12ab', '0x1281', '0x1227', '0x1290', '0x1291',
        '0x1292', '0x1293', '0x12a9', '0x12aa', '0x12ac', '0x12ad',
        '0x12ae', '0x12af', '0x12b0', '0x12b1',
    )

    def __init__(self, log_file_path: Optional[str] = None):
        """Create an idle monitor.

        Args:
            log_file_path: Stored on the instance; not used by any method
                in this module.
        """
        self.is_monitoring = False
        self.log_queue = queue.Queue()
        self.monitor_thread = None
        self.callbacks = []
        self.log_file_path = log_file_path
        self.log_file = None
        # Child process of the active stream, kept so stop_monitoring() can
        # terminate it and unblock the reader thread.
        self._process = None

        # Pattern for the macOS syslog-style line:
        #   <timestamp><tz> <host> <process>[<pid>]: (<framework>) [<subsystem>:<category>] <message>
        # The hostname is matched with \S+ because real hostnames commonly
        # contain '-' and '.', which \w+ rejects.
        self.log_pattern = re.compile(
            r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+[-+]\d{4})\s+'
            r'(?P<hostname>\S+)\s+'
            r'(?P<process>[\w\-\.]+)\[(?P<pid>\d+)\]:\s+'
            r'(?:\((?P<framework>[\w\.]+)\)\s+)?'
            r'(?:\[(?P<subsystem>[\w\.]+):(?P<category>[\w\.]+)\]\s+)?'
            r'(?P<message>.*)'
        )

        # Xcode-related process names to monitor
        self.xcode_processes = [
            'Xcode',
            'xcodebuild',
            'xctest',
            'Simulator',
            'iOS Simulator',
            'iPhone Simulator',
            'iPad Simulator',
            'swift',
            'clang',
            'ld',
            'Metal',
            'ibtool',
            'actool',
            'assetutil',
            'debugserver',  # Debug server for device debugging
            'lldb',  # LLDB debugger
            'DTDeviceServiceBase',  # Device service
            'DTServiceHub',  # Device service hub
            'com.apple.dt.DeviceKit',  # Device kit
            'com.apple.dt.IDE.IDEiOSSupportCore',  # iOS support core
        ]

        # Patterns for build errors and warnings
        self.build_error_patterns = [
            r'error:',
            r'warning:',
            r'note:',
            r'BUILD FAILED',
            r'Compile Swift',
            r'CompileC',
            r'Ld ',
            r'CodeSign',
            r'PhaseScriptExecution',
        ]

    # ------------------------------------------------------------------
    # Public control surface
    # ------------------------------------------------------------------

    def start_monitoring(self, app_bundle_id: Optional[str] = None,
                         include_devices: bool = True):
        """Start monitoring console logs.

        Args:
            app_bundle_id: Optional app bundle ID to filter logs
            include_devices: Whether to include logs from connected devices
        """
        if self.is_monitoring:
            print("Already monitoring logs...")
            return

        bundle_info = f' for bundle ID: {app_bundle_id}' if app_bundle_id else ''
        device_info = " (including connected devices)" if include_devices else ""
        print(f"Starting console monitoring{bundle_info}{device_info}...")

        self.is_monitoring = True
        self._launch_monitor_thread(self._monitor_logs,
                                    (app_bundle_id, include_devices))
        print("Console monitoring started successfully.")

    def stop_monitoring(self):
        """Stop monitoring console logs.

        The child process is terminated first: the reader thread is normally
        blocked waiting for the next line, so clearing the flag alone would
        not stop it until another log line arrived.
        """
        self.is_monitoring = False
        process = self._process
        if process is not None and process.poll() is None:
            try:
                process.terminate()
            except OSError:
                pass  # process exited between poll() and terminate()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2)

    def add_callback(self, callback: Callable[[ConsoleLog], None]):
        """Add a callback to be called when new logs are received."""
        self.callbacks.append(callback)

    def get_recent_logs(self, count: int = 100) -> List[ConsoleLog]:
        """Pop up to ``count`` of the oldest queued logs.

        NOTE: this drains ``log_queue``; returned entries are not available
        to later calls. The ``get_*_logs`` filters below are built on it, so
        entries they filter out are discarded as well.
        """
        logs = []
        while len(logs) < count:
            try:
                logs.append(self.log_queue.get_nowait())
            except queue.Empty:
                break
        return logs

    # ------------------------------------------------------------------
    # Predicate construction
    # ------------------------------------------------------------------

    @staticmethod
    def _escape_predicate_value(value: str) -> str:
        """Escape a value for use inside a double-quoted predicate string."""
        return value.replace('\\', '\\\\').replace('"', '\\"')

    @staticmethod
    def _process_clause(process_names) -> str:
        """Return ``(process == "a" OR process == "b" ...)`` or '' if empty."""
        clauses = [f'process == "{name}"' for name in process_names]
        return f'({" OR ".join(clauses)})' if clauses else ''

    def _build_monitor_predicate(self, app_bundle_id: Optional[str],
                                 include_devices: bool = True) -> Optional[str]:
        """Build the predicate used by the general console monitor.

        Returns None when there is nothing to filter on.
        """
        predicates = []

        # Filter by Xcode-related processes
        process_clause = self._process_clause(self.xcode_processes)
        if process_clause:
            predicates.append(process_clause)

        if app_bundle_id:
            bundle = self._escape_predicate_value(app_bundle_id)
            predicates.append(
                f'(subsystem == "{bundle}" OR sender == "{bundle}" '
                f'OR processImagePath CONTAINS "{bundle}")'
            )
            if include_devices:
                # Parenthesised so the OR cannot escape the surrounding AND
                # chain and match every trace event on the system.
                predicates.append(
                    '(eventType == logEvent OR eventType == traceEvent)')

        return ' AND '.join(predicates) if predicates else None

    def _build_debug_predicate(self, device_name: Optional[str],
                               app_bundle_id: Optional[str]) -> str:
        """Build the predicate selecting Xcode device-debug log lines."""
        predicates = [self._process_clause(self._DEBUG_PROCESSES)]

        if device_name:
            device = self._escape_predicate_value(device_name)
            predicates.append(f'eventMessage CONTAINS "{device}"')

        if app_bundle_id:
            bundle = self._escape_predicate_value(app_bundle_id)
            predicates.append(
                f'(eventMessage CONTAINS "{bundle}" OR subsystem == "{bundle}" '
                f'OR sender == "{bundle}")'
            )

        # Any one keyword is enough, hence OR inside the group.
        keyword_clauses = [f'eventMessage CONTAINS "{keyword}"'
                           for keyword in self._DEBUG_MESSAGE_KEYWORDS]
        predicates.append(f'({" OR ".join(keyword_clauses)})')

        return ' AND '.join(predicates)

    def _simctl_log_stream_cmd(self, device_udid: str,
                               app_bundle_id: Optional[str]) -> List[str]:
        """Build the `xcrun simctl spawn <udid> log stream` command."""
        cmd = ['xcrun', 'simctl', 'spawn', device_udid, *self._LOG_STREAM_CMD]
        if app_bundle_id:
            bundle = self._escape_predicate_value(app_bundle_id)
            cmd.extend(['--predicate',
                        f'subsystem == "{bundle}" OR sender == "{bundle}"'])
        return cmd

    # ------------------------------------------------------------------
    # Streaming machinery shared by every monitor flavour
    # ------------------------------------------------------------------

    def _launch_monitor_thread(self, target, args):
        """Start ``target(*args)`` as the monitor's daemon thread."""
        self.monitor_thread = threading.Thread(target=target, args=args,
                                               daemon=True)
        self.monitor_thread.start()

    @staticmethod
    def _terminate_process(process, timeout: float = 5):
        """Terminate ``process``, escalating to kill if it ignores SIGTERM."""
        if process.poll() is not None:
            return
        try:
            process.terminate()
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        except OSError:
            pass  # already gone

    def _dispatch(self, log_entry: ConsoleLog, callback_label: str):
        """Queue ``log_entry`` and pass it to every registered callback."""
        self.log_queue.put(log_entry)
        for callback in self.callbacks:
            try:
                callback(log_entry)
            except Exception as e:
                # Boundary around user code: one failing callback must not
                # stop the stream or starve the remaining callbacks.
                print(f"Error in {callback_label}: {e}")

    def _stream_logs(self, cmd, *, started_msg: str, stopped_prefix: str,
                     callback_label: str, error_prefix: str,
                     debug_lines: int = 0):
        """Run ``cmd`` and dispatch every parsed line until stopped.

        Args:
            cmd: Command line producing syslog-style log lines on stdout.
            started_msg: Printed once the child process has been spawned.
            stopped_prefix: Printed, followed by the line count, on exit.
            callback_label: Names the callback kind in callback error output.
            error_prefix: Prefix of the message printed if streaming fails.
            debug_lines: Number of leading lines to echo for diagnostics.
        """
        process = None
        line_count = 0
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                # stderr is never read; a pipe could fill and stall the child.
                stderr=subprocess.DEVNULL,
                text=True,
                errors='replace',  # a stray byte must not kill the stream
                bufsize=1,
            )
            self._process = process
            print(started_msg)

            # Iteration blocks until a line arrives, so no sleep is needed.
            for raw_line in process.stdout:
                if not self.is_monitoring:
                    break
                line = raw_line.strip()
                # `log stream` starts with a "Filtering the log data..." banner.
                if not line or line.startswith("Filtering"):
                    continue

                line_count += 1
                echo = line_count <= debug_lines
                if echo:
                    print(f"Raw log line {line_count}: {line}")

                log_entry = self._parse_log_line(line)
                if log_entry is None:
                    if echo:
                        print(f"Failed to parse line: {line}")
                    continue
                if echo:
                    print(f"Parsed log entry: {log_entry}")
                self._dispatch(log_entry, callback_label)

            print(f"{stopped_prefix} Processed {line_count} lines.")
        except Exception as e:
            # Top-level boundary of the daemon thread: report, never raise.
            print(f"{error_prefix}: {e}")
        finally:
            if process is not None:
                self._terminate_process(process)
                if process.stdout is not None:
                    process.stdout.close()
            if self._process is process:
                self._process = None
            # Let a new monitor be started once this stream has ended, but
            # only if this thread is still the registered monitor.
            if self.monitor_thread is threading.current_thread():
                self.is_monitoring = False

    def _monitor_logs(self, app_bundle_id: Optional[str],
                      include_devices: bool = True):
        """Monitor logs using macOS log command."""
        cmd = list(self._LOG_STREAM_CMD)

        # NOTE(review): the process filter and the bundle filter are ANDed,
        # as in the original; confirm that is the intended narrowing.
        predicate_str = self._build_monitor_predicate(app_bundle_id,
                                                      include_devices)
        if predicate_str:
            cmd.extend(['--predicate', predicate_str])
            print(f"Using predicate: {predicate_str}")
        else:
            print("No predicate specified - monitoring all processes")

        print(f"Running command: {' '.join(cmd)}")
        self._stream_logs(
            cmd,
            started_msg="Log streaming process started, waiting for logs...",
            stopped_prefix="Stopping log monitoring.",
            callback_label="log callback",
            error_prefix="Error monitoring logs",
            debug_lines=3,  # Show first few lines for debugging
        )

    # ------------------------------------------------------------------
    # Parsing and filtering
    # ------------------------------------------------------------------

    def _parse_log_line(self, line: str) -> Optional[ConsoleLog]:
        """Parse a single log line.

        Returns None when the line does not match ``log_pattern`` or its
        timestamp cannot be parsed.
        """
        match = self.log_pattern.match(line)
        if not match:
            return None

        # Drop the trailing UTC offset such as "-0400"; the timestamp is
        # kept naive.
        timestamp_base = match.group('timestamp')[:-5]
        try:
            timestamp = datetime.strptime(timestamp_base,
                                          '%Y-%m-%d %H:%M:%S.%f')
        except ValueError as e:
            print(f"Error parsing log line '{line}': {e}")
            return None

        # Syslog-style output carries no explicit level, so it is inferred
        # from keywords in the message, defaulting to 'info'.
        message = match.group('message')
        message_lower = message.lower()
        if any(word in message_lower
               for word in ('error', 'failed', 'exception', 'crash')):
            level = 'error'
        elif any(word in message_lower for word in ('warning', 'warn')):
            level = 'warning'
        elif any(word in message_lower for word in ('debug', 'trace')):
            level = 'debug'
        else:
            level = 'info'

        return ConsoleLog(
            timestamp=timestamp,
            level=level,
            subsystem=match.group('subsystem') or match.group('framework') or '',
            category=match.group('category') or '',
            message=message,
            process=match.group('process'),
        )

    def _parse_lines(self, text: str) -> List[ConsoleLog]:
        """Parse every non-empty line of ``text``, skipping unparsable ones."""
        entries = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            entry = self._parse_log_line(line)
            if entry:
                entries.append(entry)
        return entries

    @staticmethod
    def _tail(logs: List[ConsoleLog], count: int) -> List[ConsoleLog]:
        """Return the last ``count`` logs.

        ``logs[-0:]`` would be the whole list, so non-positive counts are
        handled explicitly.
        """
        return logs[-count:] if count > 0 else []

    def _is_xcode_process(self, process_name: str) -> bool:
        """Return True if any monitored process name occurs in the name."""
        return any(proc in process_name for proc in self.xcode_processes)

    def get_error_logs(self, since_minutes: int = 10) -> List[ConsoleLog]:
        """Get error logs from the last N minutes."""
        cutoff_time = datetime.now() - timedelta(minutes=since_minutes)
        # Pull a large batch so there is enough to search through.
        return [
            log for log in self.get_recent_logs(1000)
            if log.timestamp >= cutoff_time
            and log.level in ('error', 'fault')
            and self._is_xcode_process(log.process)
        ]

    def get_debug_logs(self, filter_text: Optional[str] = None,
                       since_minutes: int = 5) -> List[ConsoleLog]:
        """Get debug logs, optionally filtered by text."""
        cutoff_time = datetime.now() - timedelta(minutes=since_minutes)
        needle = filter_text.lower() if filter_text else None
        return [
            log for log in self.get_recent_logs(1000)
            if log.timestamp >= cutoff_time
            and (needle is None or needle in log.message.lower())
        ]

    def get_build_errors(self, since_minutes: int = 10) -> List[ConsoleLog]:
        """Get build-related error logs from the last N minutes."""
        cutoff_time = datetime.now() - timedelta(minutes=since_minutes)
        build_errors = []

        for log in self.get_recent_logs(1000):
            if log.timestamp < cutoff_time:
                continue
            if not self._is_xcode_process(log.process):
                continue
            is_build_error = any(
                re.search(pattern, log.message, re.IGNORECASE)
                for pattern in self.build_error_patterns
            )
            if is_build_error or log.level in ('error', 'fault'):
                build_errors.append(log)

        return build_errors

    # ------------------------------------------------------------------
    # Build monitoring
    # ------------------------------------------------------------------

    def start_build_monitoring(self, project_name: Optional[str] = None):
        """Start monitoring specifically for build errors and warnings."""
        project_info = f' for project: {project_name}' if project_name else ''
        print(f"Starting build monitoring{project_info}...")

        # Monitor Xcode and build tools
        predicates = [self._process_clause(self._BUILD_PROCESSES)]

        # Add project-specific filtering if provided
        if project_name:
            project = self._escape_predicate_value(project_name)
            predicates.append(f'eventMessage CONTAINS "{project}"')

        # Monitor for build-related events
        predicates.append('eventType == logEvent')

        predicate_str = ' AND '.join(predicates)
        print(f"Build monitoring predicate: {predicate_str}")
        self._start_build_log_monitoring(predicate_str)

    def _start_build_log_monitoring(self, predicate: str):
        """Start monitoring with a specific predicate for build logs."""
        if self.is_monitoring:
            print("Already monitoring logs...")
            return

        self.is_monitoring = True

        cmd = [*self._LOG_STREAM_CMD, '--predicate', predicate]
        print(f"Running build monitoring command: {' '.join(cmd)}")

        self._launch_monitor_thread(self._monitor_build_logs, (cmd,))
        print("Build monitoring started successfully.")

    def _monitor_build_logs(self, cmd):
        """Monitor build logs using the specified command."""
        self._stream_logs(
            cmd,
            started_msg="Build log streaming process started...",
            stopped_prefix="Build monitoring stopped.",
            callback_label="build log callback",
            error_prefix="Error in build log monitoring",
        )

    # ------------------------------------------------------------------
    # Device discovery
    # ------------------------------------------------------------------

    @staticmethod
    def _run_capture(cmd, timeout: float = 10) -> Optional[str]:
        """Run ``cmd`` and return its stdout, or None on any failure.

        A missing executable (OSError), a timeout and a non-zero exit status
        are all treated as "no data".
        """
        try:
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    errors='replace', timeout=timeout)
        except (subprocess.SubprocessError, OSError):
            return None
        return result.stdout if result.returncode == 0 else None

    def _list_simulators(self) -> List[Dict[str, str]]:
        """Return the iOS/iPadOS simulators reported by `simctl`."""
        stdout = self._run_capture(
            ['xcrun', 'simctl', 'list', 'devices', '--json'])
        if stdout is None:
            return []
        try:
            data = json.loads(stdout)
        except json.JSONDecodeError:
            return []

        simulators = []
        for runtime, device_list in data.get('devices', {}).items():
            if 'iOS' not in runtime and 'iPadOS' not in runtime:
                continue
            for device in device_list:
                simulators.append({
                    'name': device.get('name', 'Unknown Device'),
                    'udid': device.get('udid', ''),
                    'state': device.get('state', 'Unknown'),
                    'type': 'simulator',
                    'runtime': runtime,
                    'product_id': device.get('productType', ''),
                })
        return simulators

    def _list_devicectl_devices(self) -> List[Dict[str, str]]:
        """Return the physical devices reported by `devicectl`.

        NOTE(review): rows are split on whitespace, so a device name that
        contains spaces shifts every following column.
        """
        stdout = self._run_capture(['xcrun', 'devicectl', 'list', 'devices'])
        if stdout is None:
            return []

        found = []
        for row in stdout.strip().split('\n')[1:]:  # Skip header line
            parts = row.split()
            if len(parts) < 5:
                continue
            name, device_id, state = parts[0], parts[2], parts[3]
            model = ' '.join(parts[4:])
            if any(kind in model or kind in name
                   for kind in ('iPhone', 'iPad', 'iPod')):
                found.append({
                    'name': name,
                    'udid': device_id,
                    'state': state,
                    'type': 'physical_device',
                    'runtime': 'Physical Device',
                    'product_id': model,
                })
        return found

    def _find_usb_ios_devices(self, items) -> List[Dict[str, str]]:
        """Recursively collect iOS devices from a system_profiler USB tree."""
        ios_devices = []
        for item in items:
            if item.get('vendor_id') == '0x05ac':  # Apple vendor ID
                product_id = item.get('product_id', '')
                device_name = item.get('_name', 'Unknown iOS Device')
                known_id = any(pid in product_id
                               for pid in self._USB_IOS_PRODUCT_IDS)
                known_name = any(kind in device_name
                                 for kind in ('iPhone', 'iPad', 'iPod'))
                if known_id or known_name:
                    ios_devices.append({
                        'name': device_name,
                        'product_id': product_id,
                        'type': 'physical_device',
                        'state': 'connected',
                        'udid': '',  # USB data does not expose the UDID
                        'runtime': 'Physical Device',
                    })
            # Hubs nest their children under '_items'.
            if '_items' in item:
                ios_devices.extend(self._find_usb_ios_devices(item['_items']))
        return ios_devices

    def _list_usb_devices(self) -> List[Dict[str, str]]:
        """Return the iOS devices visible in the USB device tree."""
        stdout = self._run_capture(
            ['system_profiler', 'SPUSBDataType', '-json'])
        if stdout is None:
            return []
        try:
            data = json.loads(stdout)
        except json.JSONDecodeError:
            return []
        return self._find_usb_ios_devices(data.get('SPUSBDataType', []))

    def get_connected_devices(self) -> List[Dict[str, str]]:
        """Get list of connected iOS devices including simulators and physical devices.

        All three probes always run, so a physical device may be listed both
        by `devicectl` and by the USB scan. A probe whose tool is missing or
        fails contributes nothing.
        """
        devices = []
        devices.extend(self._list_simulators())
        devices.extend(self._list_devicectl_devices())
        devices.extend(self._list_usb_devices())
        return devices

    # ------------------------------------------------------------------
    # One-shot device log retrieval
    # ------------------------------------------------------------------

    def get_device_logs(self, device_udid: str, count: int = 100,
                        since_minutes: int = 10) -> List[ConsoleLog]:
        """Get logs from a specific iOS device (simulator or physical).

        Runs `log show` through `xcrun simctl spawn` and returns at most the
        last ``count`` parsed entries.
        """
        cmd = [
            'xcrun', 'simctl', 'spawn', device_udid,
            'log', 'show', '--last', f'{since_minutes}m',
            '--style', 'syslog',
        ]
        logs = []
        try:
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    errors='replace', timeout=30)
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Error getting device logs for {device_udid}: {e}")
        else:
            if result.returncode == 0:
                logs = self._parse_lines(result.stdout)

        return self._tail(logs, count)

    @staticmethod
    def _pump_lines(stream, sink):
        """Copy lines from ``stream`` into ``sink``; None marks end of stream."""
        try:
            for raw_line in stream:
                sink.put(raw_line)
        except (OSError, ValueError):
            pass  # stream torn down while reading
        finally:
            sink.put(None)
            stream.close()

    def get_device_debug_logs(self, device_udid: str,
                              app_bundle_id: Optional[str] = None,
                              count: int = 100) -> List[ConsoleLog]:
        """Get debug logs from a specific device, optionally filtered by app bundle ID.

        Streams for at most five seconds, or until ``count`` entries have
        been collected. Lines are read on a helper thread so the time limit
        holds even when the device logs nothing.
        """
        if count <= 0:
            return []

        cmd = self._simctl_log_stream_cmd(device_udid, app_bundle_id)
        logs = []
        process = None
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                errors='replace',
                bufsize=1,
            )
            lines = queue.Queue()
            threading.Thread(target=self._pump_lines,
                             args=(process.stdout, lines),
                             daemon=True).start()

            deadline = time.monotonic() + 5  # Collect for 5 seconds
            while len(logs) < count:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    raw_line = lines.get(timeout=remaining)
                except queue.Empty:
                    break
                if raw_line is None:  # stream ended
                    break
                line = raw_line.strip()
                if not line:
                    continue
                log_entry = self._parse_log_line(line)
                if log_entry:
                    logs.append(log_entry)
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Error getting device debug logs for {device_udid}: {e}")
        finally:
            if process is not None:
                self._terminate_process(process, timeout=2)

        return self._tail(logs, count)

    def get_device_debug_logs_from_xcode(self, device_name: Optional[str] = None,
                                         app_bundle_id: Optional[str] = None,
                                         count: int = 100) -> List[ConsoleLog]:
        """Get debug logs from connected devices that are being debugged through Xcode.

        Reads the last ten minutes with `log show` and returns at most the
        last ``count`` parsed entries.
        """
        cmd = ['log', 'show', '--last', '10m', '--style', 'syslog',
               '--level', 'debug',
               '--predicate',
               self._build_debug_predicate(device_name, app_bundle_id)]
        print(f"Running debug log command: {' '.join(cmd)}")

        logs = []
        try:
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    errors='replace', timeout=30)
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Error getting device debug logs from Xcode: {e}")
        else:
            if result.returncode == 0:
                logs = self._parse_lines(result.stdout)

        return self._tail(logs, count)

    # ------------------------------------------------------------------
    # Device monitoring
    # ------------------------------------------------------------------

    def start_device_debug_monitoring(self, device_name: Optional[str] = None,
                                      app_bundle_id: Optional[str] = None):
        """Start monitoring debug logs from connected devices through Xcode."""
        if self.is_monitoring:
            print("Already monitoring logs...")
            return

        device_info = f' for device: {device_name}' if device_name else ''
        print(f"Starting device debug monitoring{device_info}")
        if app_bundle_id:
            print(f"Filtering for app: {app_bundle_id}")

        self.is_monitoring = True

        cmd = [*self._LOG_STREAM_CMD, '--predicate',
               self._build_debug_predicate(device_name, app_bundle_id)]
        print(f"Running debug monitoring command: {' '.join(cmd)}")

        self._launch_monitor_thread(self._monitor_device_debug_logs, (cmd,))
        print("Device debug monitoring started successfully.")

    def _monitor_device_debug_logs(self, cmd):
        """Monitor device debug logs using the specified command."""
        self._stream_logs(
            cmd,
            started_msg="Device debug log streaming process started...",
            stopped_prefix="Device debug monitoring stopped.",
            callback_label="device debug log callback",
            error_prefix="Error in device debug log monitoring",
        )

    def start_device_monitoring(self, device_udid: str,
                                app_bundle_id: Optional[str] = None):
        """Start monitoring logs from a specific device."""
        if self.is_monitoring:
            print("Already monitoring logs...")
            return

        print(f"Starting device log monitoring for device: {device_udid}")
        if app_bundle_id:
            print(f"Filtering for app: {app_bundle_id}")

        self.is_monitoring = True

        cmd = self._simctl_log_stream_cmd(device_udid, app_bundle_id)
        self._launch_monitor_thread(self._monitor_device_logs, (cmd,))
        print("Device log monitoring started successfully.")

    def _monitor_device_logs(self, cmd):
        """Monitor device logs using the specified command."""
        self._stream_logs(
            cmd,
            started_msg="Device log streaming process started...",
            stopped_prefix="Device monitoring stopped.",
            callback_label="device log callback",
            error_prefix="Error in device log monitoring",
        )

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_logs_to_file(self, logs: List[ConsoleLog], file_path: str):
        """Save console logs to a file.

        Returns:
            True when the file was written, False when writing failed.
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(f"# Xcode Device Console Logs - {datetime.now().isoformat()}\n\n")
                for log in logs:
                    timestamp = log.timestamp.isoformat()
                    f.write(f"[{timestamp}] {log.level.upper()} {log.process}: {log.message}\n")
                    if log.subsystem:
                        f.write(f"  Subsystem: {log.subsystem}\n")
                    if log.category:
                        f.write(f"  Category: {log.category}\n")
                    f.write("\n")
        except (OSError, UnicodeError) as e:
            print(f"Error saving logs to file: {e}")
            return False

        print(f"Saved {len(logs)} logs to {file_path}")
        return True


def main():
    """Test the console monitor."""
    monitor = XcodeConsoleMonitor()

    def log_callback(log: ConsoleLog):
        print(f"[{log.timestamp.strftime('%H:%M:%S')}] {log.level.upper()} "
              f"{log.process}: {log.message}")

    monitor.add_callback(log_callback)

    print("Starting console monitoring (press Ctrl+C to stop)...")
    monitor.start_monitoring()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping monitor...")
        monitor.stop_monitoring()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nazufel/xcode-errors-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.