Skip to main content
Glama
jbroll

MCP Build Environment Service

by jbroll
server.py59.4 kB
#!/usr/bin/env python3 """ MCP Build Service Provides build environment access via MCP protocol with commands: - list: Show available repos - make: Run make with specified arguments - git: Run git commands (limited to safe operations) - ls: List files/directories in repository - read_file: Read file contents with optional line range - env: Show environment information and tool versions """ import argparse import asyncio import json import logging import os import secrets import shlex import signal import socket import subprocess import sys import urllib.request from pathlib import Path from collections import defaultdict from typing import Any, Dict, List from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.server.sse import SseServerTransport from mcp.types import Tool, TextContent from pydantic import BaseModel, Field from starlette.applications import Starlette from starlette.routing import Route from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, StreamingResponse, PlainTextResponse from starlette.requests import Request import uvicorn from validators import validate_git_args, validate_make_args, validate_ls_args, validate_path, validate_file_path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("mcp-build") # Configuration ENV_INFO_SCRIPT = Path(__file__).parent / "env_info.sh" # Base directory for repositories - always uses current working directory REPOS_BASE_DIR = Path(os.getcwd()) # Global configuration variables (set by parse_args) TRANSPORT_MODE = "stdio" HTTP_HOST = "0.0.0.0" HTTP_PORT = 3344 EXTERNAL_HOST = None # External hostname for display/docs (defaults to socket.gethostname()) SESSION_KEY = None # Pydantic models for REST API class LsRequest(BaseModel): """Request model for ls command""" args: str = Field(default="", description="Arguments for ls command") branch: str | None = Field(default=None, description="Git branch name (optional)") class GitQuickRequest(BaseModel): """Request model for quick git operations""" operation: str = Field(..., description="Git operation (status, branch, log)") args: str = Field(default="", description="Additional arguments") branch: str | None = Field(default=None, description="Git branch name (optional)") class MakeStreamRequest(BaseModel): """Request model for streaming make operations""" args: str = Field(default="", description="Make target and arguments") branch: str | None = Field(default=None, description="Git branch name (optional)") class GitStreamRequest(BaseModel): """Request model for streaming git operations""" operation: str = Field(..., description="Git operation (pull, fetch, diff, show, checkout)") args: str = Field(default="", description="Additional arguments") branch: str | None = Field(default=None, description="Git branch name (optional)") class ReadFileRequest(BaseModel): """Request model for read_file command""" path: str = Field(..., description="Path to the file to read") start_line: int | None = Field(default=None, description="Starting line number (1-indexed, optional)") end_line: int | None = Field(default=None, description="Ending line number (1-indexed, optional)") branch: str | None = Field(default=None, description="Git branch name (optional)") class ApiResponse(BaseModel): """Standard API response model""" success: bool data: Any = None error: str = None class RepoInfo(BaseModel): """Repository information model""" name: str path: str description: str is_default: bool class ReposListResponse(BaseModel): """Response model for list repos endpoint""" repos: List[RepoInfo] class CommandResult(BaseModel): """Result of a command execution""" stdout: str stderr: str exit_code: int combined_output: str def verify_session_key(request: Request) -> bool: """Verify the session key from request headers or query parameters""" if not SESSION_KEY: # If no session key is configured, allow access (stdio mode) return True # Check Authorization header (Bearer token) auth_header = request.headers.get("Authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] # Remove "Bearer " prefix logger.info(f"Try Key: {token} : {SESSION_KEY}") if secrets.compare_digest(token, SESSION_KEY): return True # Check query parameter token = request.query_params.get("key", "") logger.info(f"Try Key: {token} : {SESSION_KEY}") if token and secrets.compare_digest(token, SESSION_KEY): return True return False class BuildEnvironmentServer: """MCP Server for build operations""" def __init__(self): self.server = Server("mcp-build") self.repos: Dict[str, Dict[str, str]] = {} self.current_repo: str | None = None self.worktree_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) # Register handlers using decorators self._register_handlers() def _register_handlers(self) -> None: """Register MCP protocol handlers""" @self.server.list_tools() async def handle_list_tools() -> List[Tool]: """List available MCP tools""" return await self.get_tools_list() @self.server.call_tool() async def handle_call_tool(name: str, arguments: Any) -> List[TextContent]: """Handle tool execution""" return await self.execute_tool(name, arguments) async def discover_repos(self): """Discover repositories by scanning the base directory for git repos""" self.repos = {} try: # Scan the base directory for subdirectories containing .git # Skip hidden directories (which include our managed worktrees) for item in REPOS_BASE_DIR.iterdir(): if item.is_dir() and not item.name.startswith('.'): git_dir = item / ".git" if git_dir.exists(): # This is a git repository repo_name = item.name self.repos[repo_name] = { "path": str(item), "description": f"Repository at {item.relative_to(REPOS_BASE_DIR)}" } logger.info(f"Discovered {len(self.repos)} repositories in {REPOS_BASE_DIR}") except Exception as e: logger.error(f"Error discovering repositories: {e}", exc_info=True) self.repos = {} def get_repo_path(self, repo_name: str) -> Path: """Get the path to a repository. Repository name is required.""" if not repo_name: raise ValueError("Repository name is required") if repo_name not in self.repos: raise ValueError(f"Unknown repository: {repo_name}") return Path(self.repos[repo_name]["path"]) def _validate_branch_name(self, branch: str) -> None: """Validate branch name to prevent path traversal""" if not branch: raise ValueError("Branch name cannot be empty") if '..' in branch: raise ValueError("Branch name cannot contain '..'") if branch.startswith('/'): raise ValueError("Branch name cannot be absolute path") if '\0' in branch: raise ValueError("Branch name cannot contain null bytes") async def _ensure_worktree(self, repo_name: str, branch: str, base_path: Path) -> Path: """Ensure worktree exists for given repo and branch""" # Sanitize branch name for filesystem safe_branch = branch.replace('/', '-').replace('\\', '-') worktree_name = f".{repo_name}@{safe_branch}" worktree_path = base_path.parent / worktree_name lock_key = f"{repo_name}/{branch}" async with self.worktree_locks[lock_key]: if not worktree_path.exists(): logger.info(f"Creating worktree for {repo_name}:{branch} at {worktree_path}") # First fetch to ensure branch exists try: await self._run_git_simple(["git", "fetch", "origin"], cwd=base_path) except Exception as e: logger.warning(f"Failed to fetch before creating worktree: {e}") # Create worktree (path relative to base for git worktree add) rel_path = os.path.relpath(worktree_path, base_path) try: await self._run_git_simple( ["git", "worktree", "add", rel_path, f"origin/{branch}"], cwd=base_path ) except Exception as e: raise RuntimeError(f"Failed to create worktree for {branch}: {e}") else: # Worktree exists, fetch and pull to update logger.info(f"Updating worktree for {repo_name}:{branch}") try: await self._run_git_simple(["git", "fetch", "origin"], cwd=worktree_path) # Try to pull, but don't fail if it has local changes try: await self._run_git_simple(["git", "pull", "origin", branch], cwd=worktree_path) except Exception as e: logger.warning( f"WARNING: Failed to pull {branch} in worktree. " f"You may have uncommitted changes or build products. " f"Error: {e}\n" f"Continuing with existing state. Use 'git' tool to inspect or reset manually." ) except Exception as e: logger.warning(f"Failed to update worktree {worktree_path}: {e}") return worktree_path async def _run_git_simple(self, cmd: List[str], cwd: Path) -> None: """Run a git command without capturing output (for setup operations)""" process = await asyncio.create_subprocess_exec( *cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: error_msg = stderr.decode('utf-8', errors='replace').strip() raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{error_msg}") async def get_working_path(self, repo_name: str, branch: str | None) -> Path: """Get working directory for repo and branch If branch is specified, ALWAYS uses a worktree for isolation. If branch is None, uses base repo (backward compatible). """ base_path = self.get_repo_path(repo_name) # If no branch specified, use base repo if not branch: return base_path # Validate branch name self._validate_branch_name(branch) # Always use worktree when branch is specified for consistency and isolation return await self._ensure_worktree(repo_name, branch, base_path) def _get_lock_key(self, repo_name: str, branch: str | None) -> str: """Get lock key for repo@branch""" if branch: return f"{repo_name}@{branch}" return repo_name async def execute_in_worktree(self, repo_name: str, branch: str | None, cmd: List[str]) -> str: """Execute command in repo worktree with exclusive lock for repo@branch""" lock_key = self._get_lock_key(repo_name, branch) async with self.worktree_locks[lock_key]: # Get working path (may create/update worktree) work_path = await self.get_working_path(repo_name, branch) # Execute command return await self.run_command(cmd, cwd=work_path) async def get_tools_list(self) -> List[Tool]: """List available MCP tools""" return [ Tool( name="list", description="List available repositories discovered in the current directory", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="make", description="Run make command with specified arguments. " "Executes make in the root of the specified repository. " "If branch is specified, creates/uses a hidden worktree (.repo@branch) for isolation.", inputSchema={ "type": "object", "properties": { "args": { "type": "string", "description": "Arguments to pass to make (e.g., 'clean', 'all', 'test')" }, "repo": { "type": "string", "description": "Repository name (required)" }, "branch": { "type": "string", "description": "Git branch name (optional). If provided, uses isolated worktree." } }, "required": ["repo"] } ), Tool( name="git", description="Run git commands in a repository. " "Limited to safe operations: status, log, checkout, pull, branch, diff, fetch, reset, show. " "If branch is specified, creates/uses a hidden worktree (.repo@branch) for isolation.", inputSchema={ "type": "object", "properties": { "args": { "type": "string", "description": "Git command and arguments (e.g., 'status', 'checkout main', 'pull origin main')" }, "repo": { "type": "string", "description": "Repository name (required)" }, "branch": { "type": "string", "description": "Git branch name (optional). If provided, uses isolated worktree." } }, "required": ["args", "repo"] } ), Tool( name="ls", description="List files and directories in a repository. " "Limited to paths within the repository to prevent path traversal. " "If branch is specified, creates/uses a hidden worktree (.repo@branch) for isolation.", inputSchema={ "type": "object", "properties": { "args": { "type": "string", "description": "Arguments to pass to ls (e.g., '-la', '-lh build/')" }, "repo": { "type": "string", "description": "Repository name (required)" }, "branch": { "type": "string", "description": "Git branch name (optional). If provided, uses isolated worktree." } }, "required": ["repo"] } ), Tool( name="env", description="Show environment information including environment variables " "and versions of key build tools (gcc, g++, python, make, cmake, etc.). " "If branch is specified, creates/uses a hidden worktree (.repo@branch) for isolation.", inputSchema={ "type": "object", "properties": { "repo": { "type": "string", "description": "Repository name (required)" }, "branch": { "type": "string", "description": "Git branch name (optional). If provided, uses isolated worktree." } }, "required": ["repo"] } ), Tool( name="read_file", description="Read the contents of a file in a repository. " "Supports reading specific line ranges for large files. " "If branch is specified, creates/uses a hidden worktree (.repo@branch) for isolation.", inputSchema={ "type": "object", "properties": { "repo": { "type": "string", "description": "Repository name (required)" }, "path": { "type": "string", "description": "Absolute path to the file to read (required)" }, "start_line": { "type": "integer", "description": "Starting line number (1-indexed, optional). If provided, only lines from start_line to end_line will be returned." }, "end_line": { "type": "integer", "description": "Ending line number (1-indexed, optional). If provided, only lines from start_line to end_line will be returned." }, "branch": { "type": "string", "description": "Git branch name (optional). If provided, uses isolated worktree." } }, "required": ["repo", "path"] } ) ] async def execute_tool(self, name: str, arguments: Any) -> List[TextContent]: """Handle tool execution""" try: if name == "list": return await self.handle_list() elif name == "make": return await self.handle_make(arguments) elif name == "git": return await self.handle_git(arguments) elif name == "ls": return await self.handle_ls(arguments) elif name == "env": return await self.handle_env(arguments) elif name == "read_file": return await self.handle_read_file(arguments) else: raise ValueError(f"Unknown tool: {name}") except Exception as e: logger.error(f"Error executing tool {name}: {e}", exc_info=True) return [TextContent(type="text", text=f"Error: {str(e)}")] async def handle_list(self) -> List[TextContent]: """Handle list command""" if not self.repos: return [TextContent(type="text", text="No repositories configured")] output = "Available repositories:\n\n" for name, info in self.repos.items(): output += f"- {name}\n" output += f" Path: {info.get('path', 'N/A')}\n" output += f" Description: {info.get('description', 'N/A')}\n\n" return [TextContent(type="text", text=output)] async def handle_make(self, args: Dict[str, Any]) -> List[TextContent]: """Handle make command""" repo = args.get("repo") branch = args.get("branch") make_args = args.get("args", "") # Validate arguments validate_make_args(make_args) # Build command cmd = ["make"] if make_args: cmd.extend(shlex.split(make_args)) # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, branch, cmd) return [TextContent(type="text", text=result)] async def handle_git(self, args: Dict[str, Any]) -> List[TextContent]: """Handle git command""" repo = args.get("repo") branch = args.get("branch") git_args = args.get("args", "") if not git_args: raise ValueError("git command requires arguments") # Validate arguments validate_git_args(git_args) # Build command cmd = ["git"] + shlex.split(git_args) # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, branch, cmd) return [TextContent(type="text", text=result)] async def handle_ls(self, args: Dict[str, Any]) -> List[TextContent]: """Handle ls command""" repo = args.get("repo") branch = args.get("branch") ls_args = args.get("args", "") # Validate arguments validate_ls_args(ls_args) # Build command cmd = ["ls"] if ls_args: cmd.extend(shlex.split(ls_args)) # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, branch, cmd) return [TextContent(type="text", text=result)] async def handle_env(self, args: Dict[str, Any]) -> List[TextContent]: """Handle env command""" repo = args.get("repo") branch = args.get("branch") # Execute env info script if not ENV_INFO_SCRIPT.exists(): raise FileNotFoundError(f"Environment info script not found: {ENV_INFO_SCRIPT}") # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, branch, [str(ENV_INFO_SCRIPT)]) return [TextContent(type="text", text=result)] async def handle_read_file(self, args: Dict[str, Any]) -> List[TextContent]: """Handle read_file command""" repo = args.get("repo") branch = args.get("branch") file_path = args.get("path") start_line = args.get("start_line") end_line = args.get("end_line") if not file_path: raise ValueError("File path is required") # Get working path with locking lock_key = self._get_lock_key(repo, branch) async with self.worktree_locks[lock_key]: repo_path = await self.get_working_path(repo, branch) # Validate and resolve the file path validated_path = validate_file_path(file_path, repo_path) # Read the file try: with open(validated_path, 'r', encoding='utf-8', errors='replace') as f: if start_line is not None or end_line is not None: # Read specific line range lines = f.readlines() total_lines = len(lines) # Validate line numbers if start_line is not None and start_line < 1: raise ValueError(f"start_line must be >= 1, got {start_line}") if end_line is not None and end_line < 1: raise ValueError(f"end_line must be >= 1, got {end_line}") if start_line is not None and end_line is not None and start_line > end_line: raise ValueError(f"start_line ({start_line}) must be <= end_line ({end_line})") # Default values start_idx = (start_line - 1) if start_line is not None else 0 end_idx = end_line if end_line is not None else total_lines # Clamp to valid range start_idx = max(0, min(start_idx, total_lines)) end_idx = max(0, min(end_idx, total_lines)) # Extract the requested lines selected_lines = lines[start_idx:end_idx] # Format output with line numbers output = f"File: {validated_path}\n" output += f"Lines {start_idx + 1}-{end_idx} of {total_lines}\n" output += "=" * 80 + "\n" for i, line in enumerate(selected_lines, start=start_idx + 1): output += f"{i:6d}: {line.rstrip()}\n" return [TextContent(type="text", text=output)] else: # Read entire file content = f.read() lines_count = content.count('\n') + (1 if content and not content.endswith('\n') else 0) output = f"File: {validated_path}\n" output += f"Total lines: {lines_count}\n" output += "=" * 80 + "\n" # Add line numbers to entire file for i, line in enumerate(content.splitlines(), start=1): output += f"{i:6d}: {line}\n" return [TextContent(type="text", text=output)] except FileNotFoundError: raise FileNotFoundError(f"File not found: {validated_path}") except PermissionError: raise PermissionError(f"Permission denied reading file: {validated_path}") except Exception as e: raise Exception(f"Error reading file {validated_path}: {str(e)}") async def run_command(self, cmd: List[str], cwd: Path) -> str: """Run a command in a repository directory""" try: # Ensure working directory exists if not cwd.exists(): raise FileNotFoundError(f"Repository path does not exist: {cwd}") logger.info(f"Executing command: {' '.join(cmd)} in {cwd}") # Run command process = await asyncio.create_subprocess_exec( *cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() # Format output output = [] if stdout: output.append("=== STDOUT ===") output.append(stdout.decode('utf-8', errors='replace')) if stderr: output.append("=== STDERR ===") output.append(stderr.decode('utf-8', errors='replace')) if process.returncode != 0: output.append(f"\n=== EXIT CODE: {process.returncode} ===") return "\n".join(output) if output else "(no output)" except Exception as e: logger.error(f"Command execution failed: {e}", exc_info=True) raise async def run_command_streaming(self, cmd: List[str], cwd: Path): """Run a command with line-by-line streaming of output""" try: # Ensure working directory exists if not cwd.exists(): raise FileNotFoundError(f"Repository path does not exist: {cwd}") logger.info(f"Executing streaming command: {' '.join(cmd)} in {cwd}") # Run command process = await asyncio.create_subprocess_exec( *cmd, cwd=str(cwd), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Process lines as they arrive from both streams stdout_done = False stderr_done = False while not (stdout_done and stderr_done): # Check stdout if not stdout_done: try: line = await asyncio.wait_for(process.stdout.readline(), timeout=0.01) if line: decoded_line = line.decode('utf-8', errors='replace').rstrip() if decoded_line: yield {"type": "stdout", "line": decoded_line} else: stdout_done = True except asyncio.TimeoutError: pass # Check stderr if not stderr_done: try: line = await asyncio.wait_for(process.stderr.readline(), timeout=0.01) if line: decoded_line = line.decode('utf-8', errors='replace').rstrip() if decoded_line: yield {"type": "stderr", "line": decoded_line} else: stderr_done = True except asyncio.TimeoutError: pass # Small sleep to prevent busy waiting if not (stdout_done and stderr_done): await asyncio.sleep(0.01) # Wait for process to complete await process.wait() # Yield completion message yield { "type": "complete", "exit_code": process.returncode } except Exception as e: logger.error(f"Streaming command execution failed: {e}", exc_info=True) yield { "type": "error", "message": str(e) } async def run(self): """Start the MCP server with stdio transport""" await self.discover_repos() async with stdio_server() as (read_stream, write_stream): logger.info("MCP Build Server starting with stdio transport...") await self.server.run( read_stream, write_stream, self.server.create_initialization_options() ) async def handle_sse(self, request): """Handle SSE endpoint for HTTP transport""" # Verify authentication if not verify_session_key(request): logger.warning(f"Unauthorized SSE connection attempt from {request.client.host}") return JSONResponse( {"error": "Unauthorized", "message": "Invalid or missing session key"}, status_code=401, headers={"WWW-Authenticate": "Bearer"} ) logger.info(f"SSE connection established from {request.client.host}") sse_transport = SseServerTransport("/messages") async with sse_transport.connect_sse( request.scope, request.receive, request._send ) as (read_stream, write_stream): init_options = self.server.create_initialization_options() await self.server.run(read_stream, write_stream, init_options) # SSE connection has been handled through the transport's send callback # We don't need to return a response as the ASGI protocol was handled directly # However, to satisfy Starlette's routing, we should not return None # The connection is already closed by the context manager above return None # REST API Endpoints for Quick Operations async def api_list_repos(self, request: Request): """GET /api/repos - List all repositories""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repos = [ RepoInfo( name=name, path=info["path"], description=info.get("description", ""), is_default=False # No default repository ) for name, info in self.repos.items() ] response = ReposListResponse(repos=repos) return JSONResponse(response.model_dump()) except Exception as e: logger.error(f"Error listing repos: {e}", exc_info=True) return JSONResponse({"error": str(e)}, status_code=500) async def api_get_env(self, request: Request): """GET /api/repos/{repo}/env - Get environment info""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") # For GET endpoint, branch can be passed as query param branch = request.query_params.get("branch") if not ENV_INFO_SCRIPT.exists(): raise FileNotFoundError(f"Environment info script not found: {ENV_INFO_SCRIPT}") # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, branch, [str(ENV_INFO_SCRIPT)]) return JSONResponse({"success": True, "data": result}) except Exception as e: logger.error(f"Error getting env: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def api_ls(self, request: Request): """POST /api/repos/{repo}/ls - List directory contents""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") body = await request.json() ls_request = LsRequest(**body) # Validate arguments validate_ls_args(ls_request.args) # Build command cmd = ["ls"] if ls_request.args: cmd.extend(shlex.split(ls_request.args)) # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, ls_request.branch, cmd) return JSONResponse({"success": True, "data": result}) except Exception as e: logger.error(f"Error running ls: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def api_read_file(self, request: Request): """POST /api/repos/{repo}/read_file - Read file contents""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") body = await request.json() read_file_request = ReadFileRequest(**body) # Get working path with locking lock_key = self._get_lock_key(repo, read_file_request.branch) async with self.worktree_locks[lock_key]: repo_path = await self.get_working_path(repo, read_file_request.branch) # Validate and resolve the file path validated_path = validate_file_path(read_file_request.path, repo_path) # Read the file (reuse the logic from handle_read_file) with open(validated_path, 'r', encoding='utf-8', errors='replace') as f: if read_file_request.start_line is not None or read_file_request.end_line is not None: # Read specific line range lines = f.readlines() total_lines = len(lines) # Validate line numbers if read_file_request.start_line is not None and read_file_request.start_line < 1: raise ValueError(f"start_line must be >= 1, got {read_file_request.start_line}") if read_file_request.end_line is not None and read_file_request.end_line < 1: raise ValueError(f"end_line must be >= 1, got {read_file_request.end_line}") if (read_file_request.start_line is not None and read_file_request.end_line is not None and read_file_request.start_line > read_file_request.end_line): raise ValueError( f"start_line ({read_file_request.start_line}) must be <= " f"end_line ({read_file_request.end_line})" ) # Default values start_idx = (read_file_request.start_line - 1) if read_file_request.start_line is not None else 0 end_idx = read_file_request.end_line if read_file_request.end_line is not None else total_lines # Clamp to valid range start_idx = max(0, min(start_idx, total_lines)) end_idx = max(0, min(end_idx, total_lines)) # Extract the requested lines selected_lines = lines[start_idx:end_idx] # Format output with line numbers output = f"File: {validated_path}\n" output += f"Lines {start_idx + 1}-{end_idx} of {total_lines}\n" output += "=" * 80 + "\n" for i, line in enumerate(selected_lines, start=start_idx + 1): output += f"{i:6d}: {line.rstrip()}\n" return JSONResponse({"success": True, "data": output}) else: # Read entire file content = f.read() lines_count = content.count('\n') + (1 if content and not content.endswith('\n') else 0) output = f"File: {validated_path}\n" output += f"Total lines: {lines_count}\n" output += "=" * 80 + "\n" # Add line numbers to entire file for i, line in enumerate(content.splitlines(), start=1): output += f"{i:6d}: {line}\n" return JSONResponse({"success": True, "data": output}) except FileNotFoundError as e: logger.error(f"File not found: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=404) except ValueError as e: logger.error(f"Validation error: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=400) except Exception as e: logger.error(f"Error reading file: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def api_git_quick(self, request: Request): """POST /api/repos/{repo}/git/quick - Quick git operations (status, branch, log)""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") body = await request.json() git_request = GitQuickRequest(**body) # Only allow quick operations if git_request.operation not in ["status", "branch", "log"]: return JSONResponse( {"success": False, "error": f"Operation '{git_request.operation}' not allowed for quick endpoint. Use /stream/repos/{{repo}}/git instead."}, status_code=400 ) # Build git arguments git_args = git_request.operation if git_request.args: git_args += f" {git_request.args}" # Validate arguments validate_git_args(git_args) # Build command cmd = ["git"] + shlex.split(git_args) # Execute in appropriate worktree with locking result = await self.execute_in_worktree(repo, git_request.branch, cmd) return JSONResponse({"success": True, "data": result}) except Exception as e: logger.error(f"Error running git: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) # Streaming SSE Endpoints for Long Operations async def stream_make(self, request: Request): """POST /stream/repos/{repo}/make - Stream make output in real-time""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") body = await request.json() make_request = MakeStreamRequest(**body) # Validate arguments validate_make_args(make_request.args) # Build command cmd = ["make"] if make_request.args: cmd.extend(shlex.split(make_request.args)) # Stream execution with locking lock_key = self._get_lock_key(repo, make_request.branch) async def event_generator(): """Generate SSE events from command output""" try: async with self.worktree_locks[lock_key]: # Get working path (may create/update worktree) repo_path = await self.get_working_path(repo, make_request.branch) # Stream command output async for chunk in self.run_command_streaming(cmd, cwd=repo_path): yield f"data: {json.dumps(chunk)}\n\n" except Exception as e: logger.error(f"Error in stream: {e}", exc_info=True) yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Disable nginx buffering } ) except Exception as e: logger.error(f"Error setting up make stream: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def stream_git(self, request: Request): """POST /stream/repos/{repo}/git - Stream git operation output in real-time""" if not verify_session_key(request): return JSONResponse({"error": "Unauthorized"}, status_code=401) try: repo = request.path_params.get("repo") body = await request.json() git_request = GitStreamRequest(**body) # Build git arguments git_args = git_request.operation if git_request.args: git_args += f" {git_request.args}" # Validate arguments validate_git_args(git_args) # Build command cmd = ["git"] + shlex.split(git_args) # Stream execution with locking lock_key = self._get_lock_key(repo, git_request.branch) async def event_generator(): """Generate SSE events from command output""" try: async with self.worktree_locks[lock_key]: # Get working path (may create/update worktree) repo_path = await self.get_working_path(repo, git_request.branch) # Stream command output async for chunk in self.run_command_streaming(cmd, cwd=repo_path): yield f"data: {json.dumps(chunk)}\n\n" except Exception as e: logger.error(f"Error in stream: {e}", exc_info=True) yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) except Exception as e: logger.error(f"Error setting up git stream: {e}", exc_info=True) return JSONResponse({"success": False, "error": str(e)}, status_code=500) async def serve_documentation(self, request: Request): """GET /mcp-build.md?key=SESSION_KEY - Serve dynamically generated documentation""" # Documentation endpoint requires authentication if not verify_session_key(request): logger.warning(f"Unauthorized documentation access attempt from {request.client.host}") return PlainTextResponse("Unauthorized", status_code=403) try: doc_path = Path(__file__).parent / "MCP-BUILD.md" if not doc_path.exists(): return PlainTextResponse( "Documentation not found. Please see README.md in the mcp-build repository.", status_code=404 ) # Read base documentation content = doc_path.read_text() # Extract session key from request session_key = request.query_params.get("key", "") if not session_key and self.session_key: # Fall back to session_key from self if not in query params session_key = self.session_key # Construct SSE endpoint URL dynamically # Use HTTPS scheme (service is always deployed behind HTTPS proxy) host = request.headers.get("host", f"{request.url.hostname}:{request.url.port}") sse_endpoint = f"https://{host}/sse?key={session_key}" # Inject connection information at the top of the document connection_header = f"""# MCP Build Service **Connect to this service:** `{sse_endpoint}` --- """ # Replace the first heading with our dynamic version # Find the first line starting with "# " and replace it with our header lines = content.split('\n') new_lines = [] header_replaced = False for line in lines: if not header_replaced and line.startswith('# '): # Skip the original title, we'll use our dynamic header new_lines.append(connection_header.rstrip()) header_replaced = True else: new_lines.append(line) dynamic_content = '\n'.join(new_lines) return PlainTextResponse( dynamic_content, media_type="text/markdown", headers={ "Cache-Control": "private, no-cache", # Don't cache personalized content } ) except Exception as e: logger.error(f"Error serving documentation: {e}", exc_info=True) return PlainTextResponse(f"Error loading documentation: {str(e)}", status_code=500) def create_http_app(self): """Create Starlette ASGI application for HTTP transport""" routes = [ # Documentation endpoint (public, no auth required) Route("/mcp-build.md", endpoint=self.serve_documentation, methods=["GET"]), # MCP Protocol endpoint (backwards compatibility) Route("/sse", endpoint=self.handle_sse, methods=["GET"]), # REST API endpoints for quick operations Route("/api/repos", endpoint=self.api_list_repos, methods=["GET"]), Route("/api/repos/{repo}/env", endpoint=self.api_get_env, methods=["GET"]), Route("/api/repos/{repo}/ls", endpoint=self.api_ls, methods=["POST"]), Route("/api/repos/{repo}/read_file", endpoint=self.api_read_file, methods=["POST"]), Route("/api/repos/{repo}/git/quick", endpoint=self.api_git_quick, methods=["POST"]), # Streaming SSE endpoints for long operations Route("/stream/repos/{repo}/make", endpoint=self.stream_make, methods=["POST"]), Route("/stream/repos/{repo}/git", endpoint=self.stream_git, methods=["POST"]), ] middleware = [ Middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] ) ] return Starlette(debug=True, routes=routes, middleware=middleware) async def run_http(self): """Start the MCP server with HTTP transport""" await self.discover_repos() app = self.create_http_app() # Use external hostname for display (not bind address) display_host = EXTERNAL_HOST if EXTERNAL_HOST else HTTP_HOST logger.info(f"MCP Build Server starting with HTTP transport on {HTTP_HOST}:{HTTP_PORT}") logger.info(f"") logger.info(f"Documentation: http://{display_host}:{HTTP_PORT}/mcp-build.md") logger.info(f"") logger.info(f"Available endpoints:") logger.info(f" MCP Protocol (SSE): http://{display_host}:{HTTP_PORT}/sse") logger.info(f"") logger.info(f" REST API (Quick Operations):") logger.info(f" GET /api/repos - List repositories") logger.info(f" GET /api/repos/{{repo}}/env - Get environment info") logger.info(f" POST /api/repos/{{repo}}/ls - List directory contents") logger.info(f" POST /api/repos/{{repo}}/read_file - Read file contents") logger.info(f" POST /api/repos/{{repo}}/git/quick - Quick git operations (status, branch, log)") logger.info(f"") logger.info(f" Streaming API (Long Operations):") logger.info(f" POST /stream/repos/{{repo}}/make - Stream make output") logger.info(f" POST /stream/repos/{{repo}}/git - Stream git operations (pull, fetch, diff, show, checkout)") logger.info(f"") if SESSION_KEY: logger.info("Authentication: Session key required") logger.info(" Header: Authorization: Bearer <session-key>") logger.info(" Or query param: ?key=<session-key>") logger.info("") logger.info("=" * 80) logger.info("Copy-paste this message to AI agents:") logger.info("") print(f"\n\nUse the MCP build service http://{display_host}:{HTTP_PORT}/mcp-build.md\nSession key: {SESSION_KEY}\n\n") logger.info("=" * 80) else: logger.warning("Authentication: DISABLED (no session key configured)") config = uvicorn.Config( app, host=HTTP_HOST, port=HTTP_PORT, log_level="info" ) server = uvicorn.Server(config) await server.serve() def detect_public_ip(timeout=5): """Detect public IP address using external service""" services = [ "https://api.ipify.org", "https://checkip.amazonaws.com", "https://icanhazip.com" ] for service in services: try: logger.info(f"Detecting public IP from {service}...") with urllib.request.urlopen(service, timeout=timeout) as response: ip = response.read().decode('utf-8').strip() logger.info(f"Detected public IP: {ip}") return ip except Exception as e: logger.debug(f"Failed to get IP from {service}: {e}") continue logger.warning("Could not detect public IP address") return None def setup_signal_handlers(): """Set up signal handlers for immediate shutdown""" def signal_handler(signum, frame): """Handle shutdown signals in a signal-safe way""" sig_name = signal.Signals(signum).name # Use os.write for signal-safe output (logger is not signal-safe) msg = f"\nReceived {sig_name}, shutting down...\n".encode('utf-8') os.write(sys.stderr.fileno(), msg) # Stop the event loop gracefully instead of forcing sys.exit(0) try: loop = asyncio.get_running_loop() loop.stop() except RuntimeError: # No event loop running, exit immediately sys.exit(0) # Register handlers for SIGINT (Ctrl+C) and SIGTERM signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def parse_args(): """Parse command-line arguments""" parser = argparse.ArgumentParser( description="MCP Build Service - Provides build environment access via MCP protocol", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Start with default stdio transport %(prog)s # Start with HTTP transport (auto-generates session key) %(prog)s --transport http # Start with HTTP transport and custom session key %(prog)s --transport http --session-key my-secret-key # Start with HTTP transport and specify external hostname for display %(prog)s --transport http --external-host example.com # Start with HTTP and auto-detect public IP (useful behind NAT/router) %(prog)s --transport http --detect-public-ip # Combine: use public IP detection with custom session key %(prog)s --transport http --detect-public-ip --session-key my-key # Use persistent session key with file-based storage # First run: generates and saves key to file # Subsequent runs: loads existing key from file %(prog)s --transport http --key-file ~/.mcp-build-key # Override with specific session key (and save to file) %(prog)s --transport http --session-key my-secret --key-file ~/.mcp-build-key """ ) parser.add_argument( "--transport", choices=["stdio", "http"], default="stdio", help="Transport mode (default: stdio)" ) parser.add_argument( "--host", default="0.0.0.0", help="Host address for HTTP transport (default: 0.0.0.0)" ) parser.add_argument( "--port", type=int, default=3344, help="Port for HTTP transport (default: 3344)" ) parser.add_argument( "--external-host", help="External hostname/IP for documentation URLs (default: auto-detected from system hostname)" ) parser.add_argument( "--detect-public-ip", action="store_true", help="Auto-detect public IP address using external service (useful when behind NAT/router)" ) parser.add_argument( "--session-key", help="Session key for HTTP authentication (overrides key from --key-file if both provided)" ) parser.add_argument( "--generate-key", action="store_true", help="Auto-generate a random session key for HTTP transport" ) parser.add_argument( "--key-file", help="File path for persistent session key storage (reads existing key or generates new one)" ) return parser.parse_args() async def main(): """Main async entry point""" global TRANSPORT_MODE, HTTP_HOST, HTTP_PORT, EXTERNAL_HOST, SESSION_KEY # Parse command-line arguments args = parse_args() # Set configuration from arguments TRANSPORT_MODE = args.transport.lower() HTTP_HOST = args.host HTTP_PORT = args.port # Set external hostname for display purposes if args.external_host: EXTERNAL_HOST = args.external_host elif args.detect_public_ip: # Detect public IP address EXTERNAL_HOST = detect_public_ip() if not EXTERNAL_HOST: # Fallback to local hostname if detection fails try: EXTERNAL_HOST = socket.gethostname() except Exception: EXTERNAL_HOST = "localhost" else: # Auto-detect local hostname try: EXTERNAL_HOST = socket.gethostname() except Exception: EXTERNAL_HOST = "localhost" # Handle session key with file-based persistence import os key_from_file = None # Try to read existing key from file if --key-file is provided if args.key_file: key_path = os.path.expanduser(args.key_file) if os.path.exists(key_path): try: with open(key_path, 'r') as f: key_from_file = f.read().strip() if key_from_file: logger.info(f"Loaded existing session key from: {key_path}") except Exception as e: logger.warning(f"Failed to read session key from {key_path}: {e}") key_from_file = None # Determine session key (priority: explicit arg > file > generate) if args.session_key: # Explicit session key takes highest priority SESSION_KEY = args.session_key logger.info("Using session key from command-line argument") elif key_from_file: # Use existing key from file SESSION_KEY = key_from_file elif args.generate_key or TRANSPORT_MODE == "http": # Generate a new key if in HTTP mode and no key provided if TRANSPORT_MODE == "http": SESSION_KEY = secrets.token_urlsafe(32) logger.warning("=" * 80) logger.warning("Generated NEW session key for HTTP transport:") logger.warning(f" {SESSION_KEY}") logger.warning("") if args.key_file: logger.warning(f"Key will be persisted to: {args.key_file}") else: logger.warning("To reuse this key, start the server with:") logger.warning(f" mcp-build --transport http --session-key {SESSION_KEY}") logger.warning("=" * 80) # Write session key to file if requested (whether read, provided, or generated) if args.key_file and SESSION_KEY: try: key_path = os.path.expanduser(args.key_file) # Only write if key changed or file doesn't exist should_write = True if os.path.exists(key_path): try: with open(key_path, 'r') as f: existing_key = f.read().strip() if existing_key == SESSION_KEY: should_write = False # Key unchanged, no need to write except Exception: pass # Write anyway if we can't read if should_write: with open(key_path, 'w') as f: f.write(SESSION_KEY) os.chmod(key_path, 0o600) # Set to user-read-write only logger.info(f"Session key written to: {key_path}") except Exception as e: logger.error(f"Failed to write session key to {args.key_file}: {e}") sys.exit(1) # Set up signal handlers for immediate shutdown setup_signal_handlers() # Create and run server server = BuildEnvironmentServer() try: if TRANSPORT_MODE == "http": await server.run_http() else: # Default to stdio await server.run() except KeyboardInterrupt: # This will be caught if the signal handler doesn't stop the loop first logger.info("\nShutdown complete") except Exception as e: logger.error(f"Server error: {e}", exc_info=True) sys.exit(1) def cli(): """Synchronous entry point for setuptools console_scripts""" asyncio.run(main()) if __name__ == "__main__": cli()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jbroll/mcp-build'

If you have feedback or need assistance with the MCP directory API, please join our Discord server