Skip to main content
Glama
delta_wrapper.py5.76 kB
"""Delta command execution wrapper with proper async subprocess handling.""" import asyncio import logging from typing import Optional import shutil logger = logging.getLogger(__name__) # Default timeout for delta operations (30 seconds) DEFAULT_TIMEOUT = 30.0 # Maximum output size (100KB) MAX_OUTPUT_SIZE = 100_000 async def safe_delta_execution( cmd: list[str], input_text: str | bytes | None = None, timeout: float = DEFAULT_TIMEOUT, delta_path: Optional[str] = None ) -> str: """Safely execute delta with proper error handling. Args: cmd: Command and arguments for delta (e.g., ["delta", "--syntax-theme", "github"]) input_text: Input text to pipe to delta (will be encoded if string) timeout: Timeout in seconds (default: 30.0) delta_path: Custom path to delta executable Returns: Formatted output from delta, or error message """ # Find delta executable if delta_path: delta_exec = shutil.which(delta_path) or delta_path else: delta_exec = shutil.which("delta") if not delta_exec: return "Error: Delta not installed. Install with: cargo install git-delta" # Use custom delta path if provided if delta_path and delta_exec != cmd[0]: cmd = [delta_exec] + cmd[1:] elif not delta_path: cmd = [delta_exec] + cmd[1:] try: # CRITICAL: asyncio.create_subprocess_exec does NOT support text=True # Input must be encoded if provided as string input_bytes = None if input_text: if isinstance(input_text, str): input_bytes = input_text.encode('utf-8') else: input_bytes = input_text process = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE if input_bytes else None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Add timeout (November 2025 best practice) try: stdout, stderr = await asyncio.wait_for( process.communicate(input=input_bytes), timeout=timeout ) except asyncio.TimeoutError: process.kill() await process.wait() return "Error: Delta execution timed out" if process.returncode != 0: error_msg = stderr.decode('utf-8', errors='replace') if stderr else "Unknown error" logger.error(f"Delta failed: {error_msg}") return f"Error: {error_msg or 'Delta execution failed'}" # Decode output (always bytes from subprocess) output = stdout.decode('utf-8', errors='replace') # Limit output size if len(output) > MAX_OUTPUT_SIZE: output = output[:MAX_OUTPUT_SIZE] + f"\n\n[Output truncated - {len(output)} bytes total]" return output except FileNotFoundError: return "Error: Delta not installed. Install with: cargo install git-delta" except Exception as e: logger.error(f"Unexpected error: {e}", exc_info=True) return f"Error: {str(e)}" async def execute_git_then_delta( git_cmd: list[str], delta_cmd: list[str], timeout: float = DEFAULT_TIMEOUT, delta_path: Optional[str] = None ) -> str: """Execute git command and pipe output to delta. Args: git_cmd: Git command and arguments delta_cmd: Delta command and arguments timeout: Timeout in seconds delta_path: Custom path to delta executable Returns: Formatted output from delta, or error message """ # Find delta executable if delta_path: delta_exec = shutil.which(delta_path) or delta_path else: delta_exec = shutil.which("delta") if not delta_exec: return "Error: Delta not installed. Install with: cargo install git-delta" # Update delta command with executable path if delta_path and delta_exec != delta_cmd[0]: delta_cmd = [delta_exec] + delta_cmd[1:] elif not delta_path: delta_cmd = [delta_exec] + delta_cmd[1:] try: # Execute: git <cmd> | delta <options> # First, get output from git git_process = await asyncio.create_subprocess_exec( *git_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Read git output first try: git_stdout, git_stderr = await asyncio.wait_for( git_process.communicate(), timeout=timeout ) except asyncio.TimeoutError: git_process.kill() await git_process.wait() return "Error: Git command timed out." if git_process.returncode != 0: error_msg = git_stderr.decode('utf-8', errors='replace') if git_stderr else "Unknown error" logger.error(f"Git error: {error_msg}") return f"Error: Git command failed: {error_msg}" # Now pipe git output to delta git_output = git_stdout.decode('utf-8', errors='replace') # Execute delta with git output as input return await safe_delta_execution(delta_cmd, input_text=git_output, timeout=timeout, delta_path=None) except FileNotFoundError as e: if "git" in str(e).lower(): return "Error: Git not found. Please install Git." return "Error: Delta not found. Install with: cargo install git-delta" except Exception as e: logger.error(f"Error in execute_git_then_delta: {e}", exc_info=True) return f"Error: {str(e)}"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bajpainaman/DeltaMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server