#!/usr/bin/env python3
"""
track-it-clean: Clean implementation that creates independent copies of stdout/stderr
without interfering with the tracked process's normal output behavior.
This version:
- Lets the tracked process output normally to stdout/stderr
- Independently captures copies to separate log files
- Does not require any environment variables in the tracked process
- Works with colored output, interactive programs, etc.
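- Uses ./process_logs as the log directory by default, overridable via MCP_PROCESS_WRAPPER_LOG_DIR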
Usage:
track-it-clean python my_app.py --foo --bar
track-it-clean --id my-service python app.py
track-it-clean --dir /path/to/workdir ./script.sh
"""
import os
import select
import signal
import subprocess
import sys
import threading
from datetime import datetime
from pathlib import Path
from process_registry import ProcessRegistry
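# Note: ProcessRegistry is a local helper module; this script only relies on its
# register_process() and update_process_status() methods (see the calls in main() below).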
def generate_process_id() -> str:
"""Generate a unique process ID based on timestamp."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
return f"proc_{timestamp}"
def get_log_paths(process_id: str) -> tuple[Path, Path, Path]:
"""Get the log file paths for a process (combined, stdout, stderr)."""
log_dir = Path(os.getenv("MCP_PROCESS_WRAPPER_LOG_DIR", "./process_logs"))
log_dir = log_dir.resolve() # Convert to absolute path
log_dir.mkdir(exist_ok=True)
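    # e.g. for process_id "proc_20250101_120000_000000" this yields
    #   ./process_logs/proc_20250101_120000_000000.log (combined)
    #   plus matching .stdout.log and .stderr.log files alongside it.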
return (
log_dir / f"{process_id}.log", # Combined log
log_dir / f"{process_id}.stdout.log", # Stdout only
log_dir / f"{process_id}.stderr.log", # Stderr only
)
def stream_copier(pipe, destinations, stop_event):
"""
Copy data from a pipe to multiple destinations in a thread.
Args:
pipe: File descriptor or file object to read from
destinations: List of file objects to write to
stop_event: Threading event to signal when to stop
"""
try:
while not stop_event.is_set():
# Use select to check if data is available
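            # The 0.1 s timeout keeps the loop responsive to stop_event while the
            # pipe is idle (note: select on pipe file descriptors is POSIX-only).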
ready, _, _ = select.select([pipe], [], [], 0.1)
if ready:
try:
# Read available data
if hasattr(pipe, 'read'):
data = pipe.read(4096)
else:
data = os.read(pipe, 4096)
if not data:
break # EOF reached
# Decode if bytes
if isinstance(data, bytes):
text = data.decode('utf-8', errors='replace')
else:
text = data
# Write to all destinations
for dest in destinations:
dest.write(text)
dest.flush()
except (OSError, IOError):
# Pipe closed or error reading
break
except Exception as e:
print(f"[track-it] Stream copier error: {e}", file=sys.stderr)
def run_process(command, working_dir, log_combined, log_stdout, log_stderr, env=None):
"""
Run a process with independent stdout/stderr copying.
The process outputs normally to the console, and we create
independent copies in log files.
"""
# Start the process with separate pipes for stdout and stderr
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=sys.stdin,
cwd=working_dir,
env=env,
bufsize=0, # Unbuffered for real-time output
)
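    # The child inherits the parent's stdin, so interactive programs can still read
    # from the terminal; stdout/stderr are captured via pipes and re-emitted to the
    # console by the copier threads below.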
# Event to signal threads to stop
stop_event = threading.Event()
# Create threads to copy streams
stdout_thread = threading.Thread(
target=stream_copier,
args=(process.stdout, [sys.stdout, log_stdout, log_combined], stop_event),
daemon=True
)
stderr_thread = threading.Thread(
target=stream_copier,
args=(process.stderr, [sys.stderr, log_stderr, log_combined], stop_event),
daemon=True
)
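    # daemon=True: the copier threads can never block interpreter shutdown if the
    # main thread exits before they finish.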
# Start the threads
stdout_thread.start()
stderr_thread.start()
return process, stdout_thread, stderr_thread, stop_event
def parse_custom_args(argv):
"""
Parse arguments with support for environment variables.
Format: track-it [options] [ENV=value ...] [--] command [args...]
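    Example:
        parse_custom_args(["--id", "web", "PORT=3000", "--", "npm", "start"])
        -> ("web", None, {"PORT": "3000"}, ["npm", "start"])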
"""
process_id = None
working_dir = None
env_vars = {}
command = []
i = 0
# First, parse track-it options
while i < len(argv):
arg = argv[i]
if arg == '--id' and i + 1 < len(argv):
process_id = argv[i + 1]
i += 2
elif arg == '--dir' and i + 1 < len(argv):
working_dir = argv[i + 1]
i += 2
elif arg in ['-h', '--help']:
return None, None, None, None # Signal to show help
elif arg == '--':
# Everything after -- is the command
command = argv[i + 1:]
break
elif '=' in arg and not arg.startswith('-'):
# This looks like an environment variable
key, value = arg.split('=', 1)
if key and key.isidentifier(): # Valid env var name
env_vars[key] = value
i += 1
else:
# Not a valid env var, must be start of command
command = argv[i:]
break
elif arg.startswith('-'):
# Unknown option
print(f"Error: Unknown option: {arg}", file=sys.stderr)
sys.exit(1)
else:
# First non-option, non-env-var argument is the start of the command
command = argv[i:]
break
return process_id, working_dir, env_vars, command
def main():
# Custom argument parsing to support env vars
process_id, working_dir, env_vars, command = parse_custom_args(sys.argv[1:])
if process_id is None and working_dir is None and env_vars is None and command is None:
# Show help
print("""Usage: track-it [options] [ENV=value ...] [--] command [args...]
Track commands with independent stdout/stderr logging.
Options:
--id ID Custom process ID (auto-generated if not provided)
--dir DIR Working directory for the command
-h, --help Show this help message
Environment Variables:
You can set environment variables by specifying KEY=value pairs
before the command. Use -- to separate env vars from the command
if needed.
Examples:
track-it echo hello
track-it --id myapp python app.py
track-it PORT=8080 -- python server.py
track-it --id web PORT=3000 DEBUG=true -- npm start
""")
sys.exit(0)
if not command:
print("Error: No command specified", file=sys.stderr)
sys.exit(1)
# Setup
process_id = process_id or generate_process_id()
working_dir = working_dir or os.getcwd()
log_combined, log_stdout, log_stderr = get_log_paths(process_id)
command_str = " ".join(command)
# Initialize registry
registry = ProcessRegistry()
# Prepare environment
if env_vars:
# Merge with existing environment
env = os.environ.copy()
env.update(env_vars)
else:
env = None # Use parent's environment as-is
# Open all log files
with open(log_combined, "w", buffering=1) as log_comb, \
open(log_stdout, "w", buffering=1) as log_out, \
open(log_stderr, "w", buffering=1) as log_err:
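        # buffering=1 line-buffers the text-mode log files, so complete lines reach
        # disk promptly without an explicit flush after every write.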
# Build header with env vars if present
env_section = ""
if env_vars:
env_section = "Environment variables:\n"
for key, value in env_vars.items():
env_section += f" {key}={value}\n"
# Write headers to all logs
header = f"""{'='*60}
Process ID: {process_id}
Command: {command_str}
Working directory: {working_dir}
{env_section}Started at: {datetime.now().isoformat()}
{'='*60}
"""
for log in [log_comb, log_out, log_err]:
log.write(header)
log.flush()
print(f"[track-it] Process ID: {process_id}")
print(f"[track-it] Command: {command_str}")
print(f"[track-it] Working directory: {working_dir}")
if env_vars:
print(f"[track-it] Environment variables:")
for key, value in env_vars.items():
print(f" {key}={value}")
print(f"[track-it] Logs:")
print(f" Combined: {log_combined}")
print(f" Stdout: {log_stdout}")
print(f" Stderr: {log_stderr}")
print(f"[track-it] Starting process...\n")
try:
# Run the process with stream copying
process, stdout_thread, stderr_thread, stop_event = run_process(
command,
working_dir,
log_comb,
log_out,
log_err,
env
)
# Register process with all log files
registry.register_process(
process_id=process_id,
command=command_str,
pid=process.pid,
log_file=str(log_combined),
working_dir=working_dir,
stdout_log=str(log_stdout),
stderr_log=str(log_stderr),
)
# Handle signals gracefully
def signal_handler(signum, frame):
print(f"\n[track-it] Received signal {signum}, stopping process...")
stop_event.set()
process.terminate()
try:
exit_code = process.wait(timeout=5)
except subprocess.TimeoutExpired:
print("[track-it] Process didn't terminate, killing...")
process.kill()
exit_code = process.wait()
# Update registry with interrupted status
registry.update_process_status(
process_id=process_id,
status="failed",
exit_code=exit_code if exit_code is not None else -1,
)
# Write interruption footer to logs
footer = f"""
{'='*60}
Process interrupted by signal {signum}
Exit code: {exit_code if exit_code is not None else -1}
Completed at: {datetime.now().isoformat()}
{'='*60}
"""
for log in [log_comb, log_out, log_err]:
log.write(footer)
log.flush()
print(f"[track-it] Process terminated with exit code {exit_code}")
sys.exit(0) # Exit the track-it process
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
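            # Handlers are installed only after the child has started; a signal that
            # arrives before this point falls through to Python's default handling.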
# Wait for process to complete
exit_code = process.wait()
# Signal threads to stop and wait for them
stop_event.set()
stdout_thread.join(timeout=1)
stderr_thread.join(timeout=1)
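            # The copier threads are daemons, so short join timeouts are safe: a
            # wedged pipe cannot hang shutdown.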
# Write footers
footer = f"""
{'='*60}
Process exited with code {exit_code}
Completed at: {datetime.now().isoformat()}
{'='*60}
"""
for log in [log_comb, log_out, log_err]:
log.write(footer)
# Update registry
status = "completed" if exit_code == 0 else "failed"
registry.update_process_status(
process_id=process_id,
status=status,
exit_code=exit_code,
)
print(f"\n[track-it] Process {status}")
print(f"[track-it] Exit code: {exit_code}")
sys.exit(exit_code)
except FileNotFoundError:
error_msg = f"Command not found: {args.command[0]}"
for log in [log_comb, log_out, log_err]:
log.write(f"\nERROR: {error_msg}\n")
print(f"[track-it] ERROR: {error_msg}", file=sys.stderr)
registry.update_process_status(
process_id=process_id,
status="failed",
exit_code=127,
)
sys.exit(127)
except Exception as e:
error_msg = f"Failed to execute command: {e}"
for log in [log_comb, log_out, log_err]:
log.write(f"\nERROR: {error_msg}\n")
print(f"[track-it] ERROR: {error_msg}", file=sys.stderr)
registry.update_process_status(
process_id=process_id,
status="failed",
exit_code=1,
)
sys.exit(1)
if __name__ == "__main__":
main()