command_executor.py.backupā¢17.8 kB
"""
Command execution utilities for PowerShell and CMD with support for long-running processes.
"""
import asyncio
import logging
import os
import platform
import signal
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from queue import Queue, Empty
from typing import Dict, Any, Optional, Callable, List, Tuple, Union
# Import utility functions from the current package
from . import create_temp_file, safe_cleanup_file, validate_directory, get_execution_result
logger = logging.getLogger(__name__)
@dataclass
class ProcessOutput:
"""Container for process output data."""
stdout: str = ""
stderr: str = ""
exit_code: Optional[int] = None
error: Optional[str] = None
execution_time: float = 0.0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
'success': self.exit_code == 0 and not self.error,
'stdout': self.stdout,
'stderr': self.stderr,
'exit_code': self.exit_code or 0, # Ensure exit_code is not None
'error': self.error,
'execution_time': self.execution_time,
'command': '', # Will be filled in by the caller if needed
'working_directory': '' # Will be filled in by the caller if needed
}
class CommandExecutor:
"""Handles command execution with reliable output capture and streaming support."""
# Default timeouts in seconds
DEFAULT_TIMEOUT = 3600 # 1 hour default timeout
HEARTBEAT_INTERVAL = 30 # Send heartbeat every 30 seconds
@classmethod
def _enqueue_output(cls, stream, queue):
"""Helper to read stream lines into a queue."""
try:
for line in iter(stream.readline, b''):
if line:
queue.put(line)
except Exception as e:
logger.warning(f"Error reading from stream: {e}")
finally:
stream.close()
@classmethod
def _process_output(
cls,
process: subprocess.Popen,
output_callback: Optional[Callable[[str, str], None]] = None,
timeout_seconds: int = DEFAULT_TIMEOUT,
max_output_size: Optional[int] = None
) -> ProcessOutput:
"""Process output from a running process with streaming support.
Args:
process: The running subprocess
output_callback: Optional callback function that receives (stream_type, data)
timeout_seconds: Maximum execution time in seconds
max_output_size: Maximum output size in bytes
Returns:
ProcessOutput containing the command results
"""
output = ProcessOutput()
start_time = time.time()
last_heartbeat = time.time()
# Create queues for stdout/stderr
stdout_queue = Queue()
stderr_queue = Queue()
# Start threads to read output
stdout_thread = threading.Thread(
target=cls._enqueue_output,
args=(process.stdout, stdout_queue)
)
stderr_thread = threading.Thread(
target=cls._enqueue_output,
args=(process.stderr, stderr_queue)
)
# Set threads as daemon so they'll exit when the main thread exits
stdout_thread.daemon = True
stderr_thread.daemon = True
stdout_thread.start()
stderr_thread.start()
try:
while True:
# Check if process has finished
if process.poll() is not None:
output.exit_code = process.returncode
break
# Check for timeout
elapsed = time.time() - start_time
if elapsed > timeout_seconds:
logger.warning(f"Command timed out after {timeout_seconds} seconds")
output.error = f"Command timed out after {timeout_seconds} seconds"
output.exit_code = -1
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
break
# Check for new output
got_output = False
# Process stdout
try:
while True:
line = stdout_queue.get_nowait()
line = line.decode('utf-8', errors='replace').rstrip()
output.stdout += line + '\n'
if output_callback:
output_callback('stdout', line)
got_output = True
except Empty:
pass
# Process stderr
try:
while True:
line = stderr_queue.get_nowait()
line = line.decode('utf-8', errors='replace').rstrip()
output.stderr += line + '\n'
if output_callback:
output_callback('stderr', line)
got_output = True
except Empty:
pass
# Send heartbeat if needed
current_time = time.time()
if current_time - last_heartbeat >= cls.HEARTBEAT_INTERVAL:
if output_callback:
output_callback('heartbeat', f'Still running... ({elapsed:.1f}s)')
last_heartbeat = current_time
# If we didn't get any output, sleep briefly to avoid busy-waiting
if not got_output:
time.sleep(0.1)
# Check output size limits
if max_output_size and len(output.stdout) + len(output.stderr) > max_output_size:
output.error = "Output size limit exceeded"
output.exit_code = -1
process.terminate()
break
# Ensure threads are done
stdout_thread.join(timeout=1)
stderr_thread.join(timeout=1)
# Get any remaining output
try:
remaining_stdout = process.stdout.read().decode('utf-8', errors='replace')
remaining_stderr = process.stderr.read().decode('utf-8', errors='replace')
output.stdout += remaining_stdout
output.stderr += remaining_stderr
except Exception as e:
logger.warning(f"Error reading final output: {e}")
# Set final execution time
output.execution_time = time.time() - start_time
return output
except Exception as e:
logger.error(f"Error processing output: {e}", exc_info=True)
output.error = str(e)
output.exit_code = -1
return output
finally:
# Ensure process is terminated
try:
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
except Exception as e:
logger.warning(f"Error terminating process: {e}")
@classmethod
def execute_powershell(
cls,
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = None,
capture_output: bool = True,
output_encoding: str = 'utf-8',
max_output_size: Optional[int] = 10 * 1024 * 1024, # 10MB default
output_callback: Optional[Callable[[str, str], None]] = None,
**kwargs
) -> Dict[str, Any]:
"""
Execute PowerShell commands with streaming output support.
Args:
command: The PowerShell command to execute
working_directory: Working directory for the command
timeout_seconds: Maximum execution time in seconds (default: 1 hour)
capture_output: Whether to capture command output
output_encoding: Output encoding (default: utf-8)
max_output_size: Maximum output size in bytes (default: 10MB)
output_callback: Optional callback function that receives (stream_type, data)
**kwargs: Additional arguments (for backward compatibility)
Returns:
Dictionary containing command execution results
"""
start_time = time.time()
# Use default timeout if not specified
if timeout_seconds is None:
timeout_seconds = cls.DEFAULT_TIMEOUT
try:
logger.info(f"Executing PowerShell: {command[:100]}...")
# Validate and set working directory
current_dir = os.getcwd()
if working_directory:
dir_validation = validate_directory(working_directory)
if not dir_validation["valid"]:
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=dir_validation["error"]
)
current_dir = working_directory
if not capture_output:
# Execute without capturing output
process = subprocess.Popen(
["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command],
cwd=current_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0
)
# Just wait for process to complete
try:
process.wait(timeout=timeout_seconds)
exit_code = process.returncode
except subprocess.TimeoutExpired:
process.terminate()
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=f"Command timed out after {timeout_seconds} seconds"
)
return get_execution_result(
success=exit_code == 0,
command=command,
exit_code=exit_code,
execution_time=time.time() - start_time,
working_directory=current_dir
)
# Execute with output capture and streaming
process = subprocess.Popen(
["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command],
cwd=current_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0
)
# Process output with our streaming handler
output = cls._process_output(
process=process,
output_callback=output_callback,
timeout_seconds=timeout_seconds,
max_output_size=max_output_size
)
# Convert to the expected result format
result = output.to_dict()
result.update({
'command': command,
'working_directory': current_dir
})
return result
except Exception as e:
error_msg = f"PowerShell execution failed: {str(e)}"
logger.error(error_msg, exc_info=True)
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=error_msg
)
finally:
# Restore original working directory if it was changed
if working_directory and os.getcwd() != current_dir:
os.chdir(current_dir)
@classmethod
def execute_cmd(
cls,
command: str,
working_directory: Optional[str] = None,
timeout_seconds: int = None,
capture_output: bool = True,
output_encoding: str = 'utf-8',
max_output_size: Optional[int] = 10 * 1024 * 1024, # 10MB default
output_callback: Optional[Callable[[str, str], None]] = None,
**kwargs
) -> Dict[str, Any]:
"""
Execute CMD commands with streaming output support.
Args:
command: The command to execute
working_directory: Working directory for the command
timeout_seconds: Maximum execution time in seconds (default: 1 hour)
capture_output: Whether to capture command output
output_encoding: Output encoding (default: utf-8)
max_output_size: Maximum output size in bytes (default: 10MB)
output_callback: Optional callback function that receives (stream_type, data)
**kwargs: Additional arguments (for backward compatibility)
Returns:
Dictionary containing command execution results
"""
start_time = time.time()
# Use default timeout if not specified
if timeout_seconds is None:
timeout_seconds = cls.DEFAULT_TIMEOUT
try:
logger.info(f"Executing CMD: {command[:100]}...")
# Validate and set working directory
current_dir = os.getcwd()
if working_directory:
dir_validation = validate_directory(working_directory)
if not dir_validation["valid"]:
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=dir_validation["error"]
)
current_dir = working_directory
if not capture_output:
# Execute without capturing output
process = subprocess.Popen(
["cmd.exe", "/c", command],
cwd=current_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0,
shell=True
)
# Just wait for process to complete
try:
process.wait(timeout=timeout_seconds)
exit_code = process.returncode
except subprocess.TimeoutExpired:
process.terminate()
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=f"Command timed out after {timeout_seconds} seconds"
)
return get_execution_result(
success=exit_code == 0,
command=command,
exit_code=exit_code,
execution_time=time.time() - start_time,
working_directory=current_dir
)
# Execute with output capture and streaming
process = subprocess.Popen(
["cmd.exe", "/c", command],
cwd=current_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if platform.system() == 'Windows' else 0,
shell=True
)
# Process output with our streaming handler
output = cls._process_output(
process=process,
output_callback=output_callback,
timeout_seconds=timeout_seconds,
max_output_size=max_output_size
)
# Convert to the expected result format
result = output.to_dict()
result.update({
'command': command,
'working_directory': current_dir
})
return result
except Exception as e:
error_msg = f"Command execution failed: {str(e)}"
logger.error(error_msg, exc_info=True)
return get_execution_result(
success=False,
command=command,
execution_time=time.time() - start_time,
error=error_msg
)
finally:
# Restore original working directory if it was changed
if working_directory and os.getcwd() != current_dir:
os.chdir(current_dir)