Skip to main content
Glama
repository_manager.py•19.3 kB
"""Git repository management for skills repositories.""" import json import logging import os import re import shutil import tempfile from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse import git logger = logging.getLogger(__name__) @dataclass class Repository: """Repository metadata. Attributes: id: Unique repository identifier url: Git repository URL local_path: Path to local clone priority: Priority for skill selection (higher = preferred) last_updated: Timestamp of last update skill_count: Number of skills in repository license: Repository license (MIT, Apache-2.0, etc.) """ id: str url: str local_path: Path priority: int last_updated: datetime skill_count: int license: str def to_dict(self) -> dict[str, Any]: """Convert Repository to dictionary for JSON serialization. Returns: Dictionary with all fields, Path and datetime converted to strings """ data = asdict(self) data["local_path"] = str(self.local_path) data["last_updated"] = self.last_updated.isoformat() return data @classmethod def from_dict(cls, data: dict[str, Any]) -> "Repository": """Create Repository from dictionary loaded from JSON. Args: data: Dictionary with repository fields Returns: Repository instance """ return cls( id=data["id"], url=data["url"], local_path=Path(data["local_path"]), priority=data["priority"], last_updated=datetime.fromisoformat(data["last_updated"]), skill_count=data["skill_count"], license=data["license"], ) class RepositoryManager: """Manage git-based skills repositories. Handles cloning, updating, and tracking multiple skill repositories. Supports prioritization for resolving conflicts between repositories. 
""" # Default repositories to clone on setup DEFAULT_REPOS = [ { "url": "https://github.com/anthropics/skills.git", "priority": 100, "license": "Apache-2.0", }, { "url": "https://github.com/obra/superpowers.git", "priority": 90, "license": "MIT", }, { "url": "https://github.com/bobmatnyc/claude-mpm-skills.git", "priority": 80, "license": "MIT", }, ] def __init__(self, base_dir: Optional[Path] = None) -> None: """Initialize repository manager. Args: base_dir: Base directory for storing repositories. Defaults to ~/.mcp-skills/repos/ """ self.base_dir = base_dir or Path.home() / ".mcp-skills" / "repos" self.base_dir.mkdir(parents=True, exist_ok=True) self.metadata_file = self.base_dir.parent / "repos.json" def add_repository( self, url: str, priority: int = 0, license: str = "Unknown" ) -> Repository: """Clone new repository. Args: url: Git repository URL priority: Priority for skill selection (0-100) license: Repository license (default: "Unknown") Returns: Repository metadata object Raises: ValueError: If URL is invalid or repository already exists Design Decision: Git Clone Strategy Rationale: Using GitPython's clone_from() for simplicity and Python integration. Direct subprocess calls would require manual error handling and platform-specific git binary management. GitPython provides consistent cross-platform behavior. Trade-offs: - Simplicity: GitPython handles git binary detection and error wrapping - Performance: Slightly slower than subprocess (~5-10% overhead for small repos) - Dependency: Requires GitPython library, but already in project dependencies Error Handling: - InvalidGitRepositoryError: URL is not a valid git repository - GitCommandError: Clone operation failed (network, permissions, etc.) - ValueError: Invalid priority range or duplicate repository """ # 1. Validate URL if not self._is_valid_git_url(url): raise ValueError(f"Invalid git URL: {url}") # 2. 
Validate priority range if not 0 <= priority <= 100: raise ValueError(f"Priority must be between 0-100, got {priority}") # 3. Generate repository ID from URL repo_id = self._generate_repo_id(url) # 4. Check if already exists existing = self.get_repository(repo_id) if existing: raise ValueError( f"Repository already exists: {repo_id} at {existing.local_path}" ) # 5. Clone repository using GitPython local_path = self.base_dir / repo_id logger.info(f"Cloning repository {url} to {local_path}") try: git.Repo.clone_from(url, local_path, depth=1) except git.exc.GitCommandError as e: raise ValueError(f"Failed to clone repository {url}: {e}") from e # 6. Scan for skills skill_count = self._count_skills(local_path) logger.info(f"Found {skill_count} skills in {repo_id}") # 7. Create Repository object repository = Repository( id=repo_id, url=url, local_path=local_path, priority=priority, last_updated=datetime.now(timezone.utc), skill_count=skill_count, license=license, ) # 8. Store metadata self._save_repository(repository) return repository def update_repository(self, repo_id: str) -> Repository: """Pull latest changes from repository. Args: repo_id: Repository identifier Returns: Updated repository metadata Raises: ValueError: If repository not found Error Handling: - ValueError: Repository not found in metadata - GitCommandError: Pull operation failed (network, conflicts, etc.) - InvalidGitRepositoryError: Local clone is corrupted Recovery Strategy: - Pull failures are propagated to caller for explicit handling - Consider re-cloning if local repository is corrupted - No automatic conflict resolution (user must handle manually) """ # 1. Find repository by ID repository = self.get_repository(repo_id) if not repository: raise ValueError(f"Repository not found: {repo_id}") # 2. 
Git pull latest changes logger.info(f"Updating repository {repo_id} from {repository.url}") try: repo = git.Repo(repository.local_path) origin = repo.remotes.origin origin.pull() except git.exc.InvalidGitRepositoryError as e: raise ValueError( f"Local repository is corrupted: {repository.local_path}. " f"Consider removing and re-cloning: {e}" ) from e except git.exc.GitCommandError as e: raise ValueError(f"Failed to update repository {repo_id}: {e}") from e # 3. Rescan for new/updated skills skill_count = self._count_skills(repository.local_path) logger.info(f"Rescanned {repo_id}: {skill_count} skills found") # 4. Update metadata repository.last_updated = datetime.now(timezone.utc) repository.skill_count = skill_count # 5. Save updated metadata self._update_repository_metadata(repository) return repository def list_repositories(self) -> list[Repository]: """List all configured repositories. Returns: List of Repository objects sorted by priority (highest first) Performance Note: - Time Complexity: O(n log n) due to sorting - Space Complexity: O(n) for loading all repositories For current scale (~3-10 repos), this is negligible. If repository count exceeds 100, consider: - Lazy loading with pagination - Maintaining sorted index in JSON - Moving to SQLite with indexed queries (planned for Phase 1 Task 7) """ repositories = self._load_all_repositories() # Sort by priority descending (highest priority first) repositories.sort(key=lambda r: r.priority, reverse=True) return repositories def remove_repository(self, repo_id: str) -> None: """Remove repository and its skills. 
Args: repo_id: Repository identifier to remove Raises: ValueError: If repository not found Error Handling: - ValueError: Repository not found in metadata - OSError: File deletion failed (permissions, locked files) Data Consistency: - Metadata is removed atomically with temp file strategy - If directory deletion fails after metadata removal, directory is orphaned - Future enhancement: Two-phase commit for atomic operation Failure Recovery: - Orphaned directories can be manually deleted from base_dir - Re-running remove will fail (metadata already gone) but directory remains - Consider: Mark as deleted in metadata, then cleanup in background """ # 1. Find repository by ID repository = self.get_repository(repo_id) if not repository: raise ValueError(f"Repository not found: {repo_id}") logger.info(f"Removing repository {repo_id} from {repository.local_path}") # 2. Delete local clone try: if repository.local_path.exists(): shutil.rmtree(repository.local_path) logger.info(f"Deleted local clone at {repository.local_path}") except OSError as e: logger.error(f"Failed to delete repository directory: {e}") raise ValueError( f"Failed to delete repository directory {repository.local_path}: {e}" ) from e # 3. Remove from metadata storage # Note: Skill index removal will be handled by SkillManager (Task 4) # and ChromaDB integration (Task 5) in later phases self._delete_repository_metadata(repo_id) def get_repository(self, repo_id: str) -> Optional[Repository]: """Get repository by ID. 
Args: repo_id: Repository identifier Returns: Repository object or None if not found Performance: - Time Complexity: O(n) linear scan of all repositories - For current scale (3-10 repos), this is <1ms Optimization Opportunity: - If repo count >100, consider in-memory dict cache - SQLite migration (Task 7) will provide O(1) indexed lookup """ repositories = self._load_all_repositories() for repo in repositories: if repo.id == repo_id: return repo return None # Private helper methods def _is_valid_git_url(self, url: str) -> bool: """Validate git repository URL format. Args: url: URL to validate Returns: True if URL appears to be a valid git repository URL Supported Formats: - HTTPS: https://github.com/user/repo.git - SSH: git@github.com:user/repo.git - Git protocol: git://github.com/user/repo.git Note: This is basic format validation, not network reachability check. Actual repository validity is tested during clone operation. """ if not url: return False # HTTPS URLs if url.startswith("https://") or url.startswith("http://"): try: parsed = urlparse(url) # Must have scheme, netloc, and path return bool(parsed.scheme and parsed.netloc and parsed.path) except Exception: return False # SSH URLs (git@host:path/to/repo.git) if url.startswith("git@"): # Basic validation: must contain colon separator return ":" in url # Git protocol URLs if url.startswith("git://"): return True return False def _generate_repo_id(self, url: str) -> str: """Generate repository ID from URL. Args: url: Git repository URL Returns: Repository ID in format "owner/repo" or "hostname/owner/repo" Examples: "https://github.com/anthropics/skills.git" -> "anthropics/skills" "git@github.com:obra/superpowers.git" -> "obra/superpowers" "https://gitlab.com/group/subgroup/project.git" -> "group/subgroup/project" Design Decision: ID Format Rationale: Use path-based IDs that preserve repository identity across different clone URLs (HTTPS vs SSH). 
This allows identifying duplicates when users add same repo with different URL formats. Trade-offs: - Uniqueness: Path-based IDs work for GitHub/GitLab style URLs - Collisions: Rare, but possible for self-hosted repos with same path - Readability: IDs are human-readable and match repo names """ # Remove .git suffix if present clean_url = url.rstrip("/") if clean_url.endswith(".git"): clean_url = clean_url[:-4] # Handle SSH URLs (git@host:path) if url.startswith("git@"): # Extract path after colon if ":" in clean_url: path = clean_url.split(":", 1)[1] return path.strip("/") # Handle HTTPS/HTTP/Git URLs try: parsed = urlparse(clean_url) # Extract path without leading slash path = parsed.path.lstrip("/") return path except Exception: # Fallback: use sanitized URL as ID return re.sub(r"[^a-zA-Z0-9_-]", "_", clean_url) def _count_skills(self, repo_path: Path) -> int: """Count SKILL.md files in repository. Args: repo_path: Path to repository root Returns: Number of skill files found Performance: - Time Complexity: O(n) where n = total files in repo - Optimization: Could cache results and only rescan changed files Future Enhancement: - Use watchdog for incremental updates - Store skill metadata during scan for faster access """ skill_files = list(repo_path.rglob("SKILL.md")) return len(skill_files) def _load_all_repositories(self) -> list[Repository]: """Load all repositories from JSON metadata file. 
Returns: List of Repository objects (empty list if file doesn't exist) Error Handling: - Missing file: Returns empty list (no repositories configured) - Corrupt JSON: Logs error and returns empty list - Invalid data: Logs error and skips malformed entries """ if not self.metadata_file.exists(): return [] try: with open(self.metadata_file, "r") as f: data = json.load(f) repositories = [] for repo_data in data.get("repositories", []): try: repo = Repository.from_dict(repo_data) repositories.append(repo) except (KeyError, ValueError) as e: logger.warning(f"Skipping malformed repository entry: {e}") return repositories except (json.JSONDecodeError, OSError) as e: logger.error(f"Failed to load repository metadata: {e}") return [] def _save_repository(self, repository: Repository) -> None: """Save new repository to metadata file. Args: repository: Repository to add to metadata Design Decision: Atomic File Updates Rationale: Use temp file + rename for atomic updates to prevent corruption if process crashes during write. POSIX rename() is atomic, ensuring metadata is never in half-written state. Trade-offs: - Safety: Prevents corruption at cost of extra disk I/O - Performance: Negligible for small files (<100KB) - Complexity: Requires temp file handling Error Handling: - Write errors: Temp file is not renamed, original preserved - Rename errors: Rare, but original file remains unchanged """ repositories = self._load_all_repositories() repositories.append(repository) self._write_metadata(repositories) def _update_repository_metadata(self, repository: Repository) -> None: """Update existing repository in metadata file. 
Args: repository: Repository with updated metadata Note: This replaces the repository entry with matching ID """ repositories = self._load_all_repositories() # Replace repository with matching ID for i, repo in enumerate(repositories): if repo.id == repository.id: repositories[i] = repository break self._write_metadata(repositories) def _delete_repository_metadata(self, repo_id: str) -> None: """Remove repository from metadata file. Args: repo_id: ID of repository to remove """ repositories = self._load_all_repositories() # Filter out repository with matching ID repositories = [repo for repo in repositories if repo.id != repo_id] self._write_metadata(repositories) def _write_metadata(self, repositories: list[Repository]) -> None: """Write repository list to metadata file atomically. Args: repositories: List of all repositories to persist Atomic Write Strategy: 1. Write to temporary file in same directory 2. Rename temp file over original (atomic operation on POSIX) 3. This ensures metadata is never corrupted by partial writes """ # Ensure parent directory exists self.metadata_file.parent.mkdir(parents=True, exist_ok=True) # Convert repositories to dict format data = {"repositories": [repo.to_dict() for repo in repositories]} # Write to temp file, then atomic rename temp_fd, temp_path = tempfile.mkstemp( dir=self.metadata_file.parent, prefix=".repos_", suffix=".json.tmp" ) try: with os.fdopen(temp_fd, "w") as f: json.dump(data, f, indent=2) # Atomic rename (POSIX guarantees atomicity) os.replace(temp_path, self.metadata_file) except Exception: # Clean up temp file on error try: os.unlink(temp_path) except OSError: pass raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bobmatnyc/mcp-skills'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.