test_tmp_files.py•5.93 kB
"""Test proper handling of .tmp files during sync."""
import asyncio
from pathlib import Path
import pytest
from watchfiles import Change
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
@pytest.mark.asyncio
async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
"""Test that .tmp files are correctly filtered out."""
# Test filter_changes method directly
tmp_path = Path(test_project.path) / "test.tmp"
assert not watch_service.filter_changes(Change.added, str(tmp_path))
# Test with valid file
valid_path = Path(test_project.path) / "test.md"
assert watch_service.filter_changes(Change.added, str(valid_path))
@pytest.mark.asyncio
async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
"""Test handling of .tmp files during sync process."""
project_dir = Path(test_project.path)
# Create a .tmp file - this simulates a file being written with write_file_atomic
tmp_file = project_dir / "test.tmp"
await create_test_file(tmp_file, "This is a temporary file")
# Create the target final file
final_file = project_dir / "test.md"
await create_test_file(final_file, "This is the final file")
# Setup changes that include both the .tmp and final file
changes = {
(Change.added, str(tmp_file)),
(Change.added, str(final_file)),
}
# Handle changes
await watch_service.handle_changes(test_project, changes)
# Verify only the final file got an entity
tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
final_entity = await sync_service.entity_repository.get_by_file_path("test.md")
assert tmp_entity is None, "Temp file should not have an entity"
assert final_entity is not None, "Final file should have an entity"
@pytest.mark.asyncio
async def test_atomic_write_tmp_file_handling(
watch_service, project_config, test_project, sync_service
):
"""Test handling of file changes during atomic write operations."""
project_dir = project_config.home
# This test simulates the full atomic write process:
# 1. First a .tmp file is created
# 2. Then the .tmp file is renamed to the final file
# 3. Both events are processed by the watch service
# Setup file paths
tmp_path = project_dir / "document.tmp"
final_path = project_dir / "document.md"
# Create mockup of the atomic write process
await create_test_file(tmp_path, "Content for document")
# First batch of changes - .tmp file created
changes1 = {(Change.added, str(tmp_path))}
# Process first batch
await watch_service.handle_changes(test_project, changes1)
# Now "replace" the temp file with the final file
tmp_path.rename(final_path)
# Second batch of changes - .tmp file deleted, final file added
changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}
# Process second batch
await watch_service.handle_changes(test_project, changes2)
# Verify only the final file is in the database
tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
assert tmp_entity is None, "Temp file should not have an entity"
assert final_entity is not None, "Final file should have an entity"
# Check events
new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
assert len(new_events) == 1
assert new_events[0].path == "document.md"
@pytest.mark.asyncio
async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
"""Test handling of rapid atomic writes to the same destination."""
project_dir = Path(test_project.path)
# This test simulates multiple rapid atomic writes to the same file:
# 1. Several .tmp files are created one after another
# 2. Each is then renamed to the same final file
# 3. Events are batched and processed together
# Setup file paths
tmp1_path = project_dir / "document.1.tmp"
tmp2_path = project_dir / "document.2.tmp"
final_path = project_dir / "document.md"
# Create multiple temp files that will be used in sequence
await create_test_file(tmp1_path, "First version")
await create_test_file(tmp2_path, "Second version")
# Simulate the first atomic write
tmp1_path.replace(final_path)
# Brief pause to ensure file system registers the change
await asyncio.sleep(0.1)
# Read content to verify
content1 = final_path.read_text(encoding="utf-8")
assert content1 == "First version"
# Simulate the second atomic write
tmp2_path.replace(final_path)
# Verify content was updated
content2 = final_path.read_text(encoding="utf-8")
assert content2 == "Second version"
# Create a batch of changes that might arrive in mixed order
changes = {
(Change.added, str(tmp1_path)),
(Change.deleted, str(tmp1_path)),
(Change.added, str(tmp2_path)),
(Change.deleted, str(tmp2_path)),
(Change.added, str(final_path)),
(Change.modified, str(final_path)),
}
# Process all changes
await watch_service.handle_changes(test_project, changes)
# Verify only the final file is in the database
final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
assert final_entity is not None
# Also verify no tmp entities were created
tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
assert tmp1_entity is None
assert tmp2_entity is None