"""Tests for MCPunk file breakdown.

MCPunk, by jurasofish.
"""
import logging
import sys
import time
from pathlib import Path
import pytest
from git import Repo
from mcpunk.file_breakdown import Project as FileBreakdownProject
from mcpunk.file_chunk import ChunkCategory
from tests.conftest import FileSet
def test_project_initialization(basic_file_set: FileSet) -> None:
    """A project rooted at the basic file set discovers exactly its two files."""
    proj = FileBreakdownProject(root=basic_file_set.root)
    assert proj.root == basic_file_set.root
    assert len(proj.files) == 2
    expected_paths = {basic_file_set.root / "a.py", basic_file_set.root / "b.py"}
    assert set(proj.file_map.keys()) == expected_paths
def test_project_file_loading(tmp_path: Path) -> None:
    """A single python file is loaded with its contents, extension and chunks."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "import foo")
    proj = FileBreakdownProject(root=file_set.root)
    assert len(proj.files) == 1
    the_file = proj.file_map[file_set.root / "test.py"]
    assert the_file.contents == "import foo"
    assert the_file.ext == ".py"
    categories = [chunk.category for chunk in the_file.chunks]
    assert categories == [ChunkCategory.imports]
def test_project_mixed_files_and_locations(tmp_path: Path) -> None:
    """Python, markdown, and unknown-extension files chunk correctly at any depth."""
    file_set = FileSet(tmp_path)
    # Top level files
    file_set.add_file("script.py", "import blah")
    file_set.add_file("readme.md", "# Title\n## Section")
    file_set.add_file("config.xyz", "some: config")
    # Deep nested files
    file_set.add_file("src/lib/utils.py", "x = 1")
    file_set.add_file("docs/api/spec.md", "# API\n## Endpoints")
    file_set.add_file("config/env/dev.xyz", "env: dev")
    proj = FileBreakdownProject(root=file_set.root)
    assert len(proj.files) == 6

    def categories_of(rel_path: str) -> list:
        # Chunk categories for the project file at the given relative path.
        return [chunk.category for chunk in proj.file_map[file_set.root / rel_path].chunks]

    # Check Python files were chunked correctly
    assert categories_of("script.py") == [ChunkCategory.imports]
    assert categories_of("src/lib/utils.py") == [ChunkCategory.module_level]
    # Check Markdown files were chunked correctly
    assert set(categories_of("readme.md")) <= {ChunkCategory.markdown_section}
    assert set(categories_of("docs/api/spec.md")) <= {ChunkCategory.markdown_section}
    # Check unknown extensions used whole file chunker
    assert set(categories_of("config.xyz")) <= {ChunkCategory.whole_file}
    assert set(categories_of("config/env/dev.xyz")) <= {ChunkCategory.whole_file}
def test_project_git_detection_without_git(tmp_path: Path) -> None:
    """With no git repo at the root, the project reports no git_repo."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "x = 1")
    assert FileBreakdownProject(root=file_set.root).git_repo is None
def test_project_git_detection_with_git(tmp_path: Path) -> None:
    """When the root is a git repo, the project picks it up as git_repo."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "x = 1")
    # Initialize actual git repo
    Repo.init(file_set.root)
    assert FileBreakdownProject(root=file_set.root).git_repo is not None
def test_project_non_existent_root(tmp_path: Path) -> None:
    """Constructing a project on a missing directory raises ValueError."""
    missing_dir = tmp_path / "does_not_exist"
    with pytest.raises(ValueError, match="does not exist"):
        FileBreakdownProject(root=missing_dir)
@pytest.mark.skipif(sys.platform not in ("linux", "darwin"), reason="Dumb test relies on chmod")
def test_project_handles_unreadable_file(tmp_path: Path) -> None:
    """An unreadable file is skipped while readable siblings load and chunk."""
    file_set = FileSet(tmp_path)
    file_set.add_file("good.py", "x = 1")
    unreadable = file_set.root / "bad.py"
    unreadable.touch()
    unreadable.chmod(0o000)  # Make unreadable
    proj = FileBreakdownProject(root=file_set.root)
    # Only the readable file should be present.
    assert len(proj.files) == 1
    assert (file_set.root / "good.py") in proj.file_map
    # Good file should be chunked normally
    good = proj.file_map[file_set.root / "good.py"]
    assert [chunk.category for chunk in good.chunks] == [ChunkCategory.module_level]
def test_project_handles_bad_syntax_file(tmp_path: Path) -> None:
    """A file with invalid python still loads, falling back to whole-file chunking."""
    file_set = FileSet(tmp_path)
    file_set.add_file("good.py", "x = 1")
    file_set.add_file("bad_syntax.py", "x = ")
    proj = FileBreakdownProject(root=file_set.root)
    assert len(proj.files) == 2
    assert (file_set.root / "good.py") in proj.file_map
    assert (file_set.root / "bad_syntax.py") in proj.file_map

    def categories_of(rel_path: str) -> list:
        # Chunk categories for the given relative path.
        return [chunk.category for chunk in proj.file_map[file_set.root / rel_path].chunks]

    # Bad syntax should fallback to whole file chunker
    assert categories_of("bad_syntax.py") == [ChunkCategory.whole_file]
    # Good file should be chunked normally
    assert categories_of("good.py") == [ChunkCategory.module_level]
def test_project_parallel_processing(
    tmp_path: Path,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Few files use one worker; more files fan out to multiple workers.

    A bit of a chunky one as it results in multiprocessing.
    """
    # e.g. in CI we only have two cores. For reliable testing, patch this.
    monkeypatch.setattr("os.cpu_count", lambda: 8)
    file_set = FileSet(tmp_path)
    # First add 10 files - should use single worker
    for idx in range(10):
        file_set.add_file(f"file_{idx}.py", f"x_{idx} = {idx}")
    caplog.set_level(logging.DEBUG)
    proj = FileBreakdownProject(root=file_set.root, files_per_parallel_worker=10)
    assert len(proj.files) == 10
    assert "workers" not in caplog.text.lower()
    # Add 20 more files - should now use multiple workers
    for idx in range(10, 30):
        file_set.add_file(f"file_{idx}.py", f"x_{idx} = {idx}")
    proj = FileBreakdownProject(root=file_set.root, files_per_parallel_worker=10)
    assert len(proj.files) == 30
    assert "Using 3 workers to process 30 files" in caplog.text
def test_project_watch_new_file(tmp_path: Path) -> None:
    """A file created after init is picked up by the watcher and chunked."""
    # Init project
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "def test(): pass")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    # Add new file and wait for refresh
    file_set.add_file("new.py", "import new")
    target = file_set.root / "new.py"
    attempts = 0
    # Poll up to ~2 seconds for the watcher to notice the new file.
    while target not in proj.file_map:
        attempts += 1
        if attempts >= 2_000:
            raise AssertionError("File not updated")
        time.sleep(1e-3)
    assert target in proj.file_map
    added = proj.file_map[target]
    assert added.contents == "import new"
    assert [chunk.category for chunk in added.chunks] == [ChunkCategory.imports]
def test_project_watch_modify_file(tmp_path: Path) -> None:
    """Modifying a watched file refreshes its contents and chunks."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "def test(): pass")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    # Modify file and wait for refresh
    target = file_set.root / "test.py"
    target.write_text("import modified")
    for _attempt in range(2_000):
        if proj.file_map[target].contents == "import modified":
            break
        time.sleep(1e-3)
    else:
        raise AssertionError("File not updated")
    refreshed = proj.file_map[target]
    assert refreshed.contents == "import modified"
    assert [chunk.category for chunk in refreshed.chunks] == [ChunkCategory.imports]
def test_project_watch_delete_file(tmp_path: Path) -> None:
    """Deleting a watched file removes it from the project map."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "def test(): pass")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    assert len(proj.file_map) == 1
    # Delete file and wait for refresh
    target = file_set.root / "test.py"
    target.unlink()
    attempts = 0
    # Poll until the watcher drops the deleted file.
    while target in proj.file_map:
        attempts += 1
        if attempts >= 2_000:
            raise AssertionError("File not updated")
        time.sleep(1e-3)
    assert target not in proj.file_map
def test_project_watch_delete_and_recreate_file(tmp_path: Path) -> None:
    """A deleted file leaves the map; recreating it brings fresh contents back."""
    file_set = FileSet(tmp_path)
    file_set.add_file("test.py", "def test(): pass")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    assert len(proj.file_map) == 1
    target = file_set.root / "test.py"
    # Delete file and wait for refresh
    target.unlink()
    for _attempt in range(2_000):
        if target not in proj.file_map:
            break
        time.sleep(1e-3)
    else:
        raise AssertionError("File not updated after delete")
    assert not proj.file_map
    # Recreate file and wait for refresh
    file_set.add_file("test.py", "import recreated")
    for _attempt in range(2_000):
        if target in proj.file_map:
            break
        time.sleep(1e-3)
    else:
        raise AssertionError("File not updated after recreation")
    recreated = proj.file_map[target]
    assert recreated.contents == "import recreated"
    assert [chunk.category for chunk in recreated.chunks] == [ChunkCategory.imports]
def test_project_watch_modify_deep_file(tmp_path: Path) -> None:
    """The watcher also refreshes files nested several directories deep."""
    file_set = FileSet(tmp_path)
    file_set.add_file("deep/nested/dir/test.py", "def test(): pass")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    assert len(proj.file_map) == 1
    # Modify deep file and wait for refresh
    target = file_set.root / "deep/nested/dir/test.py"
    target.write_text("import modified")
    attempts = 0
    while proj.file_map[target].contents != "import modified":
        attempts += 1
        if attempts >= 2_000:
            raise AssertionError("File not updated")
        time.sleep(1e-3)
    refreshed = proj.file_map[target]
    assert refreshed.contents == "import modified"
    assert [chunk.category for chunk in refreshed.chunks] == [ChunkCategory.imports]
def test_project_instantiation_git_ignored_files(tmp_path: Path) -> None:
    """Files matching .gitignore are excluded when the project is built."""
    file_set = FileSet(tmp_path)
    # Setup git repo
    repo = Repo.init(file_set.root)
    # Create files
    file_set.add_file("tracked.py", "x = 1")
    file_set.add_file("ignored.py", "y = 2")
    # Add gitignore
    (file_set.root / ".gitignore").write_text("ignored.py\n")
    # Add tracked file to git
    repo.index.add(["tracked.py"])
    repo.index.commit("Add tracked file")
    proj = FileBreakdownProject(root=file_set.root)
    assert len(proj.files) == 1
    assert (file_set.root / "tracked.py") in proj.file_map
    assert (file_set.root / "ignored.py") not in proj.file_map
def test_project_git_ignored_file_watch(
    tmp_path: Path,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test is a bit racy and gross, has a retry loop.

    It's generally tricky to confirm that something doesn't happen, especially when
    that thing has a nondeterministic delay to it. Oh well. Better imo than digging
    deep into tests of implementation details.
    """
    file_set = FileSet(tmp_path)
    repo = Repo.init(file_set.root)
    (file_set.root / ".gitignore").write_text("ignored.py\n")
    # Add tracked file to git
    file_set.add_file("tracked.py", "x = 1")
    repo.index.add(["tracked.py"])
    repo.index.commit("Add tracked file")
    # Add ignored file
    file_set.add_file("ignored.py", "y = 2")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    assert len(proj.files) == 1
    # Modify ignored file
    caplog.clear()
    caplog.set_level(logging.DEBUG)
    ignored_path = file_set.root / "ignored.py"
    ignored_path.write_text("y = 3")
    # Wait for ignore message in logs
    expected_msg = f"Ignoring modified for {ignored_path}"
    attempts = 0
    while expected_msg not in caplog.text:
        attempts += 1
        if attempts >= 2_000:
            raise AssertionError("File was not ignored")
        time.sleep(1e-3)
def test_project_modify_file_in_dot_git_dir(
    tmp_path: Path,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Files in .git should be ignored."""
    file_set = FileSet(tmp_path)
    # Setup git repo and add file
    _repo = Repo.init(file_set.root)
    inner_file = file_set.root / ".git" / "test.py"
    inner_file.parent.mkdir(exist_ok=True)
    inner_file.write_text("x = 1")
    proj = FileBreakdownProject(root=file_set.root, file_watch_refresh_freq_seconds=0.0)
    assert len(proj.files) == 0  # No tracked files
    # Modify file in .git
    caplog.clear()
    caplog.set_level(logging.DEBUG)
    inner_file.write_text("x = 2")
    # Wait for ignore message in logs
    expected_msg = f"Ignoring modified for {inner_file}"
    for _attempt in range(2_000):
        if expected_msg in caplog.text:
            break
        time.sleep(1e-3)
    else:
        raise AssertionError("File in .git was not ignored")
def test_project_load_non_existent_file(
    tmp_path: Path,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Asking the project to load a missing file warns and leaves state alone."""
    file_set = FileSet(tmp_path)
    file_set.add_file("exists.py", "x = 1")
    proj = FileBreakdownProject(root=file_set.root)
    caplog.clear()
    caplog.set_level(logging.WARNING)
    missing = file_set.root / "does_not_exist.py"
    proj.load_files([missing])
    # Wait for warning message
    attempts = 0
    while f"File {missing} does not exist" not in caplog.text:
        attempts += 1
        if attempts >= 2_000:
            raise AssertionError(f"No log message about non-existent file. Caplog is {caplog.text}")
        time.sleep(1e-3)
    assert len(proj.files) == 1
    assert file_set.root / "exists.py" in proj.file_map
    assert missing not in proj.file_map