Skip to main content
Glama

Track-It Process Monitor

by scottmsilver
track-it•12.5 kB
#!/usr/bin/env python3 """ track-it-clean: Clean implementation that creates independent copies of stdout/stderr without interfering with the tracked process's normal output behavior. This version: - Lets the tracked process output normally to stdout/stderr - Independently captures copies to separate log files - Does not require any environment variables in the tracked process - Works with colored output, interactive programs, etc. Usage: track-it-clean python my_app.py --foo --bar track-it-clean --id my-service python app.py track-it-clean --dir /path/to/workdir ./script.sh """ import argparse import os import select import signal import subprocess import sys import threading from datetime import datetime from pathlib import Path from process_registry import ProcessRegistry def generate_process_id() -> str: """Generate a unique process ID based on timestamp.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") return f"proc_{timestamp}" def get_log_paths(process_id: str) -> tuple[Path, Path, Path]: """Get the log file paths for a process (combined, stdout, stderr).""" log_dir = Path(os.getenv("MCP_PROCESS_WRAPPER_LOG_DIR", "./process_logs")) log_dir = log_dir.resolve() # Convert to absolute path log_dir.mkdir(exist_ok=True) return ( log_dir / f"{process_id}.log", # Combined log log_dir / f"{process_id}.stdout.log", # Stdout only log_dir / f"{process_id}.stderr.log", # Stderr only ) def stream_copier(pipe, destinations, stop_event): """ Copy data from a pipe to multiple destinations in a thread. Args: pipe: File descriptor or file object to read from destinations: List of file objects to write to stop_event: Threading event to signal when to stop """ try: while not stop_event.is_set(): # Use select to check if data is available ready, _, _ = select.select([pipe], [], [], 0.1) if ready: try: # Read available data if hasattr(pipe, 'read'): data = pipe.read(4096) else: data = os.read(pipe, 4096) if not data: break # EOF reached # Decode if bytes if isinstance(data, bytes): text = data.decode('utf-8', errors='replace') else: text = data # Write to all destinations for dest in destinations: dest.write(text) dest.flush() except (OSError, IOError): # Pipe closed or error reading break except Exception as e: print(f"[track-it] Stream copier error: {e}", file=sys.stderr) def run_process(command, working_dir, log_combined, log_stdout, log_stderr, env=None): """ Run a process with independent stdout/stderr copying. The process outputs normally to the console, and we create independent copies in log files. """ # Start the process with separate pipes for stdout and stderr process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=sys.stdin, cwd=working_dir, env=env, bufsize=0, # Unbuffered for real-time output ) # Event to signal threads to stop stop_event = threading.Event() # Create threads to copy streams stdout_thread = threading.Thread( target=stream_copier, args=(process.stdout, [sys.stdout, log_stdout, log_combined], stop_event), daemon=True ) stderr_thread = threading.Thread( target=stream_copier, args=(process.stderr, [sys.stderr, log_stderr, log_combined], stop_event), daemon=True ) # Start the threads stdout_thread.start() stderr_thread.start() return process, stdout_thread, stderr_thread, stop_event def parse_custom_args(argv): """ Parse arguments with support for environment variables. Format: track-it [options] [ENV=value ...] [--] command [args...] """ process_id = None working_dir = None env_vars = {} command = [] i = 0 # First, parse track-it options while i < len(argv): arg = argv[i] if arg == '--id' and i + 1 < len(argv): process_id = argv[i + 1] i += 2 elif arg == '--dir' and i + 1 < len(argv): working_dir = argv[i + 1] i += 2 elif arg in ['-h', '--help']: return None, None, None, None # Signal to show help elif arg == '--': # Everything after -- is the command command = argv[i + 1:] break elif '=' in arg and not arg.startswith('-'): # This looks like an environment variable key, value = arg.split('=', 1) if key and key.isidentifier(): # Valid env var name env_vars[key] = value i += 1 else: # Not a valid env var, must be start of command command = argv[i:] break elif arg.startswith('-'): # Unknown option print(f"Error: Unknown option: {arg}", file=sys.stderr) sys.exit(1) else: # First non-option, non-env-var argument is the start of the command command = argv[i:] break return process_id, working_dir, env_vars, command def main(): # Custom argument parsing to support env vars process_id, working_dir, env_vars, command = parse_custom_args(sys.argv[1:]) if process_id is None and working_dir is None and env_vars is None and command is None: # Show help print("""Usage: track-it [options] [ENV=value ...] [--] command [args...] Track commands with independent stdout/stderr logging. Options: --id ID Custom process ID (auto-generated if not provided) --dir DIR Working directory for the command -h, --help Show this help message Environment Variables: You can set environment variables by specifying KEY=value pairs before the command. Use -- to separate env vars from the command if needed. Examples: track-it echo hello track-it --id myapp python app.py track-it PORT=8080 -- python server.py track-it --id web PORT=3000 DEBUG=true -- npm start """) sys.exit(0) if not command: print("Error: No command specified", file=sys.stderr) sys.exit(1) # Setup process_id = process_id or generate_process_id() working_dir = working_dir or os.getcwd() log_combined, log_stdout, log_stderr = get_log_paths(process_id) command_str = " ".join(command) # Initialize registry registry = ProcessRegistry() # Prepare environment if env_vars: # Merge with existing environment env = os.environ.copy() env.update(env_vars) else: env = None # Use parent's environment as-is # Open all log files with open(log_combined, "w", buffering=1) as log_comb, \ open(log_stdout, "w", buffering=1) as log_out, \ open(log_stderr, "w", buffering=1) as log_err: # Build header with env vars if present env_section = "" if env_vars: env_section = "Environment variables:\n" for key, value in env_vars.items(): env_section += f" {key}={value}\n" # Write headers to all logs header = f"""{'='*60} Process ID: {process_id} Command: {command_str} Working directory: {working_dir} {env_section}Started at: {datetime.now().isoformat()} {'='*60} """ for log in [log_comb, log_out, log_err]: log.write(header) log.flush() print(f"[track-it] Process ID: {process_id}") print(f"[track-it] Command: {command_str}") print(f"[track-it] Working directory: {working_dir}") if env_vars: print(f"[track-it] Environment variables:") for key, value in env_vars.items(): print(f" {key}={value}") print(f"[track-it] Logs:") print(f" Combined: {log_combined}") print(f" Stdout: {log_stdout}") print(f" Stderr: {log_stderr}") print(f"[track-it] Starting process...\n") try: # Run the process with stream copying process, stdout_thread, stderr_thread, stop_event = run_process( command, working_dir, log_comb, log_out, log_err, env ) # Register process with all log files registry.register_process( process_id=process_id, command=command_str, pid=process.pid, log_file=str(log_combined), working_dir=working_dir, stdout_log=str(log_stdout), stderr_log=str(log_stderr), ) # Handle signals gracefully def signal_handler(signum, frame): print(f"\n[track-it] Received signal {signum}, stopping process...") stop_event.set() process.terminate() try: exit_code = process.wait(timeout=5) except subprocess.TimeoutExpired: print("[track-it] Process didn't terminate, killing...") process.kill() exit_code = process.wait() # Update registry with interrupted status registry.update_process_status( process_id=process_id, status="failed", exit_code=exit_code if exit_code is not None else -1, ) # Write interruption footer to logs footer = f""" {'='*60} Process interrupted by signal {signum} Exit code: {exit_code if exit_code is not None else -1} Completed at: {datetime.now().isoformat()} {'='*60} """ for log in [log_comb, log_out, log_err]: log.write(footer) log.flush() print(f"[track-it] Process terminated with exit code {exit_code}") sys.exit(0) # Exit the track-it process signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Wait for process to complete exit_code = process.wait() # Signal threads to stop and wait for them stop_event.set() stdout_thread.join(timeout=1) stderr_thread.join(timeout=1) # Write footers footer = f""" {'='*60} Process exited with code {exit_code} Completed at: {datetime.now().isoformat()} {'='*60} """ for log in [log_comb, log_out, log_err]: log.write(footer) # Update registry status = "completed" if exit_code == 0 else "failed" registry.update_process_status( process_id=process_id, status=status, exit_code=exit_code, ) print(f"\n[track-it] Process {status}") print(f"[track-it] Exit code: {exit_code}") sys.exit(exit_code) except FileNotFoundError: error_msg = f"Command not found: {args.command[0]}" for log in [log_comb, log_out, log_err]: log.write(f"\nERROR: {error_msg}\n") print(f"[track-it] ERROR: {error_msg}", file=sys.stderr) registry.update_process_status( process_id=process_id, status="failed", exit_code=127, ) sys.exit(127) except Exception as e: error_msg = f"Failed to execute command: {e}" for log in [log_comb, log_out, log_err]: log.write(f"\nERROR: {error_msg}\n") print(f"[track-it] ERROR: {error_msg}", file=sys.stderr) registry.update_process_status( process_id=process_id, status="failed", exit_code=1, ) sys.exit(1) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/scottmsilver/mcp-track-it'

If you have feedback or need assistance with the MCP directory API, please join our Discord server