Skip to main content
Glama

iTerm2 Worktree MCP Server

5
  • Apple
worktree_mcp_server.py (45.8 kB)
#!/usr/bin/env python3
"""MCP server that manages git worktrees and drives iTerm2 to work in them.

The server reads newline-delimited JSON-RPC 2.0 messages from stdin and writes
one JSON response per request to stdout.  Worktree folders are always resolved
relative to the parent directory of the process's current working directory.
"""

import asyncio
import json
import os
import shlex
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional

try:
    import iterm2
except ImportError:
    # Without the iterm2 package none of the tools can work.  detect_iterm()
    # reports False in that case, so the server advertises no tools instead
    # of crashing at import time.
    iterm2 = None


# Value passed to `claude --disallowedTools` when a session is started.
_DISALLOWED_TOOLS = (
    "mcp__worktree__createWorktree,mcp__worktree__closeWorktree,"
    "mcp__worktree__activeWorktrees,mcp__worktree__switchToWorktree,"
    "mcp__worktree__openWorktree"
)

# open_location values for which "already open" checks and switch_back apply.
_TAB_LIKE_LOCATIONS = ("new_tab", "new_window")


def _text_result(text: str, *, is_error: bool = False) -> Dict[str, Any]:
    """Build an MCP tool result carrying a single text item."""
    result: Dict[str, Any] = {"content": [{"type": "text", "text": text}]}
    if is_error:
        result["isError"] = True
    return result


class WorktreeMCPServer:
    """Implements the worktree tools (create/close/list/switch/open)."""

    def __init__(self):
        # Check if running in iTerm
        self.is_iterm = self.detect_iterm()
        # Only provide tools if running in iTerm
        self.tools = self.get_tools() if self.is_iterm else []

    # ------------------------------------------------------------------
    # Environment detection and tool catalogue
    # ------------------------------------------------------------------

    def detect_iterm(self) -> bool:
        """Detect if the MCP server is running in iTerm.

        Returns True when TERM_PROGRAM is ``iTerm.app`` or when a connection
        to iTerm2 can be established.  Returns False when the ``iterm2``
        package is missing, when the probe fails, or when this is called from
        inside a running event loop (a synchronous probe is impossible there).
        """
        if iterm2 is None:
            print("Warning: Could not detect iTerm: iterm2 package is not installed",
                  file=sys.stderr)
            return False

        # Check environment variables that iTerm sets
        if os.environ.get("TERM_PROGRAM", "") == "iTerm.app":
            return True

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop running: safe to probe synchronously below
        else:
            # run_until_complete() would raise inside a running loop and
            # leave a never-awaited coroutine behind; report "not detected".
            return False

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self._probe_iterm_connection())
            return True
        except Exception as e:
            print(f"Warning: Could not detect iTerm: {e}", file=sys.stderr)
            return False
        finally:
            loop.close()

    async def _probe_iterm_connection(self) -> None:
        """Open a connection to iTerm2, raising if that is not possible."""
        connection = await iterm2.Connection.async_create()
        # NOTE(review): the original called connection.async_close()
        # unconditionally; it is unclear whether every iterm2 release offers
        # that method, so it is only awaited when present.
        close = getattr(connection, "async_close", None)
        if close is not None:
            await close()

    def get_tools(self) -> List[Dict[str, Any]]:
        """Get the list of available tools (MCP tool descriptors)."""
        open_location_schema = {
            "type": "string",
            "enum": ["new_tab", "new_window", "new_pane_right", "new_pane_below"],
            "description": "Where to open the worktree (default: new_tab). Options: new_tab (new tab), new_window (new window), new_pane_right (vertical split, new pane to right), new_pane_below (horizontal split, new pane below)",
        }
        switch_back_schema = {
            "type": "boolean",
            "description": "Whether to switch back to the original tab/window after opening the worktree (default: false). Only applies to new_tab and new_window locations.",
        }
        return [
            {
                "name": "createWorktree",
                "description": "Create a git worktree with iTerm automation to start development",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "feature_name": {
                            "type": "string",
                            "description": "The feature name to work on (e.g., 'add-auth')",
                        },
                        "branch_name": {
                            "type": "string",
                            "description": "The branch name to use (e.g., 'feature/add-auth')",
                        },
                        "worktree_folder": {
                            "type": "string",
                            "description": "The worktree folder name (e.g., 'project-name-feat-add-auth')",
                        },
                        "description": {
                            "type": "string",
                            "description": "Description of the task to do",
                        },
                        "start_claude": {
                            "type": "boolean",
                            "description": "Whether to automatically start Claude with the task description (default: false). Only set to true if you want Claude to start with a specific command.",
                        },
                        "open_location": dict(open_location_schema),
                        "switch_back": dict(switch_back_schema),
                    },
                    "required": ["feature_name", "branch_name", "worktree_folder", "description"],
                },
            },
            {
                "name": "closeWorktree",
                "description": "Close a worktree after checking it's clean and pushed",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "worktree_name": {
                            "type": "string",
                            "description": "The name of the worktree folder to close",
                        },
                    },
                    "required": ["worktree_name"],
                },
            },
            {
                "name": "activeWorktrees",
                "description": "List all active worktrees managed by this MCP server",
                "inputSchema": {
                    "type": "object",
                    "properties": {},
                    "required": [],
                },
            },
            {
                "name": "switchToWorktree",
                "description": "Switch to a worktree tab in iTerm2",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "worktree_name": {
                            "type": "string",
                            "description": "The name of the worktree folder to switch to",
                        },
                        "tab_id": {
                            "type": "string",
                            "description": "Optional specific tab ID to switch to. If not provided, will find tab by worktree path",
                        },
                    },
                    "required": ["worktree_name"],
                },
            },
            {
                "name": "openWorktree",
                "description": "Open an existing worktree in a new iTerm2 tab",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "worktree_name": {
                            "type": "string",
                            "description": "The name of the worktree folder to open",
                        },
                        "force": {
                            "type": "boolean",
                            "description": "Force open in new tab even if worktree is already open elsewhere (default: false)",
                        },
                        "open_location": dict(open_location_schema),
                        "switch_back": dict(switch_back_schema),
                    },
                    "required": ["worktree_name"],
                },
            },
        ]

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _worktree_path(worktree_name: str) -> str:
        """Return the path of a worktree folder next to the current repo."""
        return os.path.join(os.path.dirname(os.getcwd()), worktree_name)

    @staticmethod
    def _git(args: List[str], cwd: Optional[str] = None,
             check: bool = False) -> "subprocess.CompletedProcess[str]":
        """Run ``git <args>`` capturing text output; raise on failure if *check*."""
        return subprocess.run(
            ["git", *args], cwd=cwd, capture_output=True, text=True, check=check
        )

    @staticmethod
    def _build_claude_command(description: str) -> str:
        """Return the shell line that starts claude with *description*.

        The description is quoted with shlex.quote so that quotes, ``$(...)``,
        backticks and backslashes in it reach claude literally instead of
        being interpreted by the shell the text is typed into.
        """
        return (
            f"claude {shlex.quote(description)} "
            f"--disallowedTools {_DISALLOWED_TOOLS}\n"
        )

    @staticmethod
    def _find_tab(app: Any, tab_id: str) -> Any:
        """Return the tab object with *tab_id* from *app*, or None."""
        for window in app.windows:
            for tab in window.tabs:
                if tab.tab_id == tab_id:
                    return tab
        return None

    # ------------------------------------------------------------------
    # iTerm2 tab lookup
    # ------------------------------------------------------------------

    async def find_tab_by_path(self, worktree_path: str) -> Optional[str]:
        """Find iTerm2 tab ID that has the given worktree path as working directory.

        Returns the first matching tab ID, or None when nothing matches or
        iTerm2 cannot be queried.
        """
        matches = await self.find_all_tabs_by_path(worktree_path)
        return matches[0]["tabId"] if matches else None

    async def find_all_tabs_by_path(self, worktree_path: str) -> List[Dict[str, Any]]:
        """Find all iTerm2 tabs that have the given worktree path as working directory.

        Only each tab's current session is inspected.  Every match is a dict
        with ``tabId``, ``windowId`` and ``thisWindow`` (True when the tab is
        in the current window).  Returns [] if iTerm2 cannot be queried.
        """
        try:
            connection = await iterm2.Connection.async_create()
            app = await iterm2.async_get_app(connection)

            # Normalize the worktree path for comparison
            wanted = os.path.normpath(worktree_path)

            current_window = app.current_window
            current_window_id = current_window.window_id if current_window else None

            matching_tabs = []
            for window in app.windows:
                for tab in window.tabs:
                    session = tab.current_session
                    if not session:
                        continue
                    try:
                        working_dir = await session.async_get_variable("path")
                    except Exception:
                        # If we can't get the path, continue to next session.
                        # (Not a bare except: task cancellation must propagate.)
                        continue
                    if working_dir and os.path.normpath(working_dir) == wanted:
                        matching_tabs.append({
                            "tabId": tab.tab_id,
                            "windowId": window.window_id,
                            "thisWindow": window.window_id == current_window_id,
                        })
            return matching_tabs
        except Exception as e:
            print(f"Warning: Could not search iTerm tabs: {e}", file=sys.stderr)
            return []

    async def check_iterm_tab_exists(self, tab_id: str) -> bool:
        """Check if iTerm tab exists; False when iTerm2 cannot be reached."""
        try:
            connection = await iterm2.Connection.async_create()
            app = await iterm2.async_get_app(connection)
            return self._find_tab(app, tab_id) is not None
        except Exception:
            # If we can't connect to iTerm, assume tab doesn't exist
            return False

    async def close_iterm_tab(self, tab_id: str) -> tuple[bool, str]:
        """Close iTerm tab if it exists; returns (success, message)."""
        try:
            connection = await iterm2.Connection.async_create()
            app = await iterm2.async_get_app(connection)
            tab = self._find_tab(app, tab_id)
            if tab is None:
                return False, f"Tab {tab_id} not found"
            await tab.async_close()
            return True, f"Closed tab {tab_id}"
        except Exception as e:
            return False, f"Failed to close tab: {str(e)}"

    # ------------------------------------------------------------------
    # Git helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_worktree_porcelain(output: str) -> List[Dict[str, str]]:
        """Parse ``git worktree list --porcelain`` output.

        Records are separated by blank lines.  From each record the
        ``worktree``, ``branch`` and ``HEAD`` lines are kept as ``path``
        (plus ``folder``, its basename), ``branch`` and ``head``; any other
        line is ignored.
        """
        worktrees: List[Dict[str, str]] = []
        current: Dict[str, str] = {}
        for line in output.strip().splitlines():
            if not line:
                if current:
                    worktrees.append(current)
                    current = {}
                continue
            key, _, value = line.partition(" ")
            if key == "worktree":
                current["path"] = value
                current["folder"] = os.path.basename(value)
            elif key == "branch":
                current["branch"] = value
            elif key == "HEAD":
                current["head"] = value
        # Add the last worktree if exists
        if current:
            worktrees.append(current)
        return worktrees

    def get_all_git_worktrees(self) -> List[Dict[str, str]]:
        """Get all git worktrees from git command; [] on any failure."""
        try:
            result = self._git(["worktree", "list", "--porcelain"], check=True)
            return self._parse_worktree_porcelain(result.stdout)
        except Exception:
            return []

    def validate_worktree_creation(self, branch_name: str, worktree_folder: str) -> tuple[bool, str]:
        """Validate if worktree can be created.

        Fails when the cwd is not inside a git repository, when the branch
        already exists, or when the target folder already exists.
        """
        # Check if we're in a git repo
        try:
            self._git(["rev-parse", "--git-dir"], check=True)
        except subprocess.CalledProcessError:
            return False, "Not in a git repository"

        # Check if branch already exists
        try:
            listing = self._git(["branch", "--list", branch_name], check=True)
        except subprocess.CalledProcessError:
            return False, "Failed to check if branch exists"
        if listing.stdout.strip():
            return False, f"Branch '{branch_name}' already exists"

        # Check if worktree folder already exists in parent directory
        if os.path.exists(self._worktree_path(worktree_folder)):
            return False, f"Folder '{worktree_folder}' already exists in parent directory"

        return True, "Validation passed"

    def create_worktree(self, branch_name: str, worktree_folder: str) -> tuple[bool, str]:
        """Create the git worktree on a new branch; returns (success, message)."""
        worktree_path = self._worktree_path(worktree_folder)
        try:
            self._git(["worktree", "add", "-b", branch_name, worktree_path], check=True)
        except subprocess.CalledProcessError as e:
            return False, f"Failed to create worktree: {e.stderr}"
        return True, f"Worktree created successfully at {worktree_path}"

    def _detect_base_branch(self, worktree_path: str) -> Optional[str]:
        """Return the remote default branch name for the worktree, or None.

        Uses ``refs/remotes/origin/HEAD`` when set, otherwise the first of
        ``origin/main`` / ``origin/master`` that exists.
        """
        head = self._git(["symbolic-ref", "refs/remotes/origin/HEAD"], cwd=worktree_path)
        if head.returncode == 0:
            ref = head.stdout.strip()
            prefix = "refs/remotes/origin/"
            # Strip only the remote prefix so that branch names containing
            # slashes (e.g. release/1.0) are kept whole.
            if ref.startswith(prefix):
                return ref[len(prefix):]
            return ref.split("/")[-1]

        # Fallback to common base branch names
        for candidate in ("main", "master"):
            check = self._git(["rev-parse", "--verify", f"origin/{candidate}"], cwd=worktree_path)
            if check.returncode == 0:
                return candidate
        return None

    def validate_worktree_closure(self, worktree_name: str) -> tuple[bool, str]:
        """Validate if worktree can be closed (clean and pushed).

        With an upstream configured, every commit must be pushed.  Without
        one, the branch must have no commits ahead of the remote base branch;
        if no base branch can be determined a clean working tree is enough.
        """
        worktree_path = self._worktree_path(worktree_name)
        if not os.path.exists(worktree_path):
            return False, f"Worktree '{worktree_name}' does not exist"

        try:
            # Check if git status is clean
            status = self._git(["status", "--porcelain"], cwd=worktree_path, check=True)
            if status.stdout.strip():
                return False, f"Worktree has uncommitted changes: {status.stdout.strip()}"

            upstream = self._git(["rev-parse", "--abbrev-ref", "@{u}"], cwd=worktree_path)
            if upstream.returncode == 0:
                # Upstream exists, check for unpushed commits
                unpushed = self._git(
                    ["log", "--oneline", "@{u}..HEAD"], cwd=worktree_path, check=True
                )
                if unpushed.stdout.strip():
                    return False, f"Worktree has unpushed commits: {unpushed.stdout.strip()}"
            else:
                base_branch = self._detect_base_branch(worktree_path)
                if base_branch is None:
                    # If we can't determine base branch, allow deletion if working tree is clean
                    return True, "Worktree is clean and can be deleted"

                ahead = self._git(
                    ["log", "--oneline", f"origin/{base_branch}..HEAD"], cwd=worktree_path
                )
                if ahead.returncode == 0 and ahead.stdout.strip():
                    # The original text also suggested "--force", which no
                    # tool of this server accepts; the hint was removed.
                    return False, f"Branch has commits ahead of origin/{base_branch} but no upstream configured. Push the branch first"

            return True, "Worktree is clean and pushed"
        except subprocess.CalledProcessError as e:
            return False, f"Failed to check worktree status: {e.stderr}"

    def _inspect_branch(self, worktree_name: str) -> tuple[Optional[str], bool, str]:
        """Return (branch, has_commits, error) for the worktree's branch.

        On success *branch* is the current branch name (possibly empty when
        git reports none) and *error* is "".  On failure *branch* is None and
        *error* explains why, so callers can tell the two cases apart.
        """
        worktree_path = self._worktree_path(worktree_name)
        try:
            current = self._git(["branch", "--show-current"], cwd=worktree_path, check=True)
        except subprocess.CalledProcessError as e:
            return None, False, f"Failed to check branch commits: {e.stderr}"
        branch = current.stdout.strip()

        base_branch = self._detect_base_branch(worktree_path)
        if base_branch is None:
            return None, False, "Could not determine base branch"

        ahead = self._git(["log", "--oneline", f"origin/{base_branch}..HEAD"], cwd=worktree_path)
        if ahead.returncode != 0:
            return None, False, "Failed to check commit history"
        return branch, bool(ahead.stdout.strip()), ""

    def check_branch_has_commits(self, worktree_name: str) -> tuple[bool, str]:
        """Check if the worktree's branch has any commits beyond the base branch.

        Returns (has_commits, branch_name) on success and (False, error
        message) on failure.  The two cases cannot be told apart from the
        return value alone; code that needs to should use _inspect_branch().
        """
        branch, has_commits, error = self._inspect_branch(worktree_name)
        if branch is None:
            return False, error
        return has_commits, branch

    def delete_branch(self, branch_name: str) -> tuple[bool, str]:
        """Delete a git branch (``git branch -D``); returns (success, message)."""
        try:
            self._git(["branch", "-D", branch_name], check=True)
        except subprocess.CalledProcessError as e:
            return False, f"Failed to delete branch '{branch_name}': {e.stderr}"
        return True, f"Deleted branch '{branch_name}'"

    # ------------------------------------------------------------------
    # iTerm2 automation
    # ------------------------------------------------------------------

    async def _open_session(self, open_location: str) -> tuple[Any, Any, Any, Optional[str]]:
        """Create a session at *open_location*.

        Returns (session, tab_id, original_tab, error).  *error* is None on
        success; otherwise it is a message and the other items are None.
        For pane splits *tab_id* is the ID of the tab that contains the pane.
        Exceptions raised by iterm2 propagate to the caller.
        """
        connection = await iterm2.Connection.async_create()
        app = await iterm2.async_get_app(connection)

        # Get current window and session for context
        current_window = app.current_window
        if not current_window:
            return None, None, None, "No current iTerm window found"

        original_tab = current_window.current_tab
        original_session = original_tab.current_session if original_tab else None

        if open_location == "new_window":
            new_window = await iterm2.Window.async_create(connection)
            session = new_window.current_tab.current_session
            tab_id = new_window.current_tab.tab_id
        elif open_location == "new_tab":
            new_tab = await current_window.async_create_tab()
            session = new_tab.current_session
            tab_id = new_tab.tab_id
        elif open_location in ("new_pane_right", "new_pane_below"):
            if not original_session:
                return None, None, None, "No current session found for pane split"
            # vertical=True puts the new pane to the right, False below.
            session = await original_session.async_split_pane(
                vertical=(open_location == "new_pane_right")
            )
            tab_id = original_tab.tab_id
        else:
            return None, None, None, f"Invalid open_location: {open_location}"

        if not session:
            return None, None, None, f"Failed to create session for {open_location}"
        return session, tab_id, original_tab, None

    async def automate_iterm(self, worktree_folder: str, description: str, start_claude: bool = True, open_location: str = "new_tab", switch_back: bool = False) -> tuple[bool, str]:
        """Automate iTerm to open worktree in specified location, cd to worktree, and optionally start claude.

        Returns (success, message).  Any exception is reported through the
        message rather than raised.
        """
        try:
            session, _tab_id, original_tab, error = await self._open_session(open_location)
            if error:
                return False, error

            # Wait 1 second then cd to worktree
            await asyncio.sleep(1)
            worktree_path = self._worktree_path(worktree_folder)
            # Quote the path: it is typed into a shell.
            await session.async_send_text(f"cd {shlex.quote(worktree_path)}\n")

            if start_claude:
                await session.async_send_text(self._build_claude_command(description))

            # Switch back only applies to new_tab and new_window
            if switch_back and open_location in _TAB_LIKE_LOCATIONS and original_tab:
                await original_tab.async_select()

            return True, f"iTerm automation completed successfully ({open_location})"
        except Exception as e:
            return False, f"iTerm automation failed: {str(e)}"

    # ------------------------------------------------------------------
    # Tool handlers
    # ------------------------------------------------------------------

    async def handle_close_worktree(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the closeWorktree tool call.

        Removes the worktree once it is clean and pushed, deletes its branch
        when that branch has no commits ahead of the base branch, and closes
        the iTerm tab whose current session sits in the worktree.
        """
        worktree_name = arguments["worktree_name"]

        # Step 1: Validate worktree can be closed
        valid, validation_msg = self.validate_worktree_closure(worktree_name)
        if not valid:
            return _text_result(f"❌ Cannot close worktree: {validation_msg}")

        # Step 2: Decide whether the branch may be deleted.  Only a real
        # branch name qualifies; an inspection failure never does.
        branch, has_commits, _error = self._inspect_branch(worktree_name)
        branch_to_delete = branch if branch and not has_commits else None

        # Step 3: Find tab ID dynamically by worktree path (must happen
        # before the folder is removed)
        worktree_path = self._worktree_path(worktree_name)
        tab_id = await self.find_tab_by_path(worktree_path)

        # Step 4: Remove worktree
        try:
            self._git(["worktree", "remove", worktree_path], check=True)
        except subprocess.CalledProcessError as e:
            return _text_result(f"❌ Failed to remove worktree: {e.stderr}")

        # Step 5: Delete branch if it has no commits
        branch_deleted = False
        if branch_to_delete:
            branch_deleted, _delete_msg = self.delete_branch(branch_to_delete)

        # Step 6: Close iTerm tab if it exists
        tab_closed = False
        if tab_id:
            tab_closed, _tab_msg = await self.close_iterm_tab(tab_id)

        # Build success message
        message = f"✅ Successfully closed worktree '{worktree_name}'"
        if branch_deleted:
            message += f" and deleted branch '{branch_to_delete}'"
        if tab_closed:
            message += " and closed iTerm tab"
        elif tab_id:
            message += " (iTerm tab could not be closed)"
        return _text_result(message)

    async def handle_create_worktree(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the createWorktree tool call."""
        # feature_name is required by the schema but not otherwise used;
        # the lookup keeps the KeyError for a missing required argument.
        arguments["feature_name"]
        branch_name = arguments["branch_name"]
        worktree_folder = arguments["worktree_folder"]
        description = arguments["description"]

        # Step 0: Validate
        valid, validation_msg = self.validate_worktree_creation(branch_name, worktree_folder)
        if not valid:
            return _text_result(f"❌ Validation failed: {validation_msg}")

        # Step 1: Create worktree
        success, worktree_msg = self.create_worktree(branch_name, worktree_folder)
        if not success:
            return _text_result(f"❌ {worktree_msg}")

        # Steps 2-6: iTerm automation
        start_claude = arguments.get("start_claude", False)  # Default to False to avoid guessing
        open_location = arguments.get("open_location", "new_tab")
        switch_back = arguments.get("switch_back", False)

        success, iterm_msg = await self.automate_iterm(
            worktree_folder, description, start_claude, open_location, switch_back
        )
        if not success:
            return _text_result(f"✅ Worktree created but iTerm automation failed: {iterm_msg}")

        return _text_result(
            f"✅ Successfully created worktree '{worktree_folder}' with branch '{branch_name}' and started development session"
        )

    async def handle_list_worktrees(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the activeWorktrees tool call: list worktrees and their tabs."""
        git_worktrees = self.get_all_git_worktrees()
        if not git_worktrees:
            return _text_result("📝 No git worktrees found")

        response_lines = ["📋 All Git Worktrees:"]
        for i, git_worktree in enumerate(git_worktrees, 1):
            folder = git_worktree.get("folder", "Unknown")
            branch = git_worktree.get("branch", "Unknown")
            path = git_worktree.get("path", "Unknown")

            # Find all iTerm2 tabs dynamically by path
            matching_tabs = await self.find_all_tabs_by_path(path)
            if not matching_tabs:
                response_lines.append(f" {i}. {folder} (Branch: {branch}, Path: {path}) 📍 No iTerm tabs found")
                continue

            tab_info_parts = []
            for tab in matching_tabs:
                tab_exists = await self.check_iterm_tab_exists(tab["tabId"])
                tab_status = "✅" if tab_exists else "❌"
                this_window_indicator = " (thisWindow)" if tab["thisWindow"] else ""
                tab_info_parts.append(f"Tab: {tab['tabId']}{this_window_indicator} {tab_status}")
            tab_info = ", ".join(tab_info_parts)
            response_lines.append(f" {i}. {folder} (Branch: {branch}, {tab_info})")

        return _text_result("\n".join(response_lines))

    async def handle_switch_to_worktree(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the switchToWorktree tool call.

        Selects the tab given by ``tab_id`` when provided, otherwise the
        first tab whose current session's path is the worktree folder.
        """
        worktree_name = arguments["worktree_name"]
        tab_id = arguments.get("tab_id")

        try:
            connection = await iterm2.Connection.async_create()
            app = await iterm2.async_get_app(connection)

            if tab_id:
                # Tab ID provided - verify it exists
                target_tab_id = tab_id
                target_tab = self._find_tab(app, tab_id)
                if target_tab is None:
                    return _text_result(f"❌ Tab {tab_id} not found")
            else:
                # No tab ID provided - find by worktree path
                worktree_path = self._worktree_path(worktree_name)
                if not os.path.exists(worktree_path):
                    return _text_result(
                        f"❌ Worktree '{worktree_name}' does not exist at {worktree_path}"
                    )
                target_tab_id = await self.find_tab_by_path(worktree_path)
                if not target_tab_id:
                    return _text_result(
                        f"❌ No iTerm tab found for worktree '{worktree_name}' at {worktree_path}"
                    )
                target_tab = self._find_tab(app, target_tab_id)

            if target_tab is None:
                # The tab disappeared between lookup and selection.
                return _text_result(f"❌ Could not switch to tab {target_tab_id}")

            await target_tab.async_select()
            return _text_result(f"✅ Switched to worktree '{worktree_name}' tab {target_tab_id}")
        except Exception as e:
            return _text_result(f"❌ Failed to switch to worktree: {str(e)}")

    async def handle_open_worktree(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle the openWorktree tool call.

        Opens an existing worktree folder at ``open_location``.  For new_tab
        and new_window the call is refused when the worktree is already open
        in some tab, unless ``force`` is true.
        """
        worktree_name = arguments["worktree_name"]
        force = arguments.get("force", False)
        open_location = arguments.get("open_location", "new_tab")
        switch_back = arguments.get("switch_back", False)
        tab_like = open_location in _TAB_LIKE_LOCATIONS

        # Check if worktree exists
        worktree_path = self._worktree_path(worktree_name)
        if not os.path.exists(worktree_path):
            return _text_result(f"❌ Worktree '{worktree_name}' does not exist at {worktree_path}")

        # Check if worktree is already open in any tabs (only for new_tab and new_window)
        if tab_like:
            existing_tabs = await self.find_all_tabs_by_path(worktree_path)
            if existing_tabs and not force:
                tab_info = ", ".join(
                    f"Tab: {tab['tabId']}{' (thisWindow)' if tab['thisWindow'] else ''}"
                    for tab in existing_tabs
                )
                return _text_result(
                    f"❌ Worktree '{worktree_name}' is already open in {tab_info}. Use force=true to open in a new {open_location.replace('_', ' ')} anyway."
                )

        try:
            session, tab_id, original_tab, error = await self._open_session(open_location)
            if error:
                return _text_result(f"❌ {error}")

            # Wait 1 second then cd to worktree
            await asyncio.sleep(1)
            # Quote the path: it is typed into a shell.
            await session.async_send_text(f"cd {shlex.quote(worktree_path)}\n")

            if switch_back and tab_like and original_tab:
                await original_tab.async_select()

            force_message = " (forced)" if force and tab_like else ""
            location_display = open_location.replace('_', ' ')
            return _text_result(
                f"✅ Opened worktree '{worktree_name}' in {location_display} {tab_id}{force_message}"
            )
        except Exception as e:
            return _text_result(f"❌ Failed to open worktree: {str(e)}")


# Tool name -> name of the WorktreeMCPServer coroutine that implements it.
_TOOL_HANDLERS = {
    "createWorktree": "handle_create_worktree",
    "closeWorktree": "handle_close_worktree",
    "activeWorktrees": "handle_list_worktrees",
    "switchToWorktree": "handle_switch_to_worktree",
    "openWorktree": "handle_open_worktree",
}

# Created on first use so iTerm detection runs once, not once per message.
_server: Optional[WorktreeMCPServer] = None


def _get_server() -> WorktreeMCPServer:
    """Return the process-wide server instance, creating it on first use."""
    global _server
    if _server is None:
        _server = WorktreeMCPServer()
    return _server


async def handle_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """Handle incoming MCP messages and return the JSON-RPC ``result`` payload.

    Supports ``initialize``, ``ping``, ``tools/list`` and ``tools/call``.
    Unknown methods and unknown tools yield a text result with ``isError``.
    """
    method = message.get("method")

    if method == "initialize":
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {"name": "worktree-mcp-server", "version": "1.0.0"},
        }

    if method == "ping":
        return {}

    if method == "tools/list":
        return {"tools": _get_server().tools}

    if method == "tools/call":
        params = message.get("params") or {}
        tool_name = params.get("name")
        # "arguments" may be omitted for tools without parameters.
        arguments = params.get("arguments") or {}

        handler_name = _TOOL_HANDLERS.get(tool_name)
        if handler_name is None:
            return _text_result(f"Unknown tool: {tool_name}", is_error=True)
        return await getattr(_get_server(), handler_name)(arguments)

    return _text_result(f"Unknown method: {method}", is_error=True)


def _error_response(request_id: Any, code: int, text: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response object."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": text}}


async def main():
    """Main MCP server loop.

    Reads one JSON message per line from stdin until EOF and prints one JSON
    response per request to stdout.  Blank lines are skipped and messages
    without an ``id`` (notifications) are never answered.
    """
    while True:
        # Reset per iteration so an error never reports a previous request's id.
        request_id = None
        try:
            line = sys.stdin.readline()
            if not line:
                break
            if not line.strip():
                continue

            try:
                message = json.loads(line)
            except json.JSONDecodeError as e:
                print(json.dumps(_error_response(None, -32700, f"Parse error: {e}")), flush=True)
                continue

            if not isinstance(message, dict):
                print(json.dumps(_error_response(None, -32600, "Invalid Request")), flush=True)
                continue

            if "id" not in message:
                # Notification: JSON-RPC forbids a response, and this server
                # has nothing to do for any notification.
                continue
            request_id = message["id"]

            response = await handle_message(message)
            response_obj = {"jsonrpc": "2.0", "id": request_id, "result": response}
            print(json.dumps(response_obj), flush=True)

        except EOFError:
            break
        except Exception as e:
            print(json.dumps(_error_response(request_id, -32603, f"Server error: {str(e)}")),
                  flush=True)


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timoconnellaus/claude-code-iterm-worktree-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.