Skip to main content
Glama
git.py (7.1 kB)
"""
Git repository management tools for Pwno MCP

Provides tools for fetching specific versions of git repositories.
Useful for analyzing specific vulnerable versions of software or libraries.
"""

import logging
import os
import shutil
import tempfile
from typing import Dict, Any, Optional
import subprocess

logger = logging.getLogger(__name__)


class GitTools:
    """Tools for git repository operations"""

    # Upper bound, in seconds, applied to every git subprocess so that no
    # single command (clone, fetch, checkout, rev-parse) can hang forever.
    GIT_TIMEOUT = 300

    def __init__(self):
        """
        Initialize git tools.

        Sets up a workspace directory for cloned repositories.
        """
        self.workspace_dir = os.path.join(tempfile.gettempdir(), "project")
        os.makedirs(self.workspace_dir, exist_ok=True)
        logger.info(f"Git workspace initialized at: {self.workspace_dir}")

    def fetch_repo(self,
                   repo_url: str,
                   version: Optional[str] = None,
                   target_dir: Optional[str] = None,
                   shallow: bool = True) -> Dict[str, Any]:
        """
        Fetch a specific version of a git repository.

        Any existing directory at the resolved target path is removed before
        cloning. The target path must resolve to a location strictly inside
        the workspace; otherwise nothing is touched and an error is returned.

        :param repo_url: Git repository URL (https or ssh)
        :param version: Specific version to checkout (branch/tag/commit hash).
                        If None, uses default branch
        :param target_dir: Target directory name. If None, derives from repo URL
        :param shallow: Whether to perform shallow clone (faster for large repos).
                        With a version, a depth-1 clone of that branch/tag is
                        tried first and a full clone is used as fallback.
        :returns: Dictionary with fetch results including path and status.
                  ``success`` is always present; failures carry ``error``.
        """
        try:
            # Validate everything before touching the filesystem or running git.
            self._validate_git_argument("repo_url", repo_url)
            if version:
                self._validate_git_argument("version", version)
            repo_path = self._resolve_repo_path(repo_url, target_dir)

            # Check if repo already exists
            if os.path.exists(repo_path):
                logger.info(f"Repository already exists at {repo_path}, removing...")
                shutil.rmtree(repo_path)

            # Clone the repository
            logger.info(f"Cloning repository: {repo_url}")
            result, clone_cmd = self._clone(repo_url, repo_path, version, shallow)
            if result.returncode != 0:
                return {
                    "success": False,
                    "error": "Clone failed",
                    "stderr": result.stderr,
                    "command": " ".join(clone_cmd)
                }

            # Checkout specific version if requested
            checkout_info = None
            if version:
                checkout_result = self._checkout(repo_path, version)
                if checkout_result.returncode != 0:
                    return {
                        "success": False,
                        "error": "Checkout failed",
                        "stderr": checkout_result.stderr,
                        "clone_success": True,
                        "path": repo_path
                    }
                checkout_info = checkout_result.stdout.strip()

            return {
                "success": True,
                "path": repo_path,
                "repo_url": repo_url,
                "version": version or "default",
                "current_commit": self._rev_parse(repo_path),
                "current_branch": self._rev_parse(repo_path, "--abbrev-ref"),
                "checkout_info": checkout_info,
                "workspace": self.workspace_dir
            }

        except subprocess.TimeoutExpired as e:
            return {
                "success": False,
                "error": "Operation timed out",
                "timeout": e.timeout
            }
        except Exception as e:
            logger.error(f"Git fetch error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "type": type(e).__name__
            }

    @staticmethod
    def _validate_git_argument(name: str, value: Any) -> None:
        """Reject values git would misread as a command-line option."""
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"{name} must be a non-empty string")
        if value.startswith("-"):
            # e.g. "--upload-pack=<cmd>" would be parsed by git as an option.
            raise ValueError(f"{name} must not start with '-': {value!r}")

    def _resolve_repo_path(self, repo_url: str, target_dir: Optional[str]) -> str:
        """Return the clone destination, guaranteed to lie strictly inside the workspace."""
        # Extract repo name if target_dir not specified
        if not target_dir:
            repo_name = repo_url.rstrip('/').split('/')[-1]
            if repo_name.endswith('.git'):
                repo_name = repo_name[:-4]
            target_dir = repo_name

        # Resolve symlinks and ".." on both sides so the containment check
        # compares real locations rather than spellings.
        workspace = os.path.realpath(self.workspace_dir)
        candidate = os.path.realpath(os.path.join(workspace, target_dir))
        try:
            inside = os.path.commonpath([workspace, candidate]) == workspace
        except ValueError:
            # commonpath raises when the paths cannot be compared
            # (e.g. different drives on Windows).
            inside = False

        # candidate == workspace happens for an empty or "." name; allowing it
        # would make the caller delete the entire workspace.
        if not inside or candidate == workspace:
            raise ValueError(
                f"Target directory must be inside the workspace: {target_dir!r}"
            )
        return candidate

    def _run_git(self, cmd: list, cwd: Optional[str] = None) -> subprocess.CompletedProcess:
        """Run one git command, capturing text output, bounded by GIT_TIMEOUT."""
        return subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=self.GIT_TIMEOUT
        )

    def _clone(self, repo_url: str, repo_path: str,
               version: Optional[str], shallow: bool) -> tuple:
        """Clone repo_url into repo_path; return (completed process, command used)."""
        if shallow and version:
            # --depth implies --single-branch, so the requested ref has to be
            # named at clone time; this works for branches and tags.
            shallow_cmd = ["git", "clone", "--depth", "1", "--branch", version,
                           "--", repo_url, repo_path]
            result = self._run_git(shallow_cmd)
            if result.returncode == 0:
                return result, shallow_cmd
            # Most likely a commit hash, which --branch cannot resolve.
            logger.info(f"Shallow clone of {version} failed, retrying with full clone")
            shutil.rmtree(repo_path, ignore_errors=True)

        clone_cmd = ["git", "clone"]
        if shallow and not version:
            clone_cmd.extend(["--depth", "1"])
        # "--" ends option parsing so the URL and path are always positional.
        clone_cmd.extend(["--", repo_url, repo_path])
        return self._run_git(clone_cmd), clone_cmd

    def _checkout(self, repo_path: str, version: str) -> subprocess.CompletedProcess:
        """Check out version in repo_path; return the last checkout attempt."""
        logger.info(f"Checking out version: {version}")

        # Best effort: the ref is usually present after the clone already, so
        # a failure here is not fatal.
        fetch_result = self._run_git(["git", "fetch", "origin", version], cwd=repo_path)
        if fetch_result.returncode != 0:
            logger.debug(f"Fetch of {version} failed: {fetch_result.stderr.strip()}")

        # Try to checkout the version
        checkout_result = self._run_git(["git", "checkout", version], cwd=repo_path)
        if checkout_result.returncode != 0:
            # Try as a remote branch
            checkout_result = self._run_git(
                ["git", "checkout", "-b", version, f"origin/{version}"],
                cwd=repo_path
            )
        return checkout_result

    def _rev_parse(self, repo_path: str, *options: str) -> str:
        """Return `git rev-parse [options] HEAD` output, or "unknown" on failure."""
        result = self._run_git(["git", "rev-parse", *options, "HEAD"], cwd=repo_path)
        return result.stdout.strip() if result.returncode == 0 else "unknown"

    def cleanup_workspace(self) -> Dict[str, Any]:
        """
        Clean up the git workspace directory.

        Removes the workspace with everything in it and recreates it empty.

        :returns: Dictionary with cleanup status
        """
        try:
            if os.path.exists(self.workspace_dir):
                shutil.rmtree(self.workspace_dir)
                os.makedirs(self.workspace_dir, exist_ok=True)
                return {
                    "success": True,
                    "message": "Workspace cleaned",
                    "path": self.workspace_dir
                }
            else:
                return {
                    "success": True,
                    "message": "Workspace already clean",
                    "path": self.workspace_dir
                }
        except Exception as e:
            logger.error(f"Cleanup error: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/0xSensei/pwno-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.