Skip to main content
Glama
johannhartmann

MCP Code Analysis Server

test_git_sync.py14.6 kB
"""Tests for Git repository synchronization.""" import hashlib from pathlib import Path from typing import Any, cast from unittest.mock import MagicMock, patch import git import pytest from git.exc import GitCommandError, InvalidGitRepositoryError from src.scanner.git_sync import GitSync from src.utils.exceptions import RepositoryError @pytest.fixture def git_sync(tmp_path: Path) -> Any: """Create GitSync fixture with temporary storage.""" with patch("src.scanner.git_sync.settings") as mock_settings: mock_settings.scanner.root_paths = [str(tmp_path / "repos")] mock_settings.scanner.exclude_patterns = [ "__pycache__", "*.pyc", ".git", ".pytest_cache", ] sync = GitSync() yield sync @pytest.fixture def mock_repo() -> git.Repo: """Create mock git repository.""" repo = MagicMock(spec=git.Repo) repo.working_dir = "/tmp/test_repo" # nosec B108 - mock path for testing repo.heads = {"main": MagicMock(), "develop": MagicMock()} repo.active_branch = MagicMock(name="main") repo.active_branch.name = "main" repo.remotes = MagicMock() repo.remotes.origin = MagicMock() return repo class TestGitSync: """Tests for GitSync class.""" def test_init(self, tmp_path: Path) -> None: """Test GitSync initialization.""" with patch("src.scanner.git_sync.settings") as mock_settings: mock_settings.scanner.root_paths = [str(tmp_path / "repos")] sync = GitSync() assert sync.storage_path == tmp_path / "repos" assert sync.storage_path.exists() def test_get_repo_path(self, git_sync: GitSync) -> None: """Test repository path generation.""" path = git_sync._get_repo_path("test-owner", "test-repo") assert path == git_sync.storage_path / "test-owner" / "test-repo" def testextract_owner_repo_https(self, git_sync: GitSync) -> None: """Test extracting owner and repo from HTTPS URL.""" owner, repo = git_sync.extract_owner_repo( "https://github.com/test-owner/test-repo", ) assert owner == "test-owner" assert repo == "test-repo" # With .git suffix owner, repo = git_sync.extract_owner_repo( "https://github.com/test-owner/test-repo.git", ) assert owner == "test-owner" assert repo == "test-repo" def testextract_owner_repo_ssh(self, git_sync: GitSync) -> None: """Test extracting owner and repo from SSH URL.""" owner, repo = git_sync.extract_owner_repo( "git@github.com:test-owner/test-repo.git", ) assert owner == "test-owner" assert repo == "test-repo" def test_extract_owner_repo_invalid(self, git_sync: GitSync) -> None: """Test extracting owner and repo from invalid URL.""" from src.utils.exceptions import ValidationError with pytest.raises(ValidationError, match="Invalid URL"): git_sync.extract_owner_repo("https://example.com/test-repo") with pytest.raises(ValidationError, match="Invalid path"): git_sync.extract_owner_repo("https://github.com/invalid-path") @pytest.mark.asyncio async def test_clone_repository_success( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test successful repository cloning.""" with patch("git.Repo.clone_from", return_value=mock_repo) as mock_clone: repo = await git_sync.clone_repository( "https://github.com/test-owner/test-repo", branch="main", ) assert repo == mock_repo mock_clone.assert_called_once() # Check clone arguments call_args = mock_clone.call_args assert "test-owner/test-repo" in str(call_args[0][1]) assert call_args[1]["branch"] == "main" assert call_args[1]["depth"] == 1 @pytest.mark.asyncio async def test_clone_repository_with_token( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test repository cloning with access token.""" with patch("git.Repo.clone_from", return_value=mock_repo) as mock_clone: await git_sync.clone_repository( "https://github.com/test-owner/test-repo", access_token="test_token", ) # Check that token was inserted into URL call_args = mock_clone.call_args assert "test_token@github.com" in call_args[0][0] @pytest.mark.asyncio async def test_clone_repository_already_exists( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test cloning when repository already exists.""" repo_path = git_sync._get_repo_path("test-owner", "test-repo") repo_path.mkdir(parents=True) with ( patch("git.Repo", return_value=mock_repo), patch.object( git_sync, "update_repository", return_value=mock_repo, ) as mock_update, ): repo = await git_sync.clone_repository( "https://github.com/test-owner/test-repo", ) assert repo == mock_repo mock_update.assert_called_once() @pytest.mark.asyncio async def test_clone_repository_git_error(self, git_sync: GitSync) -> None: """Test repository cloning with git error.""" with ( patch( "git.Repo.clone_from", side_effect=GitCommandError("clone", "error"), ), pytest.raises(RepositoryError, match="Clone failed"), ): await git_sync.clone_repository( "https://github.com/test-owner/test-repo", ) @pytest.mark.asyncio async def test_update_repository_success( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test successful repository update.""" repo_path = git_sync._get_repo_path("test-owner", "test-repo") repo_path.mkdir(parents=True) with patch("git.Repo", return_value=mock_repo): repo = await git_sync.update_repository( "https://github.com/test-owner/test-repo", branch="main", ) assert repo == mock_repo # Verify calls via MagicMock attributes assert cast("Any", mock_repo.remotes.origin.fetch).call_count == 1 assert cast("Any", mock_repo.remotes.origin.pull).call_args[0][0] == "main" @pytest.mark.asyncio async def test_update_repository_not_exists( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test updating non-existent repository.""" with patch.object( git_sync, "clone_repository", return_value=mock_repo, ) as mock_clone: repo = await git_sync.update_repository( "https://github.com/test-owner/test-repo", ) assert repo == mock_repo mock_clone.assert_called_once() @pytest.mark.asyncio async def test_update_repository_invalid_repo( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test updating invalid repository directory.""" repo_path = git_sync._get_repo_path("test-owner", "test-repo") repo_path.mkdir(parents=True) with patch("git.Repo", side_effect=InvalidGitRepositoryError): with patch.object( git_sync, "clone_repository", return_value=mock_repo, ) as mock_clone: repo = await git_sync.update_repository( "https://github.com/test-owner/test-repo", ) assert repo == mock_repo mock_clone.assert_called_once() assert not repo_path.exists() # Should be removed def test_get_repository_exists( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test getting existing repository.""" repo_path = git_sync._get_repo_path("test-owner", "test-repo") repo_path.mkdir(parents=True) with patch("git.Repo", return_value=mock_repo): repo = git_sync.get_repository("https://github.com/test-owner/test-repo") assert repo == mock_repo def test_get_repository_not_exists(self, git_sync: GitSync) -> None: """Test getting non-existent repository.""" repo = git_sync.get_repository("https://github.com/test-owner/test-repo") assert repo is None def test_get_file_hash(self, git_sync: GitSync, tmp_path: Path) -> None: """Test file hash calculation.""" test_file = tmp_path / "test.txt" test_content = b"Hello, World!" test_file.write_bytes(test_content) hash_value = git_sync.get_file_hash(test_file) # Verify hash expected_hash = hashlib.sha256(test_content).hexdigest() assert hash_value == expected_hash @pytest.mark.asyncio async def test_get_changed_files( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test getting changed files.""" # Mock diff items diff_item1 = MagicMock() diff_item1.a_path = "src/old.py" diff_item1.b_path = "src/new.py" diff_item1.new_file = False diff_item1.deleted_file = False diff_item1.renamed_file = True diff_item2 = MagicMock() diff_item2.a_path = None diff_item2.b_path = "src/added.py" diff_item2.new_file = True diff_item2.deleted_file = False diff_item2.renamed_file = False mock_commit = MagicMock() mock_commit.diff.return_value = [diff_item1, diff_item2] cast("Any", mock_repo).commit.return_value = mock_commit # Provide a MagicMock for head with a commit attribute head_mock = MagicMock() cast("Any", mock_repo).head = head_mock head_mock.commit = MagicMock() changed_files = await git_sync.get_changed_files(mock_repo, "abc123") assert len(changed_files) == 2 # For renamed files, the key is the old path (a_path) assert changed_files["src/old.py"]["change_type"] == "renamed" assert changed_files["src/old.py"]["new_path"] == "src/new.py" assert changed_files["src/added.py"]["change_type"] == "added" @pytest.mark.asyncio async def test_scan_repository_files( self, git_sync: GitSync, tmp_path: Path ) -> None: """Test scanning repository files.""" # Create mock repository structure repo_path = tmp_path / "test_repo" src_path = repo_path / "src" src_path.mkdir(parents=True) # Create test files (src_path / "main.py").write_text("print('main')") (src_path / "utils.py").write_text("print('utils')") (src_path / "data.json").write_text('{"test": true}') # Create excluded directory pycache_path = src_path / "__pycache__" pycache_path.mkdir() (pycache_path / "main.cpython-39.pyc").write_text("compiled") mock_repo = MagicMock() mock_repo.working_dir = str(repo_path) files = await git_sync.scan_repository_files(mock_repo, {".py"}) assert len(files) == 2 file_paths = [f["path"] for f in files] assert "src/main.py" in file_paths assert "src/utils.py" in file_paths assert "src/data.json" not in file_paths assert "__pycache__" not in str(file_paths) def test_detect_language(self, git_sync: GitSync) -> None: """Test language detection from file extension.""" test_cases = [ (Path("test.py"), "python"), (Path("test.js"), "javascript"), (Path("test.ts"), "typescript"), (Path("test.java"), "java"), (Path("test.cpp"), "cpp"), (Path("test.rs"), "rust"), (Path("test.unknown"), "unknown"), ] for file_path, expected_language in test_cases: assert git_sync._detect_language(file_path) == expected_language def test_get_commit_info(self, git_sync: GitSync, mock_repo: git.Repo) -> None: """Test getting commit information.""" mock_commit = MagicMock() mock_commit.hexsha = "abc123def456" mock_commit.message = "Test commit message\n" mock_commit.author.name = "Test Author" mock_commit.author.email = "test@example.com" mock_commit.committed_date = 1234567890 mock_commit.stats.files = {"file1.py": {}, "file2.py": {}} mock_commit.stats.total = {"insertions": 10, "deletions": 5} cast("Any", mock_repo).commit.return_value = mock_commit info = git_sync.get_commit_info(mock_repo, "abc123") assert info["sha"] == "abc123def456" assert info["message"] == "Test commit message" assert info["author"] == "Test Author" assert info["author_email"] == "test@example.com" assert info["files_changed"] == ["file1.py", "file2.py"] assert info["additions"] == 10 assert info["deletions"] == 5 @pytest.mark.asyncio async def test_get_recent_commits( self, git_sync: GitSync, mock_repo: git.Repo ) -> None: """Test getting recent commits.""" # Create mock commits commits = [] for i in range(3): commit = MagicMock() commit.hexsha = f"commit{i}" commit.message = f"Commit {i}" commit.author.name = "Test Author" commit.author.email = "test@example.com" commit.committed_date = 1234567890 + i * 3600 commit.stats.files = {} commit.stats.total = {"insertions": i, "deletions": 0} commits.append(commit) cast("Any", mock_repo).iter_commits.return_value = commits with patch.object(git_sync, "get_commit_info") as mock_get_info: mock_get_info.side_effect = lambda r, sha: { "sha": sha, "message": f"Commit {sha[-1]}", } recent_commits = await git_sync.get_recent_commits( mock_repo, branch="main", limit=10, ) assert len(recent_commits) == 3 assert recent_commits[0]["sha"] == "commit0" assert recent_commits[2]["sha"] == "commit2"

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johannhartmann/mcpcodeanalysis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server