"""Safe command execution utilities."""
import asyncio
import os
import shutil
import time
from pathlib import Path
from typing import List, Optional, Dict, Any
import logging
from ..models import ToolResult
logger = logging.getLogger(__name__)
class ToolExecutor:
    """Safely executes external security tools.

    Commands are launched with ``asyncio.create_subprocess_exec`` (argument
    list, no shell), with a timeout and optional proxy configuration.
    Results of tool lookups are cached per instance in ``installed_tools``.
    """

    def __init__(
        self,
        timeout: int = 300,
        working_dir: Optional[Path] = None,
        proxy_url: Optional[str] = None
    ):
        """Initialize executor.

        Args:
            timeout: Default timeout for command execution in seconds
            working_dir: Working directory for command execution; defaults
                to the current working directory at construction time
            proxy_url: Optional proxy URL (e.g., http://127.0.0.1:8080)
        """
        self.timeout = timeout
        self.working_dir = working_dir or Path.cwd()
        self.proxy_url = proxy_url
        # Cache of tool name -> resolved path (None when the lookup failed).
        self.installed_tools: Dict[str, Optional[str]] = {}

    async def execute(
        self,
        tool_name: str,
        args: List[str],
        timeout: Optional[int] = None,
        check_install: bool = True,
        use_proxy: bool = False,
    ) -> "ToolResult":
        """Execute a tool with arguments.

        Args:
            tool_name: Name of the tool to execute
            args: List of command-line arguments
            timeout: Timeout in seconds; ``None`` (or 0) falls back to the
                executor's default timeout
            check_install: Whether to check if tool is installed first
            use_proxy: Whether to route traffic through configured proxy
                (has no effect when no proxy URL is configured)

        Returns:
            ToolResult with execution details. ``success`` is True only for
            a zero return code. Non-empty stderr is reported under
            ``errors`` on failure and under ``warnings`` on success.
            Failures to launch, timeouts and unexpected exceptions are
            reported as unsuccessful results rather than raised.
        """
        start_time = time.time()

        # Check if tool is installed
        if check_install:
            tool_path = await self.check_tool_installed(tool_name)
            if not tool_path:
                return ToolResult(
                    success=False,
                    tool_name=tool_name,
                    output="",
                    errors=[f"Tool '{tool_name}' is not installed or not found in PATH"],
                    execution_time=time.time() - start_time
                )

        proxy_enabled = bool(use_proxy and self.proxy_url)

        # Add proxy arguments if enabled
        if proxy_enabled:
            args = self._add_proxy_args(tool_name, args, self.proxy_url)

        # Build command
        command = [tool_name] + args
        timeout_value = timeout or self.timeout
        logger.info("Executing: %s", ' '.join(command))

        try:
            # Only build a custom environment when proxying; env=None makes
            # the child inherit the parent environment unchanged.
            env = None
            if proxy_enabled:
                env = os.environ.copy()
                env['HTTP_PROXY'] = self.proxy_url
                env['HTTPS_PROXY'] = self.proxy_url

            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self.working_dir),
                env=env
            )

            # Wait for completion with timeout
            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout_value
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The process exited on its own between the timeout
                    # firing and the kill; it is still reported as timed out.
                    pass
                await process.wait()
                return ToolResult(
                    success=False,
                    tool_name=tool_name,
                    output="",
                    errors=[f"Command timed out after {timeout_value} seconds"],
                    execution_time=time.time() - start_time
                )

            # Undecodable bytes are dropped rather than raising.
            stdout_str = stdout.decode('utf-8', errors='ignore')
            stderr_str = stderr.decode('utf-8', errors='ignore')
            execution_time = time.time() - start_time
            success = process.returncode == 0

            result = ToolResult(
                success=success,
                tool_name=tool_name,
                output=stdout_str,
                errors=[stderr_str] if stderr_str and not success else [],
                warnings=[stderr_str] if stderr_str and success else [],
                execution_time=execution_time
            )
            logger.info(
                "Execution completed in %.2fs with return code %s",
                execution_time,
                process.returncode,
            )
            return result
        except FileNotFoundError:
            return ToolResult(
                success=False,
                tool_name=tool_name,
                output="",
                errors=[f"Tool '{tool_name}' not found"],
                execution_time=time.time() - start_time
            )
        except Exception as e:
            # Boundary handler: any other failure is logged and converted
            # into an unsuccessful result instead of propagating.
            logger.error("Error executing %s: %s", tool_name, e)
            return ToolResult(
                success=False,
                tool_name=tool_name,
                output="",
                errors=[f"Execution error: {str(e)}"],
                execution_time=time.time() - start_time
            )

    async def check_tool_installed(self, tool_name: str) -> Optional[str]:
        """Check if a tool is installed and available.

        The result (including a failed lookup) is cached on the instance,
        so later calls for the same name do not search PATH again.

        Args:
            tool_name: Name of the tool

        Returns:
            Path to tool if found, None otherwise
        """
        # Check cache first
        if tool_name in self.installed_tools:
            return self.installed_tools[tool_name]

        # Use shutil.which to find tool
        tool_path = shutil.which(tool_name)
        self.installed_tools[tool_name] = tool_path
        return tool_path

    async def get_tool_version(self, tool_name: str, version_flag: str = "--version") -> Optional[str]:
        """Get version of an installed tool.

        Runs the tool with ``version_flag`` under a 10 second timeout.

        Args:
            tool_name: Name of the tool
            version_flag: Flag to get version (default: --version)

        Returns:
            Stripped stdout of the tool if it exited successfully,
            None otherwise
        """
        result = await self.execute(tool_name, [version_flag], timeout=10)
        if result.success:
            return result.output.strip()
        return None

    def get_installed_tools_status(self, tools: List[str]) -> Dict[str, bool]:
        """Check installation status of multiple tools.

        Each name is looked up with ``shutil.which`` directly; the
        per-instance lookup cache is neither read nor updated.

        Args:
            tools: List of tool names to check

        Returns:
            Dictionary mapping tool names to installation status
        """
        return {tool: shutil.which(tool) is not None for tool in tools}

    async def validate_command_safety(self, command: List[str]) -> tuple[bool, str]:
        """Validate that a command is safe to execute.

        The command is joined with spaces and scanned for substrings, so a
        pattern matches anywhere in any argument.

        Args:
            command: Command and arguments

        Returns:
            Tuple of (is_safe, reason). The reason names the first
            offending pattern, or states that the command appears safe.
        """
        # List of dangerous commands/patterns (substring matches).
        dangerous_patterns = [
            'rm -rf',
            'dd if=',
            'mkfs',
            ':(){ :|:& };:',  # Fork bomb
            'chmod 777',
            'chown',
            '/dev/sda',
            '/dev/null',
            '> /dev/sda',  # already covered by '/dev/sda' above
        ]
        command_str = ' '.join(command)

        # Check for dangerous patterns
        for pattern in dangerous_patterns:
            if pattern in command_str:
                return False, f"Command contains dangerous pattern: {pattern}"

        # Check for shell injection attempts. '$(' (rather than the literal
        # '$()') is used so that a non-empty command substitution such as
        # '$(whoami)' is detected as well.
        shell_chars = ['&&', '||', ';', '|', '`', '$(']
        for char in shell_chars:
            if char in command_str:
                return False, f"Command contains potentially dangerous shell character: {char}"

        return True, "Command appears safe"

    def _add_proxy_args(self, tool_name: str, args: List[str], proxy_url: str) -> List[str]:
        """Add proxy arguments to tool command based on tool type.

        The tool is identified by the final path component of
        ``tool_name``, so both ``nuclei`` and ``/usr/bin/nuclei`` match.

        Args:
            tool_name: Name (or path) of the tool
            args: Current arguments list (not modified)
            proxy_url: Proxy URL to use

        Returns:
            New list with the tool's proxy flag and URL placed before
            ``args`` for known tools; ``args`` itself otherwise
        """
        # Tool-specific proxy argument patterns
        proxy_args_map = {
            'nuclei': ['-proxy', proxy_url],
            'ffuf': ['-x', proxy_url],
            'sqlmap': ['--proxy', proxy_url],
            'httpx': ['-http-proxy', proxy_url],
            'dalfox': ['--proxy', proxy_url],
            'nmap': ['--proxies', proxy_url],
        }

        proxy_args = proxy_args_map.get(os.path.basename(tool_name))
        if proxy_args is not None:
            # Add proxy args at the beginning
            return proxy_args + args

        # For tools that use environment variables only
        return args