Skip to main content
Glama
test_end_to_end.pyโ€ข20.1 kB
#!/usr/bin/env python3
"""
End-to-end integration tests for Fast Apply MCP server.

Tests complete workflows and integration scenarios.
"""

import asyncio
import os
import shutil
import sys
import tempfile
import unittest

# Add the parent directory to sys.path so fastapply.main is importable when
# the tests are run directly from the tests/ directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from fastapply.main import FastApplyConnector, _secure_resolve, call_tool, fast_apply_connector


class _MockedConnectorTestCase(unittest.TestCase):
    """Shared fixture for the end-to-end suites.

    Creates an isolated workspace directory, installs a FastApplyConnector
    whose ``apply_edit`` is mocked out (no API dependency), and patches the
    module-level ``fast_apply_connector`` so ``call_tool()`` uses the mock.
    Subclasses add only test methods.  unittest also discovers this base
    class, but it defines no ``test_*`` methods, so it contributes no tests.
    """

    def setUp(self):
        """Set up test environment."""
        self.test_dir = tempfile.mkdtemp()
        self.original_workspace = os.environ.get("WORKSPACE_ROOT")
        os.environ["WORKSPACE_ROOT"] = self.test_dir

        # Create a test-mode connector that doesn't require API access.
        self.connector = FastApplyConnector()
        self.connector.client = None  # never touch a real API client

        # Mock apply_edit to return the code_edit as-is for testing.
        self.original_apply_edit = self.connector.apply_edit
        self.connector.apply_edit = self._mock_apply_edit

        # Patch the global fast_apply_connector used by call_tool().
        self.original_global_connector = fast_apply_connector
        import fastapply.main as main

        main.fast_apply_connector = self.connector

    def tearDown(self):
        """Clean up test environment."""
        # Restore original apply_edit method.
        self.connector.apply_edit = self.original_apply_edit

        # Restore the patched global connector.
        import fastapply.main as main

        main.fast_apply_connector = self.original_global_connector

        if self.original_workspace:
            os.environ["WORKSPACE_ROOT"] = self.original_workspace
        else:
            os.environ.pop("WORKSPACE_ROOT", None)
        shutil.rmtree(self.test_dir)

    def _mock_apply_edit(self, *args, **kwargs):
        """Mock apply_edit for testing - returns verification results.

        Supports two call styles:

        * legacy: exactly 3 positional args ``(instruction, original, edit)``,
          returning the merged code string;
        * keyword: ``original_code=`` / ``code_edit=``, returning a
          verification-results dict.
        """
        legacy_mode = False
        if args and not kwargs:
            if len(args) == 3:
                _instruction, original_code, code_edit = args
                legacy_mode = True
            else:
                raise TypeError("Legacy apply_edit expects exactly 3 positional arguments")
        else:
            original_code = kwargs.get("original_code")
            code_edit = kwargs.get("code_edit")
            if original_code is None or code_edit is None:
                raise TypeError("apply_edit requires original_code and code_edit")

        # For testing, the "merge" simply echoes the edited code back.
        merged_code = code_edit
        verification_results = {
            "merged_code": merged_code,
            "has_changes": merged_code != original_code,
            "udiff": f"--- original\n+++ modified\n@@ -1,2 +1,2 @@\n-{original_code[:20]}...\n+{merged_code[:20]}...",
            "validation": {"has_errors": False, "errors": [], "warnings": []},
        }
        return merged_code if legacy_mode else verification_results


class TestEndToEndWorkflows(_MockedConnectorTestCase):
    """Test complete end-to-end workflows."""

    def test_complete_edit_workflow(self):
        """Test complete file edit workflow."""
        # Create initial file.
        initial_content = """def hello():
    return "Hello, World!"

def main():
    print(hello())

if __name__ == "__main__":
    main()
"""
        test_file = os.path.join(self.test_dir, "test.py")
        with open(test_file, "w") as f:
            f.write(initial_content)

        # Read the file using read_multiple_files.
        read_result = asyncio.run(call_tool("read_multiple_files", {"paths": ["test.py"]}))
        self.assertIn("test.py:", read_result[0]["text"])
        self.assertIn(initial_content, read_result[0]["text"])

        # Apply edit using edit_file tool.
        modified_content = """def hello():
    return "Hello, Modified World!"

def main():
    print(hello())

if __name__ == "__main__":
    main()
"""
        edit_result = asyncio.run(
            call_tool(
                "edit_file",
                {"target_file": "test.py", "instructions": "Modify the greeting message", "code_edit": modified_content},
            )
        )
        self.assertIn("Successfully applied edit", edit_result[0]["text"])

        # Verify the change using read_multiple_files.
        final_read_result = asyncio.run(call_tool("read_multiple_files", {"paths": ["test.py"]}))
        self.assertIn("Hello, Modified World!", final_read_result[0]["text"])

        # Verify backup was created.
        backup_files = [f for f in os.listdir(self.test_dir) if f.startswith("test.py.bak")]
        self.assertTrue(len(backup_files) > 0)

    def test_multi_file_workflow(self):
        """Test workflow involving multiple files."""
        # Create multiple related files with unique names to avoid conflicts.
        files = {
            "test_main.py": """from utils import helper
from config import SETTINGS

def main():
    result = helper()
    print(f"Result: {result}")

if __name__ == "__main__":
    main()
""",
            "test_utils.py": """def helper():
    return "Hello from helper"
""",
            "test_config.py": """SETTINGS = {
    "debug": True,
    "version": "1.0"
}
""",
        }

        for filename, content in files.items():
            filepath = os.path.join(self.test_dir, filename)
            with open(filepath, "w") as f:
                f.write(content)

        # Read all files.
        paths = list(files.keys())
        read_result = asyncio.run(call_tool("read_multiple_files", {"paths": paths}))
        self.assertTrue(len(read_result) > 0)
        response_text = read_result[0]["text"]
        for filename, expected_content in files.items():
            self.assertIn(filename + ":", response_text)
            self.assertIn(expected_content, response_text)

        # Search across files - search in current directory only.
        search_result = asyncio.run(call_tool("search_files", {"pattern": "Hello from helper", "path": "."}))
        self.assertTrue(len(search_result) > 0)
        search_text = search_result[0]["text"]
        # Just verify search works (it may find files from other directories too).
        self.assertTrue(len(search_text) > 0)

        # Modify one file using edit_file tool.
        modified_utils = """def helper():
    return "Hello from modified helper"
"""
        edit_result = asyncio.run(
            call_tool(
                "edit_file",
                {"target_file": "test_utils.py", "instructions": "Modify helper function", "code_edit": modified_utils},
            )
        )
        self.assertIn("Successfully applied edit", edit_result[0]["text"])

    def test_file_creation_workflow(self):
        """Test workflow for creating new files."""
        # Create a new file through direct write operation.
        new_content = """#!/usr/bin/env python3

def new_function():
    return "This is a new file"

if __name__ == "__main__":
    print(new_function())
"""
        # Write file directly since write_file tool doesn't exist.
        new_file = os.path.join(self.test_dir, "new_file.py")
        with open(new_file, "w") as f:
            f.write(new_content)

        # Verify file was created.
        self.assertTrue(os.path.exists(new_file))

        # Read and verify content using read_multiple_files.
        read_result = asyncio.run(call_tool("read_multiple_files", {"paths": ["new_file.py"]}))
        self.assertIn(new_content, read_result[0]["text"])

    def test_error_handling_workflow(self):
        """Test error handling in complete workflows."""
        # Try to read nonexistent file.
        result = asyncio.run(call_tool("read_multiple_files", {"paths": ["nonexistent.txt"]}))
        self.assertIn("Error - File not found", result[0]["text"])

        # Try to edit nonexistent file - should fail.
        # NOTE(review): this pattern silently passes when no exception is
        # raised; consider assertRaises if the raise is guaranteed.
        try:
            asyncio.run(call_tool("edit_file", {"target_file": "nonexistent.txt", "instructions": "modify", "code_edit": "modified"}))
        except ValueError as e:
            self.assertIn("File not found", str(e))

        # Test path validation with _secure_resolve.
        try:
            _secure_resolve("../etc/malicious.txt")
        except ValueError as e:
            self.assertIn("escapes workspace", str(e))


class TestMCPIntegration(_MockedConnectorTestCase):
    """Test MCP server integration scenarios."""

    def test_mcp_tool_structure(self):
        """Test MCP tool structure and availability."""
        # Test that required tools are available.
        tools = ["edit_file", "dry_run_edit_file", "search_files", "read_multiple_files"]

        for tool in tools:
            with self.subTest(tool=tool):
                # Test tool exists and can be called with minimal valid parameters.
                try:
                    if tool == "edit_file":
                        # Will fail due to missing file, but should be a recognized tool.
                        try:
                            asyncio.run(call_tool(tool, {"target_file": "test.txt", "instructions": "test", "code_edit": "test"}))
                        except ValueError as e:
                            # Expected - file doesn't exist.
                            self.assertIn("File not found", str(e))
                    elif tool == "dry_run_edit_file":
                        try:
                            asyncio.run(call_tool(tool, {"path": "test.txt", "code_edit": "test"}))
                        except ValueError as e:
                            # Expected - file doesn't exist.
                            self.assertIn("File not found", str(e))
                    elif tool == "search_files":
                        result = asyncio.run(call_tool(tool, {"pattern": "test"}))
                        self.assertTrue(len(result) > 0)
                    elif tool == "read_multiple_files":
                        result = asyncio.run(call_tool(tool, {"paths": ["test.txt"]}))
                        self.assertTrue(len(result) > 0)
                        self.assertIn("Error - File not found", result[0]["text"])
                except Exception as e:
                    self.fail(f"Tool {tool} failed: {e}")

    def test_concurrent_operations(self):
        """Test concurrent file operations."""
        import threading

        results = []
        errors = []

        def worker(file_num):
            try:
                filename = f"worker_{file_num}.txt"
                content = f"Content from worker {file_num}"

                # Write file directly since write_file tool doesn't exist.
                filepath = os.path.join(self.test_dir, filename)
                with open(filepath, "w") as f:
                    f.write(content)

                # Read it back using read_multiple_files.
                read_result = asyncio.run(call_tool("read_multiple_files", {"paths": [filename]}))
                results.append((file_num, "written", read_result))
            except Exception as e:
                errors.append(str(e))

        # Start multiple threads.
        threads = []
        for i in range(5):
            thread = threading.Thread(target=worker, args=(i,))
            threads.append(thread)
            thread.start()

        # Wait for all threads.
        for thread in threads:
            thread.join()

        # Check results.
        self.assertEqual(len(errors), 0, f"Errors occurred: {errors}")
        self.assertEqual(len(results), 5)

    def test_tool_parameter_validation(self):
        """Test parameter validation across all tools."""
        # Test tools with invalid parameters.
        invalid_cases = [
            ("edit_file", {"target_file": "", "instructions": "test", "code_edit": "test"}),
            ("edit_file", {"target_file": None, "instructions": "test", "code_edit": "test"}),
            ("edit_file", {"target_file": "test.txt", "instructions": "", "code_edit": "test"}),
            ("edit_file", {"target_file": "test.txt", "instructions": "test", "code_edit": ""}),
            ("search_files", {"pattern": ""}),
            ("search_files", {"pattern": None}),
            ("read_multiple_files", {"paths": []}),
            ("read_multiple_files", {"paths": None}),
        ]

        for tool_name, params in invalid_cases:
            with self.subTest(tool=tool_name, params=params):
                try:
                    asyncio.run(call_tool(tool_name, params))
                    self.fail(f"Expected validation error for {tool_name} with {params}")
                except (ValueError, TypeError):
                    # Expected - validation should fail.
                    pass


class TestPerformanceIntegration(_MockedConnectorTestCase):
    """Test performance-related integration scenarios."""

    def test_large_file_handling(self):
        """Test handling of large files."""
        # Create a large file.
        large_content = "line\n" * 10000  # 10K lines
        large_file = os.path.join(self.test_dir, "large.txt")
        with open(large_file, "w") as f:
            f.write(large_content)

        # Test reading large file using read_multiple_files.
        result = asyncio.run(call_tool("read_multiple_files", {"paths": ["large.txt"]}))
        self.assertTrue(len(result) > 0)
        self.assertIn("large.txt:", result[0]["text"])

    def test_multiple_file_operations(self):
        """Test performance with multiple file operations."""
        # Create many files.
        file_count = 20
        for i in range(file_count):
            filename = f"file_{i}.txt"
            content = f"Content of file {i}"
            filepath = os.path.join(self.test_dir, filename)
            with open(filepath, "w") as f:
                f.write(content)

        # Read all files.
        paths = [f"file_{i}.txt" for i in range(file_count)]
        result = asyncio.run(call_tool("read_multiple_files", {"paths": paths}))
        self.assertTrue(len(result) > 0)
        response_text = result[0]["text"]
        self.assertIn("Content of file 0", response_text)
        self.assertIn(f"Content of file {file_count - 1}", response_text)

    def test_backup_performance(self):
        """Test backup creation performance with multiple files."""
        # Create multiple files to backup.
        file_count = 10
        files = {}
        for i in range(file_count):
            filename = f"backup_test_{i}.py"
            content = f"# File {i}\ndef test_{i}():\n    return {i}\n"
            filepath = os.path.join(self.test_dir, filename)
            with open(filepath, "w") as f:
                f.write(content)
            files[filename] = content

        # Apply edits to create backups using edit_file tool.
        # BUGFIX: enumerate() supplies the per-file index; the previous code
        # reused the leaked loop variable `i` (stuck at file_count - 1), so
        # only the last file's replace() ever matched.  Dict insertion order
        # matches the creation loop, so enumerate yields the right index.
        for i, (filename, original_content) in enumerate(files.items()):
            modified_content = original_content.replace(f"return {i}", f"return {i * 2}")
            result = asyncio.run(
                call_tool(
                    "edit_file",
                    {"target_file": filename, "instructions": f"Double return value for {filename}", "code_edit": modified_content},
                )
            )
            self.assertIn("Successfully applied edit", result[0]["text"])

            # Verify backup was created.
            backup_files = [f for f in os.listdir(self.test_dir) if f.startswith(filename + ".bak")]
            self.assertTrue(len(backup_files) > 0)


if __name__ == "__main__":
    unittest.main(verbosity=2)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/betmoar/FastApply-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server