Skip to main content
Glama
share_to_github_pages.py52.7 kB
#!/usr/bin/env python3 """Interactive wizard for exporting mailboxes and deploying to GitHub Pages. This script automates the entire workflow: 1. Select projects and export options 2. Preview the bundle locally 3. Create/update GitHub repository 4. Enable GitHub Pages 5. Push and deploy Requirements: - gh CLI installed and authenticated (gh auth status) - git configured with user.name and user.email """ from __future__ import annotations import json import re import secrets import shlex import shutil import subprocess import sys import time import webbrowser from contextlib import suppress from importlib import resources as _resources from pathlib import Path from typing import Any from rich.console import Console from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, TextColumn from rich.prompt import Confirm, Prompt from rich.table import Table console = Console() # Configuration directory CONFIG_DIR = Path.home() / ".mcp-agent-mail" CONFIG_FILE = CONFIG_DIR / "wizard-config.json" SESSION_DIR = CONFIG_DIR / "wizard-session" SESSION_STATE_FILE = SESSION_DIR / "session.json" SESSION_BUNDLE_DIR = SESSION_DIR / "bundle" SESSION_VERSION = 1 def find_available_port(start: int = 9000, end: int = 9100) -> int: """Find an available port in the given range (inclusive).""" import socket for port in range(start, end + 1): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", port)) return port except OSError: continue raise RuntimeError(f"No available ports in range {start}-{end}") def parse_selection(choice: str, max_items: int) -> list[int]: """Parse selection string like '1,3,5' or '1-3,5' into list of indices.""" if choice.strip().lower() == "all": return list(range(max_items)) indices = [] try: for part in choice.split(","): part = part.strip() if "-" in part: start_str, end_str = part.split("-", 1) start, end = int(start_str.strip()), int(end_str.strip()) if start < 1 or end > max_items or start > end: raise ValueError(f"Invalid range: {part}") indices.extend(range(start - 1, end)) else: idx = int(part) if idx < 1 or idx > max_items: raise ValueError(f"Invalid index: {idx}") indices.append(idx - 1) return sorted(set(indices)) # Remove duplicates and sort except ValueError as e: console.print(f"[red]Invalid selection:[/] {e}") return [] def save_config(config: dict[str, Any]) -> None: """Save wizard configuration for next run.""" try: CONFIG_DIR.mkdir(parents=True, exist_ok=True) import json with CONFIG_FILE.open("w") as f: json.dump(config, f, indent=2) except Exception as e: console.print(f"[yellow]Could not save config:[/] {e}") def load_last_config() -> dict[str, Any] | None: """Load last wizard configuration if it exists.""" if not CONFIG_FILE.exists(): return None try: with CONFIG_FILE.open("r") as f: return json.load(f) except Exception: return None def clear_resume_state() -> None: """Remove any saved in-progress session and workspace bundle.""" with suppress(FileNotFoundError): SESSION_STATE_FILE.unlink() if SESSION_BUNDLE_DIR.exists(): shutil.rmtree(SESSION_BUNDLE_DIR, ignore_errors=True) def save_resume_state(state: dict[str, Any]) -> None: """Persist current wizard session state for resuming later.""" SESSION_DIR.mkdir(parents=True, exist_ok=True) payload = dict(state) payload["version"] = SESSION_VERSION payload["bundle_path"] = str(SESSION_BUNDLE_DIR) with SESSION_STATE_FILE.open("w") as fh: json.dump(payload, fh, indent=2) def load_resume_state() -> dict[str, Any] | None: """Load in-progress session state if available and consistent.""" if not SESSION_STATE_FILE.exists(): return None try: with SESSION_STATE_FILE.open("r") as fh: data = json.load(fh) except Exception: clear_resume_state() return None if data.get("version") != SESSION_VERSION: clear_resume_state() return None bundle_path = Path(data.get("bundle_path", "")).expanduser() if not bundle_path.exists(): clear_resume_state() return None data["bundle_path"] = bundle_path return data def _refresh_viewer_assets(bundle_dir: Path) -> None: """Overwrite viewer assets in an existing bundle with the latest packaged files. This ensures CSP and scripts are up to date even when reusing a prior export. """ try: viewer_root = bundle_dir / "viewer" viewer_root.mkdir(parents=True, exist_ok=True) # Prefer copying from the live source tree in development source_tree = Path(__file__).resolve().parents[1] / "src" / "mcp_agent_mail" / "viewer_assets" if source_tree.exists() and source_tree.is_dir(): for src in sorted(p for p in source_tree.rglob("*") if p.is_file()): rel = src.relative_to(source_tree) dst = viewer_root / rel dst.parent.mkdir(parents=True, exist_ok=True) dst.write_bytes(src.read_bytes()) return # Fallback to packaged resources package_root = _resources.files("mcp_agent_mail.viewer_assets") def _walk(node: Any, rel: Path) -> None: # type: ignore[no-any-explicit] for child in node.iterdir(): child_rel = rel / child.name if child.is_dir(): _walk(child, child_rel) else: dest = viewer_root / child_rel dest.parent.mkdir(parents=True, exist_ok=True) dest.write_bytes(child.read_bytes()) _walk(package_root, Path()) except Exception as exc: console.print(f"[yellow]Warning:[/] Failed to refresh viewer assets: {exc}") def estimate_bundle_size(projects: list[str]) -> str: """Estimate bundle size based on project count (rough approximation).""" # Very rough estimate: 5-20MB per project depending on message count base_size = 2 # Static assets ~2MB project_size = len(projects) * 10 # ~10MB per project average total_mb = base_size + project_size if total_mb < 1: return "< 1 MB" elif total_mb < 1024: return f"~{total_mb} MB" else: return f"~{total_mb / 1024:.1f} GB" def validate_github_repo_available(repo_name: str) -> tuple[bool, str]: """Check if GitHub repo name is available.""" try: result = subprocess.run( ["gh", "repo", "view", repo_name], capture_output=True, text=True, check=False, ) if result.returncode == 0: return False, f"Repository '{repo_name}' already exists in your account" return True, "" except FileNotFoundError: return True, "" # Can't check, assume available def show_deployment_summary( projects: list[str], scrub_preset: str, deployment: dict[str, Any], signing_key: Path | None, ) -> bool: """Show deployment summary and ask for confirmation.""" console.print("\n[bold cyan]═══ Deployment Summary ═══[/]\n") # Projects console.print(f"[bold]Projects:[/] {len(projects)} selected") console.print(f"[bold]Bundle size:[/] {estimate_bundle_size(projects)}") console.print(f"[bold]Redaction:[/] {scrub_preset}") # Deployment target deploy_type = deployment.get("type", "unknown") if deploy_type == "local": console.print(f"[bold]Target:[/] Local export to {deployment.get('path', './mailbox-export')}") elif deploy_type == "github-new": console.print("[bold]Target:[/] GitHub Pages") console.print(f" Repository: {deployment.get('repo_name', '(not set)')}") console.print(f" Visibility: {'Private' if deployment.get('private', False) else 'Public'}") elif deploy_type == "cloudflare-pages": console.print("[bold]Target:[/] Cloudflare Pages") console.print(f" Project: {deployment.get('project_name', '(not set)')}") # Signing if signing_key: console.print("[bold]Signing:[/] Enabled (Ed25519)") else: console.print("[bold]Signing:[/] Disabled") console.print() return Confirm.ask("[bold]Proceed with export and deployment?[/]", default=True) def detect_existing_github_repo(repo_name: str) -> bool: """Check if GitHub repo already exists.""" try: result = subprocess.run( ["gh", "repo", "view", repo_name], capture_output=True, text=True, check=False, ) return result.returncode == 0 except FileNotFoundError: return False def detect_package_manager() -> str | None: """Detect the available package manager on this system.""" managers = { "brew": ["brew", "--version"], "apt": ["apt", "--version"], "dnf": ["dnf", "--version"], "npm": ["npm", "--version"], } for name, cmd in managers.items(): try: subprocess.run(cmd, capture_output=True, check=True) return name except (FileNotFoundError, subprocess.CalledProcessError): continue return None def install_gh_cli() -> bool: """Offer to install gh CLI automatically.""" console.print("\n[yellow]gh CLI is not installed.[/]") pkg_mgr = detect_package_manager() if not pkg_mgr or pkg_mgr == "npm": console.print("[cyan]Install gh CLI from:[/] https://cli.github.com/") return False # Only brew is simple enough to automate reliably if pkg_mgr == "brew": if Confirm.ask("Install gh CLI using Homebrew?", default=True): console.print("[cyan]Running:[/] brew install gh") try: subprocess.run(["brew", "install", "gh"], check=True) console.print("[green]✓ gh CLI installed successfully[/]") return True except subprocess.CalledProcessError: console.print("[red]Installation failed.[/]") return False return False # For apt/dnf, show manual instructions (requires adding repo first) if pkg_mgr == "apt": console.print("\n[cyan]To install gh CLI on Ubuntu/Debian:[/]") console.print(" curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg | sudo dd of=/usr/share/keyrings/githubcli-archive-keyring.gpg") console.print(' echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" | sudo tee /etc/apt/sources.list.d/github-cli.list > /dev/null') console.print(" sudo apt update") console.print(" sudo apt install gh -y") elif pkg_mgr == "dnf": console.print("\n[cyan]To install gh CLI on Fedora/RHEL:[/]") console.print(" sudo dnf install 'dnf-command(config-manager)'") console.print(" sudo dnf config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo") console.print(" sudo dnf install gh -y") else: console.print("[cyan]Install gh CLI from:[/] https://cli.github.com/") console.print("\n[yellow]Press Enter after installing to continue...[/]") input() # Check if it's now available try: subprocess.run(["gh", "--version"], capture_output=True, check=True) console.print("[green]✓ gh CLI detected[/]") return True except (FileNotFoundError, subprocess.CalledProcessError): console.print("[red]gh CLI still not found. Continuing anyway...[/]") return False def install_wrangler_cli() -> bool: """Offer to install wrangler CLI automatically.""" console.print("\n[yellow]wrangler CLI is not installed.[/]") # Check if npm is available try: subprocess.run(["npm", "--version"], capture_output=True, check=True) except (FileNotFoundError, subprocess.CalledProcessError): console.print("[red]npm is required to install wrangler.[/]") console.print("[cyan]Install Node.js from:[/] https://nodejs.org/") return False if Confirm.ask("Install wrangler CLI using npm?", default=True): console.print("[cyan]Running:[/] npm install -g wrangler") try: subprocess.run(["npm", "install", "-g", "wrangler"], check=True) console.print("[green]✓ wrangler CLI installed successfully[/]") return True except subprocess.CalledProcessError: console.print("[red]Installation failed. Try manually:[/] npm install -g wrangler") return False return False def authenticate_gh_cli() -> bool: """Guide user through gh CLI authentication.""" console.print("\n[bold cyan]GitHub CLI Authentication[/]") console.print("You need to authenticate with GitHub to create repositories and enable Pages.") console.print("\n[dim]The next command will open your browser to authenticate.[/]") if not Confirm.ask("Run 'gh auth login' now?", default=True): console.print("[yellow]Skipping authentication. You can run 'gh auth login' manually later.[/]") return False try: # Run gh auth login interactively (don't capture output) subprocess.run(["gh", "auth", "login"], check=True) console.print("[green]✓ GitHub authentication complete[/]") return True except subprocess.CalledProcessError: console.print("[red]Authentication failed.[/]") return False def authenticate_wrangler_cli() -> bool: """Guide user through wrangler CLI authentication.""" console.print("\n[bold cyan]Cloudflare Wrangler Authentication[/]") console.print("You need to authenticate with Cloudflare to deploy to Pages.") console.print("\n[dim]The next command will open your browser to authenticate.[/]") if not Confirm.ask("Run 'wrangler login' now?", default=True): console.print("[yellow]Skipping authentication. You can run 'wrangler login' manually later.[/]") return False try: # Run wrangler login interactively subprocess.run(["wrangler", "login"], check=True) console.print("[green]✓ Cloudflare authentication complete[/]") return True except subprocess.CalledProcessError: console.print("[red]Authentication failed.[/]") return False def check_prerequisites(require_github: bool = False, require_cloudflare: bool = False) -> bool: """Check if required tools are installed and configured.""" all_satisfied = True # Check gh CLI (only if GitHub deployment selected) if require_github: try: result = subprocess.run( ["gh", "auth", "status"], capture_output=True, text=True, check=False, ) if result.returncode == 0: console.print("[green]✓ gh CLI installed and authenticated[/]") else: console.print("[yellow]⚠ gh CLI installed but not authenticated[/]") if not authenticate_gh_cli(): all_satisfied = False except FileNotFoundError: if install_gh_cli(): # After install, try to authenticate if not authenticate_gh_cli(): all_satisfied = False else: all_satisfied = False # Check wrangler CLI (only if Cloudflare deployment selected) if require_cloudflare: try: result = subprocess.run( ["wrangler", "whoami"], capture_output=True, text=True, check=False, ) if result.returncode == 0: console.print("[green]✓ wrangler CLI installed and authenticated[/]") else: console.print("[yellow]⚠ wrangler CLI installed but not authenticated[/]") if not authenticate_wrangler_cli(): all_satisfied = False except FileNotFoundError: if install_wrangler_cli(): if not authenticate_wrangler_cli(): all_satisfied = False else: all_satisfied = False # Check git config (only needed for GitHub deployment) if require_github: try: subprocess.run(["git", "config", "user.name"], capture_output=True, check=True, text=True) subprocess.run(["git", "config", "user.email"], capture_output=True, check=True, text=True) console.print("[green]✓ git configured[/]") except (FileNotFoundError, subprocess.CalledProcessError): console.print("[red]❌ git not configured[/]") console.print("[cyan]Run:[/] git config --global user.name \"Your Name\"") console.print("[cyan]Run:[/] git config --global user.email \"you@example.com\"") all_satisfied = False if not all_satisfied: console.print("\n[yellow]Some prerequisites are missing. Please address them and try again.[/]") return all_satisfied def get_projects() -> list[dict[str, str]]: """Get list of projects from the database.""" try: result = subprocess.run( ["uv", "run", "python", "-m", "mcp_agent_mail.cli", "list-projects", "--json"], capture_output=True, text=True, check=True, ) # Parse JSON output import json projects_data = json.loads(result.stdout) return [{"slug": p["slug"], "human_key": p["human_key"]} for p in projects_data] except (subprocess.CalledProcessError, json.JSONDecodeError, KeyError, TypeError): return [] def select_projects(projects: list[dict[str, str]]) -> list[str]: """Interactive project selection with support for ranges and lists.""" if not projects: console.print("[yellow]No projects found. Create some messages first![/]") sys.exit(1) console.print("\n[bold]Available Projects:[/]") table = Table(show_header=True, header_style="bold cyan") table.add_column("#", style="dim") table.add_column("Slug") table.add_column("Path") for idx, proj in enumerate(projects, 1): table.add_row(str(idx), proj["slug"], proj["human_key"]) console.print(table) while True: choice = Prompt.ask( "\n[bold]Select projects to export[/] (e.g., 'all', '1,3,5', or '1-3')", default="all", ) indices = parse_selection(choice, len(projects)) if indices: # Valid selection return [projects[idx]["human_key"] for idx in indices] # If empty, parse_selection already printed error, loop continues def select_scrub_preset() -> str: """Select redaction preset.""" console.print("\n[bold]Redaction Preset:[/]") console.print(" [cyan]standard[/]: Scrub secrets (API keys, tokens), keep agent names and message bodies") console.print(" [cyan]strict[/]: Replace all message bodies with placeholders, remove attachments") return Prompt.ask( "Choose preset", choices=["standard", "strict"], default="standard", ) def select_deployment_target() -> dict[str, Any]: """Select where to deploy.""" console.print("\n[bold]Deployment Target:[/]") console.print(" 1. GitHub Pages (create new repository)") console.print(" 2. Cloudflare Pages (fast global CDN)") console.print(" 3. Export locally only") choice = Prompt.ask("Choose option", choices=["1", "2", "3"], default="1") if choice == "3": output_dir = Prompt.ask("Output directory", default="./mailbox-export") return {"type": "local", "path": output_dir} if choice == "2": # Cloudflare Pages deployment project_name = Prompt.ask("Cloudflare Pages project name", default="mailbox-viewer") return { "type": "cloudflare-pages", "project_name": project_name, } # GitHub Pages deployment - create new repo repo_name = Prompt.ask("Repository name", default="mailbox-viewer") is_private = Confirm.ask("Make repository private?", default=False) description = Prompt.ask( "Repository description", default="MCP Agent Mail static viewer", ) return { "type": "github-new", "repo_name": repo_name, "private": is_private, "description": description, } def generate_signing_key() -> Path: """Generate Ed25519 signing key in current directory.""" # Save to current directory (not /tmp) so it persists key_path = Path.cwd() / f"signing-{secrets.token_hex(4)}.key" key_path.write_bytes(secrets.token_bytes(32)) # Set secure permissions (best-effort on Windows where this may not apply) with suppress(OSError, NotImplementedError): key_path.chmod(0o600) console.print(f"[yellow]⚠ Private signing key saved to:[/] {key_path}") console.print("[yellow]⚠ Back up this file securely - you'll need it to update the bundle[/]") return key_path def export_bundle( output_dir: Path, projects: list[str], scrub_preset: str, signing_key: Path | None = None, ) -> tuple[bool, Path | None]: """Export mailbox bundle.""" base_cmd = [ "uv", "run", "python", "-m", "mcp_agent_mail.cli", "share", "export", "--output", str(output_dir), "--no-zip", ] cmd = list(base_cmd) if scrub_preset != "none": cmd.extend(["--scrub-preset", scrub_preset]) for project in projects: cmd.extend(["--project", project]) signing_pub_path = None if signing_key: signing_pub_path = signing_key.with_suffix(".pub") # Always sign; only write public key file if it doesn't already exist cmd.extend([ "--signing-key", str(signing_key), ]) if not signing_pub_path.exists(): cmd.extend([ "--signing-public-out", str(signing_pub_path), ]) else: console.print(f"[dim]Reusing existing signing public key at {signing_pub_path}[/]") console.print("\n[bold]Exporting mailbox bundle...[/]") with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task("Exporting...", total=None) try: subprocess.run(cmd, capture_output=True, text=True, check=True) progress.update(task, completed=True) console.print("[green]✓ Export complete[/]") return True, signing_pub_path except subprocess.CalledProcessError as e: progress.update(task, completed=True) stdout_text = e.stdout or "" stderr_text = e.stderr or "" combined = (stdout_text + "\n" + stderr_text).strip() details = combined if combined else f"Exit code {e.returncode} (no output)" # Fallback: if export failed due to signing dependency, retry without signing if signing_key and ("PyNaCl is required" in details or "Failed to sign manifest" in details or "Public key output file already exists" in details): console.print("[yellow]Signing unavailable or failed. Retrying export without signing...[/]") no_sign_cmd = [arg for arg in cmd if arg not in {"--signing-key", "--signing-public-out", str(signing_key), str(signing_pub_path)}] try: subprocess.run(no_sign_cmd, capture_output=True, text=True, check=True) console.print("[green]✓ Export complete (unsigned)[/]") return True, None except subprocess.CalledProcessError as e2: out2 = (e2.stdout or "") + "\n" + (e2.stderr or "") console.print(f"[red]Export failed (retry without signing):[/]\n{out2.strip()}") return False, None console.print(f"[red]Export failed:[/]\n{details}") return False, None def preview_bundle(output_dir: Path) -> bool: """Launch preview server and ask user to confirm.""" import socket console.print("\n[bold cyan]Launching preview server...[/]") # Find available port try: port = find_available_port() console.print(f"[dim]Using port {port} (Ctrl+C to stop server)[/]") except RuntimeError as e: console.print(f"[red]{e}[/]") return False process = None try: # Start preview server in background process = subprocess.Popen( [ "uv", "run", "python", "-m", "mcp_agent_mail.cli", "share", "preview", str(output_dir), "--port", str(port), ] ) # Wait for server to be ready by polling the port console.print("[cyan]Waiting for server to start...[/]") max_attempts = 30 for _attempt in range(max_attempts): try: with socket.create_connection(("127.0.0.1", port), timeout=1): break except (ConnectionRefusedError, OSError): if process.poll() is not None: console.print("[red]Preview server failed to start[/]") return False time.sleep(0.5) else: console.print("[red]Preview server did not start in time[/]") process.terminate() return False # Server is ready, open browser (viewer entry) console.print(f"[green]✓ Server ready, opening browser at http://127.0.0.1:{port}/viewer/[/]") webbrowser.open(f"http://127.0.0.1:{port}/viewer/") # Wait for server process to complete (user will Ctrl+C or 'q'/'d') process.wait() # Special exit code 42 means the user pressed 'd' to request deployment if process.returncode == 42: console.print("\n[bold green]Deploy requested from preview ('d' key).[/]") return Confirm.ask("Proceed with deployment now?", default=True) # After server stops, ask if satisfied console.print("\n[bold]Preview complete.[/]") return Confirm.ask("Are you satisfied with the preview?", default=True) except KeyboardInterrupt: console.print("\n[yellow]Preview interrupted[/]") return Confirm.ask("Continue with deployment anyway?", default=False) finally: # Always clean up the process, regardless of how we exit if process is not None and process.poll() is None: console.print("[dim]Stopping preview server...[/]") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: console.print("[yellow]Warning: Preview server did not stop cleanly, forcing shutdown...[/]") process.kill() process.wait() # Wait for kill to complete def create_github_repo(name: str, private: bool, description: str) -> tuple[bool, str]: """Create new GitHub repository and return owner/name.""" visibility = "--private" if private else "--public" try: result = subprocess.run( [ "gh", "repo", "create", name, visibility, "--description", description, "--clone=false", ], capture_output=True, text=True, check=True, ) # Get the full repo name (owner/repo) result = subprocess.run( ["gh", "repo", "view", name, "--json", "nameWithOwner", "-q", ".nameWithOwner"], capture_output=True, text=True, check=True, ) full_name = result.stdout.strip() console.print(f"[green]✓ Created repository: {full_name}[/]") return True, full_name except subprocess.CalledProcessError as e: stdout_msg = (e.stdout or "").strip() stderr_msg = (e.stderr or "").strip() combined = "\n".join(part for part in (stdout_msg, stderr_msg) if part) # If the repository already exists, reuse it instead of failing. if "already exists" in combined.lower(): console.print("[yellow]Repository already exists; reusing existing repo.[/]") try: view = subprocess.run( ["gh", "repo", "view", name, "--json", "nameWithOwner", "-q", ".nameWithOwner"], capture_output=True, text=True, check=True, ) full_name = view.stdout.strip() console.print(f"[green]✓ Using existing repository: {full_name}[/]") return True, full_name except subprocess.CalledProcessError as view_error: console.print( "[red]Failed to look up existing repository:[/]\n" f"{(view_error.stdout or '').strip()}\n{(view_error.stderr or '').strip()}" ) return False, "" console.print(f"[red]Failed to create repository:[/]\n{combined}") return False, "" def init_and_push_repo(output_dir: Path, repo_full_name: str, branch: str = "main") -> bool: """Initialize git repo and push to GitHub with real-time output.""" console.print("\n[bold cyan]Initializing git repository and pushing...[/]") commands = [ (["git", "init"], "Initializing repository"), (["git", "add", "."], "Adding files"), (["git", "commit", "-m", "Initial mailbox export"], "Creating commit"), (["git", "branch", "-M", branch], f"Setting branch to {branch}"), ] for cmd, description in commands: console.print(f"[cyan]{description}...[/]") try: # Stream output in real-time process = subprocess.Popen( cmd, cwd=output_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, # Line buffered ) # Print output as it arrives assert process.stdout is not None for line in process.stdout: console.print(f" [dim]{line.rstrip()}[/]") process.wait() if process.returncode != 0: console.print(f"[red]✗ {description} failed[/]") return False console.print(f"[green]✓ {description} complete[/]") except Exception as e: console.print(f"[red]Git operation failed:[/] {e}") return False remote_url = f"https://github.com/{repo_full_name}.git" console.print("[cyan]Configuring GitHub remote...[/]") add_remote = subprocess.run( ["git", "remote", "add", "origin", remote_url], cwd=output_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) if add_remote.returncode != 0: # If remote already exists, replace it with the HTTPS URL set_remote = subprocess.run( ["git", "remote", "set-url", "origin", remote_url], cwd=output_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) if set_remote.returncode != 0: console.print("[red]✗ Adding remote failed[/]") console.print(f" [dim]{(add_remote.stdout or '').strip()}[/]") console.print(f" [dim]{(set_remote.stdout or '').strip()}[/]") return False console.print("[green]✓ Configuring GitHub remote complete[/]") console.print("[cyan]Pushing to GitHub (force update)...[/]") push = subprocess.run( ["git", "push", "-u", "origin", branch, "--force"], cwd=output_dir, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) for line in (push.stdout or "").splitlines(): console.print(f" [dim]{line}[/]") if push.returncode != 0: console.print("[red]✗ Pushing to GitHub failed[/]") return False console.print("[green]✓ Pushing to GitHub complete[/]") console.print(f"\n[bold green]✓ Successfully pushed to {repo_full_name}[/]") return True def enable_github_pages(repo_full_name: str, branch: str = "main") -> tuple[bool, str]: """Enable GitHub Pages for the repository (root directory).""" try: # Enable Pages via gh API (always use root "/" for our use case) subprocess.run( [ "gh", "api", f"repos/{repo_full_name}/pages", "-X", "POST", "-f", f"source[branch]={branch}", "-f", "source[path]=/", ], check=True, capture_output=True, text=True, ) # Get the Pages URL time.sleep(2) # Wait for Pages to be configured result = subprocess.run( ["gh", "api", f"repos/{repo_full_name}/pages", "-q", ".html_url"], capture_output=True, text=True, check=True, ) pages_url = result.stdout.strip() console.print("[green]✓ GitHub Pages enabled[/]") return True, pages_url except subprocess.CalledProcessError as e: # Pages might already be enabled if "already exists" in e.stderr or "409" in e.stderr: console.print("[yellow]GitHub Pages already enabled[/]") # Try to get the URL anyway try: result = subprocess.run( ["gh", "api", f"repos/{repo_full_name}/pages", "-q", ".html_url"], capture_output=True, text=True, check=True, ) pages_url = result.stdout.strip() return True, pages_url except subprocess.CalledProcessError: # Construct URL manually owner, name = repo_full_name.split("/") pages_url = f"https://{owner}.github.io/{name}/" return True, pages_url else: console.print(f"[red]Failed to enable GitHub Pages:[/]\n{e.stderr}") return False, "" def deploy_to_cloudflare_pages(output_dir: Path, project_name: str) -> tuple[bool, str]: """Deploy bundle to Cloudflare Pages with real-time output.""" console.print("\n[bold cyan]Deploying to Cloudflare Pages...[/]") try: # Use wrangler pages deploy command with real-time streaming process = subprocess.Popen( [ "wrangler", "pages", "deploy", str(output_dir), "--project-name", project_name, "--branch", "main", ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, # Line buffered ) # Collect output and parse for deployment URL pages_url = "" # Stream output in real-time assert process.stdout is not None for line in process.stdout: stripped = line.rstrip() console.print(f" [dim]{stripped}[/]") # Parse for deployment URL if ".pages.dev" in stripped and not pages_url: url_match = re.search(r"https://[^\s]+\.pages\.dev[^\s]*", stripped) if url_match: pages_url = url_match.group(0) process.wait() if process.returncode != 0: console.print(f"[red]Cloudflare Pages deployment failed with exit code {process.returncode}[/]") return False, "" if not pages_url: # Fallback: construct expected URL pages_url = f"https://{project_name}.pages.dev" console.print("\n[bold green]✓ Deployed to Cloudflare Pages[/]") return True, pages_url except Exception as e: console.print(f"[red]Cloudflare Pages deployment failed:[/] {e}") return False, "" def main() -> None: """Main wizard flow.""" console.print( Panel.fit( "[bold cyan]MCP Agent Mail → Deployment Wizard[/]\n\n" "This wizard will:\n" " 1. Export your mailbox to a static HTML bundle\n" " 2. Preview it locally\n" " 3. Deploy to GitHub Pages or Cloudflare Pages\n\n" "[dim]Press Ctrl+C anytime to cancel[/]", title="Welcome", ) ) resume_state = load_resume_state() resume_configured = False selected_projects: list[str] = [] scrub_preset = "standard" deployment: dict[str, Any] = {} use_signing = True generate_new_key = True signing_key: Path | None = None signing_pub: Path | None = None last_known_signing_path: Path | None = None bundle_ready = False if resume_state: console.print("\n[bold cyan]Incomplete session detected[/]") console.print(f" Projects: {len(resume_state.get('selected_projects', []))} selected") console.print(f" Stage: {resume_state.get('stage', 'preview')}") console.print(f" Workspace: {resume_state.get('bundle_path')}") if Confirm.ask("Resume where you left off?", default=True): resume_configured = True selected_projects = list(resume_state.get("selected_projects", [])) scrub_preset = resume_state.get("scrub_preset", "standard") deployment = resume_state.get("deployment", {}) use_signing = resume_state.get("use_signing", True) generate_new_key = resume_state.get("generate_new_key", True) bundle_ready = bool(resume_state.get("bundle_ready", False)) if use_signing: signing_candidate = resume_state.get("signing_key_path") if signing_candidate: candidate_path = Path(signing_candidate).expanduser() last_known_signing_path = candidate_path if generate_new_key: if candidate_path.exists(): signing_key = candidate_path else: console.print( f"[yellow]Saved signing key {candidate_path} is missing; a new key will be generated.[/]" ) signing_key = None signing_pub_candidate = resume_state.get("signing_pub_path") if signing_pub_candidate: candidate_path = Path(signing_pub_candidate) if candidate_path.exists(): signing_pub = candidate_path else: clear_resume_state() last_config = load_last_config() if not resume_configured else None use_last_config = False if last_config: console.print("\n[bold cyan]Previous Configuration Found[/]") console.print(f" Projects: {last_config.get('project_count', '?')} selected") console.print(f" Redaction: {last_config.get('scrub_preset', 'standard')}") console.print(f" Target: {last_config.get('deployment_type', 'unknown')}") use_last_config = Confirm.ask("\nUse these settings again?", default=True) if resume_configured: deployment = deployment or {} if not deployment.get("type"): console.print("[yellow]Saved session missing deployment target; starting a fresh run.[/]") clear_resume_state() resume_configured = False if not resume_configured: if not use_last_config: deployment = select_deployment_target() else: deployment = (last_config or {}).get("deployment", {}) if not deployment.get("type"): console.print("[yellow]Saved deployment config is invalid, please select again[/]") deployment = select_deployment_target() use_last_config = False require_gh = deployment.get("type") == "github-new" require_cf = deployment.get("type") == "cloudflare-pages" if not check_prerequisites(require_github=require_gh, require_cloudflare=require_cf): sys.exit(1) projects_list = get_projects() if not projects_list: console.print("[yellow]No projects found. Create some messages first![/]") sys.exit(1) console.print(f"\n[green]Found {len(projects_list)} project(s)[/]") selected_indices: list[int] = [] if resume_configured and last_config: last_known = last_config.get("signing_key_path") if last_known: last_known_signing_path = Path(last_known).expanduser() if resume_configured: available = {p["human_key"]: idx for idx, p in enumerate(projects_list)} selected_projects = [proj for proj in selected_projects if proj in available] if not selected_projects: console.print("[yellow]Saved session references projects that no longer exist. Starting over.[/]") clear_resume_state() resume_configured = False else: selected_indices = [available[p] for p in selected_projects] if not resume_configured: if use_last_config and last_config: saved_indices = last_config.get("project_indices", list(range(len(projects_list)))) selected_indices = [i for i in saved_indices if i < len(projects_list)] if not selected_indices: console.print("[yellow]Saved project selection invalid, please select again[/]") selected_projects = select_projects(projects_list) selected_indices = [i for i, p in enumerate(projects_list) if p["human_key"] in selected_projects] scrub_preset = select_scrub_preset() else: selected_projects = [projects_list[idx]["human_key"] for idx in selected_indices] console.print(f"[green]Using saved selection: {len(selected_projects)} project(s)[/]") scrub_preset = last_config.get("scrub_preset", "standard") last_known = last_config.get("signing_key_path") if last_known: last_known_signing_path = Path(last_known).expanduser() else: selected_projects = select_projects(projects_list) selected_indices = [i for i, p in enumerate(projects_list) if p["human_key"] in selected_projects] scrub_preset = select_scrub_preset() use_signing = True generate_new_key = True signing_key = None if use_last_config and last_config: use_signing = last_config.get("use_signing", True) generate_new_key = last_config.get("generate_new_key", True) else: use_signing = Confirm.ask("\nSign the bundle with Ed25519?", default=True) generate_new_key = Confirm.ask("Generate a new signing key?", default=True) if use_signing else False if use_signing: if generate_new_key: signing_key = generate_signing_key() last_known_signing_path = signing_key else: if last_known_signing_path: key_path = Prompt.ask( "Path to existing signing key", default=str(last_known_signing_path), ) else: key_path = Prompt.ask("Path to existing signing key") signing_key = Path(key_path).expanduser().resolve() last_known_signing_path = signing_key signing_pub = None bundle_ready = False else: if use_signing: if generate_new_key: if signing_key is None or not signing_key.exists(): signing_key = generate_signing_key() last_known_signing_path = signing_key else: if last_known_signing_path: key_path = Prompt.ask( "Path to existing signing key", default=str(last_known_signing_path), ) else: key_path = Prompt.ask("Path to existing signing key") signing_key = Path(key_path).expanduser().resolve() last_known_signing_path = signing_key if not selected_projects: console.print("[yellow]No projects selected. Nothing to export.[/]") sys.exit(0) if not show_deployment_summary(selected_projects, scrub_preset, deployment, signing_key): console.print("[yellow]Deployment cancelled by user[/]") sys.exit(0) bundle_path = SESSION_BUNDLE_DIR reuse_existing = resume_configured and bundle_ready and bundle_path.exists() if reuse_existing: reuse_existing = Confirm.ask("\nReuse the previously exported bundle for preview?", default=False) if not reuse_existing: if bundle_path.exists(): shutil.rmtree(bundle_path, ignore_errors=True) bundle_path.mkdir(parents=True, exist_ok=True) success, signing_pub = export_bundle( bundle_path, selected_projects, scrub_preset, signing_key if use_signing else None, ) if not success: sys.exit(1) bundle_ready = True else: console.print("[green]Reusing existing bundle workspace for preview.[/]") bundle_ready = True # Always refresh viewer assets in the bundle to pick up latest CSP/scripts _refresh_viewer_assets(bundle_path) saved_path = signing_key or last_known_signing_path save_resume_state({ "stage": "preview", "selected_projects": selected_projects, "scrub_preset": scrub_preset, "deployment": deployment, "use_signing": use_signing, "generate_new_key": generate_new_key, "signing_key_path": str(saved_path) if saved_path else None, "signing_pub_path": str(signing_pub) if signing_pub else None, "bundle_ready": bundle_ready, "timestamp": time.time(), }) if not Confirm.ask("\nPreview the bundle before deploying?", default=True): satisfied = True else: satisfied = preview_bundle(bundle_path) if not satisfied: console.print( f"[yellow]Deployment cancelled. The bundle remains at {bundle_path}. Run the wizard again to resume.[/]" ) sys.exit(0) if deployment.get("type") == "local": output_path = Path(deployment.get("path", "./mailbox-export")).expanduser().resolve() output_path.parent.mkdir(parents=True, exist_ok=True) shutil.copytree(bundle_path, output_path, dirs_exist_ok=True) console.print(f"\n[bold green]✓ Exported to: {output_path}[/]") if signing_pub: console.print(f"[green]✓ Signing public key: {signing_pub}[/]") console.print( "\n[cyan]To refresh this bundle later, run:[/]\n uv run python -m mcp_agent_mail.cli share update " f"{shlex.quote(str(output_path))}" ) save_config({ "project_indices": selected_indices, "project_count": len(selected_projects), "scrub_preset": scrub_preset, "deployment": deployment, "deployment_type": "local", "use_signing": use_signing, "generate_new_key": generate_new_key, "signing_key_path": str(signing_key) if signing_key else None, }) clear_resume_state() elif deployment.get("type") == "github-new": success, repo_full_name = create_github_repo( deployment.get("repo_name", "mailbox-viewer"), deployment.get("private", False), deployment.get("description", "MCP Agent Mail static viewer"), ) if not success: sys.exit(1) if not init_and_push_repo(bundle_path, repo_full_name): sys.exit(1) success, pages_url = enable_github_pages(repo_full_name) if success: console.print( Panel.fit( f"[bold green]Deployment Complete![/]\n\n" f"Repository: https://github.com/{repo_full_name}\n" f"GitHub Pages: {pages_url}\n\n" f"[dim]Note: Pages may take 1-2 minutes to become available[/]", title="Success", border_style="green", ) ) if signing_pub: console.print(f"\n[cyan]Signing public key saved to:[/] {signing_pub}") console.print("[dim]Share this with viewers to verify bundle authenticity[/]") console.print( "\n[cyan]To refresh this deployment later, run:[/]\n" f" uv run python -m mcp_agent_mail.cli share update {shlex.quote(str(bundle_path))}\n" f" wrangler pages deploy {shlex.quote(str(bundle_path))} --project-name {deployment.get('project_name', 'mailbox-viewer')} --branch main" ) console.print( "\n[cyan]To publish fresh mailbox data later, run:[/]\n" f" uv run python -m mcp_agent_mail.cli share update {shlex.quote(str(bundle_path))}\n" " git add . && git commit -m \"Refresh mailbox\" && git push" ) save_config({ "project_indices": selected_indices, "project_count": len(selected_projects), "scrub_preset": scrub_preset, "deployment": deployment, "deployment_type": "github-new", "use_signing": use_signing, "generate_new_key": generate_new_key, "signing_key_path": str(signing_key) if signing_key else None, }) clear_resume_state() else: console.print("\n[yellow]Repository created but Pages setup failed[/]") console.print(f"Visit https://github.com/{repo_full_name}/settings/pages to enable manually") elif deployment.get("type") == "cloudflare-pages": success, pages_url = deploy_to_cloudflare_pages(bundle_path, deployment.get("project_name", "mailbox-viewer")) if success: console.print( Panel.fit( f"[bold green]Deployment Complete![/]\n\n" f"Cloudflare Pages: {pages_url}\n\n" f"[dim]Note: Your site should be live immediately[/]", title="Success", border_style="green", ) ) if signing_pub: console.print(f"\n[cyan]Signing public key saved to:[/] {signing_pub}") console.print("[dim]Share this with viewers to verify bundle authenticity[/]") save_config({ "project_indices": selected_indices, "project_count": len(selected_projects), "scrub_preset": scrub_preset, "deployment": deployment, "deployment_type": "cloudflare-pages", "use_signing": use_signing, "generate_new_key": generate_new_key, "signing_key_path": str(signing_key) if signing_key else None, }) clear_resume_state() else: console.print("\n[yellow]Cloudflare Pages deployment failed[/]") sys.exit(1) if __name__ == "__main__": try: main() except KeyboardInterrupt: console.print("\n[yellow]Cancelled by user[/]") sys.exit(0) except Exception as e: console.print(f"\n[bold red]Error:[/] {e}") sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dicklesworthstone/mcp_agent_mail'

If you have feedback or need assistance with the MCP directory API, please join our Discord server