Skip to main content
Glama
test_file_operations.pyโ€ข17.4 kB
#!/usr/bin/env python3 """ File operations tests for Fast Apply MCP server. Tests file reading, writing, validation, and manipulation operations. """ import asyncio import os import shutil import sys import tempfile import unittest from unittest.mock import patch # Add the parent directory to sys.path to import main sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapply.main import FastApplyConnector, _atomic_write, call_tool, validate_code_quality class TestFileReadOperations(unittest.TestCase): """Test file reading operations.""" def setUp(self): """Set up test environment.""" self.test_dir = tempfile.mkdtemp() self.original_cwd = os.getcwd() os.chdir(self.test_dir) def tearDown(self): """Clean up test environment.""" os.chdir(self.original_cwd) shutil.rmtree(self.test_dir) def test_read_file_basic(self): """Test basic file reading functionality.""" content = "test file content\nline 2\nline 3" test_file = "test.txt" with open(test_file, "w") as f: f.write(content) # Test reading through MCP tool - note: read_file is not a supported tool # The available tools are: edit_file, read_multiple_files, dry_run_edit_file, search_files # Let's test read_multiple_files instead result = asyncio.run(call_tool("read_multiple_files", {"paths": ["test.txt"]})) self.assertIn("text", result[0]) self.assertIn("test file content", result[0]["text"]) def test_read_file_nonexistent(self): """Test reading nonexistent file.""" result = asyncio.run(call_tool("read_multiple_files", {"paths": ["nonexistent.txt"]})) self.assertIn("text", result[0]) self.assertIn("Error - File not found", result[0]["text"]) def test_read_file_security_violation(self): """Test file reading with security violations.""" # Test path traversal attempt result = asyncio.run(call_tool("read_multiple_files", {"paths": ["../etc/passwd"]})) self.assertIn("text", result[0]) self.assertIn("error", result[0]["text"].lower()) def test_read_multiple_files(self): """Test reading multiple files.""" files_to_create = [ ("file1.txt", "content1"), ("file2.txt", "content2"), ("file3.txt", "content3") ] for filename, content in files_to_create: with open(filename, "w") as f: f.write(content) # Test reading multiple files paths = ["file1.txt", "file2.txt", "file3.txt"] result = asyncio.run(call_tool("read_multiple_files", {"paths": paths})) self.assertIn("text", result[0]) response_text = result[0]["text"] self.assertIn("content1", response_text) self.assertIn("content2", response_text) self.assertIn("content3", response_text) def test_read_multiple_files_empty_paths(self): """Test reading multiple files with empty paths.""" # This should raise an exception as paths parameter is required and must be non-empty with self.assertRaises(ValueError) as cm: asyncio.run(call_tool("read_multiple_files", {"paths": []})) self.assertIn("paths parameter is required", str(cm.exception)) def test_read_multiple_files_mixed_success_failure(self): """Test reading multiple files with some failures.""" # Create one valid file valid_file = "valid.txt" with open(valid_file, "w") as f: f.write("valid content") # Test with valid and invalid paths paths = ["valid.txt", "nonexistent.txt", "../etc/passwd"] result = asyncio.run(call_tool("read_multiple_files", {"paths": paths})) self.assertIn("text", result[0]) response_text = result[0]["text"] self.assertIn("valid content", response_text) self.assertIn("Error - File not found", response_text) class TestFileWriteOperations(unittest.TestCase): """Test file writing operations.""" def setUp(self): """Set up test environment.""" self.test_dir = tempfile.mkdtemp() self.original_cwd = os.getcwd() os.chdir(self.test_dir) def tearDown(self): """Clean up test environment.""" os.chdir(self.original_cwd) shutil.rmtree(self.test_dir) def test_atomic_write_success(self): """Test successful atomic write.""" content = "test content for atomic write" with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=self.test_dir) as tmp: tmp_path = tmp.name try: # Atomic write should succeed (no exception means success) _atomic_write(tmp_path, content) # Verify content was written with open(tmp_path, "r") as f: written_content = f.read() self.assertEqual(written_content, content) finally: if os.path.exists(tmp_path): os.unlink(tmp_path) def test_atomic_write_directory_creation(self): """Test atomic write creates parent directories if needed.""" content = "test content for new directory" # Create the parent directory first since _atomic_write doesn't create it nested_dir = os.path.join(self.test_dir, "nested", "subdir") os.makedirs(nested_dir, exist_ok=True) nested_path = os.path.join(nested_dir, "file.txt") # Should write file to existing directory _atomic_write(nested_path, content) # Verify file exists and content is correct self.assertTrue(os.path.exists(nested_path)) with open(nested_path, "r") as f: written_content = f.read() self.assertEqual(written_content, content) def test_atomic_write_permission_error(self): """Test atomic write handles permission errors.""" content = "test content" # Create a read-only directory readonly_path = os.path.join(self.test_dir, "readonly", "file.txt") readonly_dir = os.path.join(self.test_dir, "readonly") os.makedirs(readonly_dir, mode=0o555) # Read-only directory # Should handle permission error gracefully with self.assertRaises(OSError): _atomic_write(readonly_path, content) def test_write_file_basic(self): """Test basic file writing functionality.""" content = "new file content" test_file = "test.txt" # Test atomic write functionality directly since MCP tools require FastApply client _atomic_write(test_file, content) # Verify file was created and content is correct self.assertTrue(os.path.exists(test_file)) with open(test_file, "r") as f: written_content = f.read() self.assertEqual(written_content, content) def test_write_file_security_violation(self): """Test file writing with security violations.""" content = "malicious content" # Test path traversal attempt with self.assertRaises(ValueError) as cm: asyncio.run(call_tool("edit_file", { "path": "../etc/malicious.txt", "code_edit": content, "instruction": "malicious edit" })) self.assertIn("Invalid file path", str(cm.exception)) class TestFileValidation(unittest.TestCase): """Test file validation operations.""" def setUp(self): """Set up test environment.""" self.test_dir = tempfile.mkdtemp() def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.test_dir) def test_validate_supported_file_types(self): """Test validation of supported file types.""" supported_files = [ ("test.py", "python content"), ("test.js", "javascript content"), ("test.ts", "typescript content"), ("test.html", "html content"), ("test.css", "css content"), ("test.json", "json content"), ("test.md", "markdown content"), ("test.txt", "text content") ] for filename, content in supported_files: with self.subTest(filename=filename): result = validate_code_quality(content, filename) # Should not raise exception for supported types self.assertIsInstance(result, dict) def test_validate_unsupported_file_type(self): """Test validation of unsupported file types.""" unsupported_files = [ ("test.exe", "executable content"), ("test.dll", "dll content"), ("test.so", "shared object content"), ("test.bin", "binary content") ] for filename, content in unsupported_files: with self.subTest(filename=filename): # Unsupported file types are handled gracefully, no exception raised result = validate_code_quality(content, filename) self.assertIsInstance(result, dict) self.assertIn("has_errors", result) def test_validate_file_size_limits(self): """Test validation of file size limits.""" # Create a large content that exceeds typical limits large_content = "x" * 1000000 # 1MB # Should handle large content without crashing result = validate_code_quality(large_content, "large.txt") self.assertIsInstance(result, dict) def test_validate_code_quality_metrics(self): """Test code quality validation metrics.""" test_cases = [ ("good.py", "def hello():\n return 'Hello, World!'\n"), ("bad.py", "def x():\n y=1+2\n return y\n"), # Poor formatting ( "complex.py", "def complex_function(a,b,c,d,e,f):\n if a and b and c and d and e and f:\n return True\n return False\n" ) ] for filename, content in test_cases: with self.subTest(filename=filename): result = validate_code_quality(content, filename) self.assertIsInstance(result, dict) self.assertIn("has_errors", result) class TestFileManipulation(unittest.TestCase): """Test file manipulation operations.""" def setUp(self): """Set up test environment.""" self.test_dir = tempfile.mkdtemp() self.original_cwd = os.getcwd() os.chdir(self.test_dir) # Create connector with mock client to avoid initialization issues self.connector = FastApplyConnector() self.connector.client = None # Force test mode def tearDown(self): """Clean up test environment.""" os.chdir(self.original_cwd) shutil.rmtree(self.test_dir) def test_apply_edit_basic(self): """Test basic edit application.""" original_code = "def hello():\n return 'Hello, World!'\n" code_edit = "def hello():\n return 'Hello, Modified World!'\n" instruction = "Modify the greeting message" test_file = "test.py" with open(test_file, "w") as f: f.write(original_code) # Mock the apply_edit to avoid API call and simulate success with patch.object(self.connector, 'apply_edit') as mock_apply: mock_apply.return_value = { "success": True, "modified_code": code_edit, "backup_created": False } result = self.connector.apply_edit( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="test.py" ) self.assertIn("success", result) self.assertTrue(result["success"]) # Verify the mock was called correctly mock_apply.assert_called_once_with( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="test.py" ) def test_apply_edit_with_backup(self): """Test edit application with backup creation.""" original_code = "original content" code_edit = "modified content" instruction = "modify content" test_file = "test.txt" with open(test_file, "w") as f: f.write(original_code) # Mock the apply_edit to avoid API call and simulate backup creation with patch.object(self.connector, 'apply_edit') as mock_apply: mock_apply.return_value = { "success": True, "modified_code": code_edit, "backup_created": True, "backup_path": test_file + ".bak_1234567890" # Simulate timestamped backup } result = self.connector.apply_edit( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="test.txt" ) self.assertIn("success", result) self.assertTrue(result["success"]) self.assertTrue(result["backup_created"]) # Verify mock was called correctly mock_apply.assert_called_once_with( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="test.txt" ) def test_apply_edit_no_changes(self): """Test edit application when no changes are needed.""" original_code = "def hello():\n return 'Hello, World!'\n" code_edit = original_code # Same content instruction = "no changes needed" test_file = "test.py" with open(test_file, "w") as f: f.write(original_code) # Mock the apply_edit to avoid API call with patch.object(self.connector, 'apply_edit', return_value={ "success": True, "modified_code": original_code, "backup_created": False }): result = self.connector.apply_edit( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="test.py" ) self.assertIn("success", result) self.assertTrue(result["success"]) # Verify file content unchanged with open(test_file, "r") as f: content = f.read() self.assertEqual(content, original_code) def test_apply_edit_security_violation(self): """Test edit application with security violations.""" original_code = "original content" code_edit = "modified content" instruction = "malicious edit" # Test with path traversal - should raise exception due to client not being initialized with self.assertRaises(RuntimeError) as cm: self.connector.apply_edit( original_code=original_code, code_edit=code_edit, instruction=instruction, file_path="../etc/malicious.txt" ) self.assertIn("Fast Apply client not initialized", str(cm.exception)) class TestFileSearchOperations(unittest.TestCase): """Test file search operations.""" def setUp(self): """Set up test environment.""" self.test_dir = tempfile.mkdtemp() self.original_cwd = os.getcwd() os.chdir(self.test_dir) # Create test files self.test_files = { "test.py": "def hello():\n return 'Hello'\n", "test.js": "function hello() {\n return 'Hello';\n}\n", "README.md": "# Test Project\n\nThis is a test project.\n", "config.json": '{\n "name": "test",\n "version": "1.0"\n}\n' } for filename, content in self.test_files.items(): with open(filename, "w") as f: f.write(content) def tearDown(self): """Clean up test environment.""" os.chdir(self.original_cwd) shutil.rmtree(self.test_dir) def test_search_files_basic(self): """Test basic file search.""" result = asyncio.run(call_tool("search_files", { "path": ".", "pattern": "test" })) self.assertIn("text", result[0]) response_text = result[0]["text"] # Should find both Python and JavaScript files with "test" in the name self.assertIn("test.py", response_text) self.assertIn("test.js", response_text) def test_search_files_no_matches(self): """Test file search with no matches.""" result = asyncio.run(call_tool("search_files", { "path": ".", "pattern": "nonexistent_pattern" })) self.assertIn("text", result[0]) self.assertIn("No matches found", result[0]["text"]) def test_search_files_invalid_path(self): """Test file search with invalid path.""" # Test with invalid path that doesn't exist with self.assertRaises(ValueError) as cm: asyncio.run(call_tool("search_files", { "path": "../nonexistent_path", "pattern": "test" })) self.assertIn("Path escapes workspace", str(cm.exception))

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/betmoar/FastApply-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server