"""Command execution service implementation."""
import asyncio
import logging
import re
from pathlib import Path
from typing import List, Optional
from ..utils.artifacts import artifact_path
logger = logging.getLogger(__name__)
class ExecutorService:
"""
Service for executing commands with sandboxing.
Runs commands locally using asyncio.subprocess with security controls.
"""
def __init__(self, log_dir: str):
"""
Initialize executor service.
Args:
log_dir: Base log directory for artifacts
"""
self.log_dir = log_dir
async def run(
self,
command: str,
args: List[str],
service_account: Optional[str],
namespace: Optional[str],
deny_patterns: List[str],
run_id: str,
) -> tuple[bool, int, str, str, str, str]:
"""
Execute a command with sandboxing.
Args:
command: Command to execute
args: Command arguments
service_account: Service account (for k8s context)
namespace: Namespace (for k8s context)
deny_patterns: Regex patterns to deny
run_id: Scenario run identifier
Returns:
Tuple of (success, exit_code, stdout, stderr, artifact_path, message)
"""
full_command = [command] + args
cmd_str = " ".join(full_command)
logger.info(f"Executing command: {cmd_str}")
# Check deny patterns
for pattern in deny_patterns:
if re.search(pattern, cmd_str):
message = f"Command denied by pattern: {pattern}"
logger.error(message)
return False, -1, "", "", "", message
try:
# Execute command
process = await asyncio.create_subprocess_exec(
*full_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout_bytes, stderr_bytes = await process.communicate()
stdout = stdout_bytes.decode("utf-8", errors="replace")
stderr = stderr_bytes.decode("utf-8", errors="replace")
exit_code = process.returncode or 0
# Save outputs to artifacts
artifact_file = artifact_path(
self.log_dir, run_id, f"cmd_{command.replace('/', '_')}.txt"
)
with open(artifact_file, "w") as f:
f.write(f"Command: {cmd_str}\n")
f.write(f"Exit Code: {exit_code}\n")
f.write(f"\n--- STDOUT ---\n{stdout}\n")
f.write(f"\n--- STDERR ---\n{stderr}\n")
success = exit_code == 0
message = f"Command {'succeeded' if success else 'failed'} with exit code {exit_code}"
logger.info(f"Command result: {message}")
if stdout:
logger.debug(f"stdout: {stdout[:200]}")
if stderr:
logger.debug(f"stderr: {stderr[:200]}")
return success, exit_code, stdout, stderr, str(artifact_file), message
except Exception as e:
message = f"Command execution failed: {e}"
logger.error(message, exc_info=True)
return False, -1, "", str(e), "", message