Skip to main content
Glama

MCP Multiagent Bridge

by dannystocker
yolo_guard.py10.7 kB
#!/usr/bin/env python3 """ YOLO Mode Guard - Multi-stage confirmation system Prevents accidental/unauthorized command execution by requiring: 1. Environment variable flag (YOLO_MODE=1) 2. Typed confirmation phrase 3. One-time random code 4. Time-limited approval token for actual execution Author: Danny Stocker License: MIT """ import os import sys import secrets import json from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Optional class YOLOGuard: """Multi-stage confirmation system for dangerous operations""" TOKEN_FILE = Path.home() / ".yolo_tokens.json" AUDIT_LOG = Path.home() / "yolo_audit.log" @classmethod def require_confirmation(cls) -> bool: """ Stage 1: Manual confirmation with typed phrases Returns: True if user completes confirmation flow """ # Check environment variable if os.getenv("YOLO_MODE") != "1": print("❌ YOLO mode is disabled.") print(" Set YOLO_MODE=1 to enable.") return False # Display warning print("\n" + "="*70) print("⚠️ WARNING: YOLO MODE ENABLES COMMAND EXECUTION") print("="*70) print("\nThis allows AI agents to run commands on your system.") print("Commands will have access to your files and permissions.") print("\nOnly proceed if:") print(" • You understand the security implications") print(" • You are in an isolated/sandboxed environment") print(" • You have backups of important data") print(" • You will supervise all operations") print() # Require exact confirmation phrase required_phrase = "I UNDERSTAND THE RISKS" confirmation = input(f"Type '{required_phrase}' to continue: ").strip() if confirmation != required_phrase: print("❌ Confirmation phrase incorrect. Aborting.") cls._log_audit("CONFIRMATION_FAILED", { "reason": "incorrect_phrase", "provided": confirmation[:20] + "..." }) return False # Generate and require one-time code code = secrets.token_hex(3) # 6-character hex string print(f"\nOne-time code: {code}") user_code = input("Retype the code above: ").strip() if user_code != code: print("❌ Code mismatch. Aborting.") cls._log_audit("CONFIRMATION_FAILED", { "reason": "code_mismatch" }) return False # Success cls._log_audit("YOLO_ENABLED", { "method": "interactive_confirmation", "timestamp": datetime.now().isoformat() }) print("\n✅ YOLO mode enabled for this session") print(" Use --generate-token to create execution tokens\n") return True @classmethod def generate_approval_token(cls, ttl_seconds: int = 300) -> str: """ Stage 2: Generate time-limited execution token Args: ttl_seconds: Token lifetime in seconds (default: 5 minutes) Returns: URL-safe token string """ token = secrets.token_urlsafe(32) expires_at = datetime.now() + timedelta(seconds=ttl_seconds) # Load existing tokens tokens = cls._load_tokens() # Store new token tokens[token] = { "created_at": datetime.now().isoformat(), "expires_at": expires_at.isoformat(), "ttl_seconds": ttl_seconds, "used": False } cls._save_tokens(tokens) cls._log_audit("TOKEN_GENERATED", { "token_preview": token[:10] + "...", "ttl_seconds": ttl_seconds, "expires_at": expires_at.isoformat() }) print(f"\n✅ Approval token generated") print(f" Token: {token}") print(f" Valid for: {ttl_seconds} seconds ({ttl_seconds//60} minutes)") print(f" Expires at: {expires_at.strftime('%Y-%m-%d %H:%M:%S')}") print(f"\nUse with:") print(f" --execute --approval-token {token}") print() return token @classmethod def validate_approval_token(cls, token: str) -> bool: """ Stage 3: Validate and consume approval token Args: token: Token to validate Returns: True if token is valid, False otherwise """ tokens = cls._load_tokens() # Check if token exists if token not in tokens: cls._log_audit("TOKEN_INVALID", { "token_preview": token[:10] + "...", "reason": "not_found" }) return False token_data = tokens[token] # Check if already used if token_data["used"]: cls._log_audit("TOKEN_INVALID", { "token_preview": token[:10] + "...", "reason": "already_used", "used_at": token_data.get("used_at", "unknown") }) return False # Check expiration expires_at = datetime.fromisoformat(token_data["expires_at"]) if datetime.now() > expires_at: cls._log_audit("TOKEN_INVALID", { "token_preview": token[:10] + "...", "reason": "expired", "expired_at": token_data["expires_at"] }) return False # Mark as used tokens[token]["used"] = True tokens[token]["used_at"] = datetime.now().isoformat() cls._save_tokens(tokens) cls._log_audit("TOKEN_VALIDATED", { "token_preview": token[:10] + "...", "created_at": token_data["created_at"], "used_at": tokens[token]["used_at"] }) return True @classmethod def _load_tokens(cls) -> Dict: """Load tokens from file""" if not cls.TOKEN_FILE.exists(): return {} try: return json.loads(cls.TOKEN_FILE.read_text()) except json.JSONDecodeError: # Corrupted file, start fresh return {} @classmethod def _save_tokens(cls, tokens: Dict): """Save tokens to file with restricted permissions""" cls.TOKEN_FILE.write_text(json.dumps(tokens, indent=2)) cls.TOKEN_FILE.chmod(0o600) # Owner read/write only @classmethod def _log_audit(cls, action: str, details: Dict): """Append audit entry to log file""" entry = { "timestamp": datetime.now().isoformat(), "action": action, "details": details } # Ensure log directory exists cls.AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True) # Append as JSON lines with open(cls.AUDIT_LOG, "a") as f: f.write(json.dumps(entry) + "\n") @classmethod def cleanup_expired_tokens(cls) -> int: """Remove expired tokens from storage""" tokens = cls._load_tokens() now = datetime.now() expired = [] for token, data in tokens.items(): expires_at = datetime.fromisoformat(data["expires_at"]) if now > expires_at: expired.append(token) for token in expired: del tokens[token] if expired: cls._save_tokens(tokens) cls._log_audit("TOKENS_CLEANED", { "count": len(expired) }) return len(expired) @classmethod def list_active_tokens(cls) -> list: """List all valid (non-expired, unused) tokens""" tokens = cls._load_tokens() now = datetime.now() active = [] for token, data in tokens.items(): expires_at = datetime.fromisoformat(data["expires_at"]) if not data["used"] and now <= expires_at: active.append({ "token_preview": token[:10] + "...", "created_at": data["created_at"], "expires_at": data["expires_at"], "ttl_seconds": data["ttl_seconds"] }) return active def main(): """CLI interface for YOLO guard""" import argparse parser = argparse.ArgumentParser( description="YOLO Mode Guard - Safe command execution gating", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Enable YOLO mode (requires confirmation) export YOLO_MODE=1 python yolo_guard.py --enable-yolo # Generate 5-minute token python yolo_guard.py --generate-token --ttl 300 # List active tokens python yolo_guard.py --list-tokens # Clean up expired tokens python yolo_guard.py --cleanup """ ) parser.add_argument( "--enable-yolo", action="store_true", help="Enable YOLO mode with interactive confirmation" ) parser.add_argument( "--generate-token", action="store_true", help="Generate time-limited approval token" ) parser.add_argument( "--ttl", type=int, default=300, help="Token TTL in seconds (default: 300 = 5 minutes)" ) parser.add_argument( "--list-tokens", action="store_true", help="List active (valid) tokens" ) parser.add_argument( "--cleanup", action="store_true", help="Remove expired tokens" ) args = parser.parse_args() # Enable YOLO mode if args.enable_yolo: success = YOLOGuard.require_confirmation() sys.exit(0 if success else 1) # Generate token if args.generate_token: if os.getenv("YOLO_MODE") != "1": print("❌ YOLO_MODE not enabled. Set YOLO_MODE=1 first.") sys.exit(1) YOLOGuard.generate_approval_token(args.ttl) sys.exit(0) # List active tokens if args.list_tokens: tokens = YOLOGuard.list_active_tokens() if not tokens: print("No active tokens.") else: print(f"\nActive tokens: {len(tokens)}") for token in tokens: print(f"\n Token: {token['token_preview']}") print(f" Created: {token['created_at']}") print(f" Expires: {token['expires_at']}") print(f" TTL: {token['ttl_seconds']}s") sys.exit(0) # Cleanup expired if args.cleanup: count = YOLOGuard.cleanup_expired_tokens() print(f"✅ Removed {count} expired token(s)") sys.exit(0) # No arguments - show help parser.print_help() if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dannystocker/mcp-multiagent-bridge'

If you have feedback or need assistance with the MCP directory API, please join our Discord server