Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

repository.py10 kB
"""Repository management tools for MCP server.""" from datetime import datetime from typing import Any, cast from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from src.database.models import Commit, File from src.database.repositories import CommitRepo, FileRepo, RepositoryRepo from src.logger import get_logger from src.scanner.git_sync import GitSync from src.scanner.github_monitor import GitHubMonitor logger = get_logger(__name__) # Time constants SECONDS_PER_HOUR = 3600 SECONDS_PER_MINUTE = 60 class RepositoryTool: """MCP tools for repository management.""" def __init__(self, session: AsyncSession) -> None: self.session = session self.github_monitor = GitHubMonitor() self.git_sync = GitSync() self.repo_repo = RepositoryRepo(session) self.file_repo = FileRepo(session) self.commit_repo = CommitRepo(session) async def list_repositories(self) -> list[dict[str, Any]]: """ List all monitored GitHub repositories with their sync status. Returns: List of repositories with their current status """ try: repo_list = await self.repo_repo.list_all() repositories = [] for repo in repo_list: # Get file and commit counts from sqlalchemy import func file_count_result = await self.session.execute( select(func.count(File.id)).where(File.repository_id == repo.id), ) file_count = file_count_result.scalar() or 0 commit_count_result = await self.session.execute( select(func.count(Commit.id)).where( Commit.repository_id == repo.id ), ) commit_count = commit_count_result.scalar() or 0 # Get last commit last_commit = await self.commit_repo.get_latest(cast("int", repo.id)) # Check local directory exists local_path = self.git_sync._get_repo_path( cast("str", repo.owner), cast("str", repo.name) ) is_cloned = local_path.exists() repositories.append( { "id": repo.id, "name": repo.name, "owner": repo.owner, "url": repo.github_url, "branch": repo.default_branch, "last_synced": ( repo.last_synced.isoformat() if repo.last_synced else None ), "status": { "is_cloned": is_cloned, "file_count": file_count, "commit_count": commit_count, "last_commit": ( { "sha": ( last_commit.sha[:7] if last_commit else None ), "message": ( last_commit.message if last_commit else None ), "timestamp": ( last_commit.timestamp.isoformat() if last_commit else None ), } if last_commit else None ), }, "sync_age": self._calculate_sync_age( cast("datetime | None", repo.last_synced) ), }, ) return repositories except Exception as e: logger.exception("Error in list_repositories: %s") return [{"error": str(e)}] async def sync_repository(self, repository_url: str) -> dict[str, Any]: """ Manually trigger sync for a specific repository. Args: repository_url: GitHub repository URL Returns: Sync status and results """ try: # Find repository repo = await self.repo_repo.get_by_url(repository_url) result: dict[str, Any] if not repo: # Try to add new repository try: # Get repository info from GitHub repo_info = await self.github_monitor.get_repository_info( repository_url, ) # Create repository record repo = await self.repo_repo.create( github_url=repository_url, owner=repo_info["owner"], name=repo_info["name"], default_branch=repo_info["default_branch"], ) result = { "status": "added", "repository": { "id": repo.id, "name": repo.name, "owner": repo.owner, "url": repo.github_url, }, "message": "Repository added and sync initiated", } except (ValueError, ConnectionError, OSError) as e: return { "status": "error", "error": f"Failed to add repository: {e!s}", "repository_url": repository_url, } else: result = { "status": "syncing", "repository": { "id": repo.id, "name": repo.name, "owner": repo.owner, "url": repo.github_url, }, "message": "Sync initiated for existing repository", } # Check if cloned local_path = self.git_sync._get_repo_path( cast("str", repo.owner), cast("str", repo.name) ) if not local_path.exists(): # Clone repository git_repo = await self.git_sync.clone_repository( cast("str", repo.github_url), cast("str", repo.default_branch), ) result["cloned"] = True result["local_path"] = str(local_path) else: # Update repository git_repo = await self.git_sync.update_repository( cast("str", repo.github_url), cast("str", repo.default_branch), ) result["updated"] = True # Get changed files count if repo.last_synced: # Get commits since last sync to count changed files recent_commits = await self.git_sync.get_recent_commits( git_repo, cast("str", repo.default_branch), limit=100, since=cast("datetime | None", repo.last_synced), ) # Collect unique changed files changed_files = set() for commit_info in recent_commits: changed_files.update(commit_info.get("files_changed", [])) result["changed_files"] = len(changed_files) # Update last synced await self.repo_repo.update_last_synced(cast("int", repo.id)) # Get latest commits if repo.last_synced: commits = await self.github_monitor.get_commits_since( cast("str", repo.owner), cast("str", repo.name), cast("datetime", repo.last_synced), cast("str", repo.default_branch), ) result["new_commits"] = len(commits) # Store commits if commits: await self.commit_repo.create_batch( [ { "repository_id": repo.id, "sha": commit["sha"], "message": commit["message"], "author": commit["author"], "author_email": commit["author_email"], "timestamp": commit["timestamp"], } for commit in commits ], ) return result except Exception as e: logger.exception("Error in sync_repository: %s") return { "status": "error", "error": str(e), "repository_url": repository_url, } def _calculate_sync_age(self, last_synced: datetime | None) -> str | None: """Calculate human-readable sync age.""" if not last_synced: return "Never synced" from datetime import UTC age = datetime.now(tz=UTC) - last_synced if age.days > 0: return f"{age.days} day{'s' if age.days > 1 else ''} ago" if age.seconds > SECONDS_PER_HOUR: hours = age.seconds // SECONDS_PER_HOUR return f"{hours} hour{'s' if hours > 1 else ''} ago" if age.seconds > SECONDS_PER_MINUTE: minutes = age.seconds // SECONDS_PER_MINUTE return f"{minutes} minute{'s' if minutes > 1 else ''} ago" return "Just now"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server