Skip to main content
Glama
test_batch_operations_integration_fixed.pyโ€ข18.3 kB
#!/usr/bin/env python3 """ Fixed integration tests for batch operations tools. Tests the integration of batch operations with the main MCP server and other FastApply components, including real-world scenarios and error handling. Phase 6 Implementation Tests - Batch Operations Integration (FIXED) """ import asyncio import shutil import tempfile import time import unittest from pathlib import Path from fastapply.batch_operations import ( BatchConfig, ) from fastapply.main import call_tool class TestBatchOperationsMCPIntegration(unittest.TestCase): """Test integration of batch operations with MCP server.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() self.config = BatchConfig(max_concurrent_operations=2) def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) def create_test_project(self): """Create a test project with multiple files.""" test_files = { "main.py": """ import utils import config def main(): result = utils.calculate_total(config.PRICES) print(f"Total: {result}") return result if __name__ == "__main__": main() """, "utils.py": """ import config def calculate_total(prices): total = sum(prices) return total def calculate_average(prices): if not prices: return 0 return sum(prices) / len(prices) """, "config.py": """ PRICES = [10, 20, 30, 40, 50] def get_config(): return {"prices": PRICES} """, } for file_name, content in test_files.items(): file_path = Path(self.temp_dir) / file_name file_path.write_text(content.strip()) return self.temp_dir async def test_analyze_project_batches_mcp_tool(self): """Test analyze_project_batches MCP tool.""" project_path = self.create_test_project() arguments = {"project_path": project_path, "analysis_types": ["symbols", "dependencies", "quality"], "max_workers": 2} result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) self.assertEqual(len(result), 1) # Should return one text response # The result should contain JSON with analysis data response_text = result[0]["text"] self.assertIn("request_id", response_text) self.assertIn("project_path", response_text) self.assertIn("total_files", response_text) async def test_schedule_batch_operations_mcp_tool(self): """Test schedule_batch_operations MCP tool.""" operations = [ { "operation_id": "test_analysis_1", "operation_type": "analyze", "target_path": self.temp_dir, "parameters": {"analysis_types": ["symbols"]}, }, { "operation_id": "test_analysis_2", "operation_type": "analyze", "target_path": self.temp_dir, "parameters": {"analysis_types": ["dependencies"]}, }, ] arguments = {"operations": operations, "schedule_mode": "parallel", "max_concurrent": 2} result = await call_tool("schedule_batch_operations", arguments) self.assertIsInstance(result, list) async def test_monitor_batch_progress_mcp_tool(self): """Test monitor_batch_progress MCP tool.""" operation_id = "test_monitoring" arguments = {"operation_id": operation_id, "include_details": True} result = await call_tool("monitor_batch_progress", arguments) self.assertIsInstance(result, list) async def test_get_batch_results_mcp_tool(self): """Test get_batch_results MCP tool.""" operation_id = "test_results" arguments = {"operation_id": operation_id, "include_details": True} result = await call_tool("get_batch_results", arguments) self.assertIsInstance(result, list) async def test_execute_batch_rename_mcp_tool(self): """Test execute_batch_rename MCP tool.""" project_path = self.create_test_project() # Fixed: remove update_references parameter to match implementation rename_operations = [ {"old_name": "calculate_total", "new_name": "compute_total", "symbol_type": "function"} ] arguments = { "rename_operations": rename_operations, "project_path": project_path, "dry_run": True, # Test with dry run first } result = await call_tool("execute_batch_rename", arguments) self.assertIsInstance(result, list) async def test_batch_extract_components_mcp_tool(self): """Test batch_extract_components MCP tool.""" project_path = self.create_test_project() extractions = [{"pattern": "def calculate_*", "target_file": "extracted_functions.py", "symbol_type": "function"}] arguments = {"extractions": extractions, "project_path": project_path, "manage_imports": True, "dry_run": True} result = await call_tool("batch_extract_components", arguments) self.assertIsInstance(result, list) async def test_batch_apply_pattern_transformation_mcp_tool(self): """Test batch_apply_pattern_transformation MCP tool.""" project_path = self.create_test_project() arguments = { "pattern": r"def (\w+)\(.*\):", "replacement": r"def \1(*args, **kwargs):", "project_path": project_path, "file_patterns": ["*.py"], "dry_run": True, } result = await call_tool("batch_apply_pattern_transformation", arguments) self.assertIsInstance(result, list) async def test_batch_create_backup_mcp_tool(self): """Test batch_create_backup MCP tool.""" project_path = self.create_test_project() arguments = { "project_path": project_path, "backup_name": "test_backup", "include_metadata": True } result = await call_tool("batch_create_backup", arguments) self.assertIsInstance(result, list) class TestBatchOperationsErrorHandling(unittest.TestCase): """Test error handling in batch operations.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() self.config = BatchConfig(max_concurrent_operations=2) def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) async def test_invalid_project_path_handling(self): """Test handling of invalid project paths.""" arguments = {"project_path": "/nonexistent/path", "analysis_types": ["symbols"]} # Should handle gracefully without crashing result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) # Check that error response contains appropriate error information self.assertIn("error", result[0]["text"].lower()) async def test_memory_limit_handling(self): """Test handling of memory limits.""" # Fixed: use max_memory_usage_mb instead of memory_limit_mb _ = BatchConfig(max_concurrent_operations=1, max_memory_usage_mb=1) # Very low limit project_path = self.temp_dir arguments = {"project_path": project_path, "analysis_types": ["symbols"], "max_workers": 1} result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) async def test_permission_error_handling(self): """Test handling of permission errors.""" # Create a read-only directory readonly_dir = Path(self.temp_dir) / "readonly" readonly_dir.mkdir() readonly_dir.chmod(0o444) # Read-only arguments = {"project_path": str(readonly_dir), "analysis_types": ["symbols"]} # Should handle gracefully without crashing result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) # Check that error response contains appropriate error information self.assertIn("error", result[0]["text"].lower()) def test_timeout_handling(self): """Test handling of operation timeouts.""" # Create a config with very short timeout config = BatchConfig(timeout_seconds=1) # This test doesn't require async call_tool self.assertTrue(config.timeout_seconds == 1) class TestBatchOperationsPerformance(unittest.TestCase): """Test performance characteristics of batch operations.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() self.config = BatchConfig(max_concurrent_operations=4) def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) def create_large_test_project(self): """Create a larger test project for performance testing.""" base_dir = Path(self.temp_dir) # Create multiple Python files for i in range(20): file_content = f""" # File {i} import os import sys def function_{i}(): result = {i} * 2 return result class Class_{i}: def method_{i}(self): return function_{i}() def main(): return Class_{i}().method_{i}() if __name__ == "__main__": main() """ file_path = base_dir / f"file_{i}.py" file_path.write_text(file_content.strip()) return str(base_dir) async def test_concurrent_operation_performance(self): """Test performance of concurrent operations.""" project_path = self.create_large_test_project() start_time = time.time() arguments = { "project_path": project_path, "analysis_types": ["symbols", "dependencies"], "max_workers": 4 } result = await call_tool("analyze_project_batches", arguments) execution_time = time.time() - start_time self.assertIsInstance(result, list) # Should complete within reasonable time (adjust as needed) self.assertLess(execution_time, 30.0) # 30 seconds max async def test_large_project_analysis_performance(self): """Test performance on large projects.""" project_path = self.create_large_test_project() start_time = time.time() arguments = { "project_path": project_path, "analysis_types": ["symbols"], "max_workers": 2 } result = await call_tool("analyze_project_batches", arguments) execution_time = time.time() - start_time self.assertIsInstance(result, list) # Should handle 20 files efficiently self.assertLess(execution_time, 15.0) def test_memory_usage_efficiency(self): """Test memory usage efficiency without psutil.""" # Since we can't use psutil, test the configuration instead config = BatchConfig(max_memory_usage_mb=1024) # Verify the configuration is set correctly self.assertEqual(config.max_memory_usage_mb, 1024) self.assertTrue(config.max_memory_usage_mb > 0) class TestBatchOperationsRealWorldScenarios(unittest.TestCase): """Test batch operations with real-world scenarios.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() self.config = BatchConfig(max_concurrent_operations=2) def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) def create_web_app_project(self): """Create a realistic web application project structure.""" base_dir = Path(self.temp_dir) # Create project structure dirs = ["src", "tests", "docs", "config"] for dir_name in dirs: (base_dir / dir_name).mkdir() # Create source files with cross-dependencies files = { "src/__init__.py": "", "src/app.py": """ from flask import Flask from src.routes import main_routes, auth_routes from src.models import User, Post from src.config import Config app = Flask(__name__) app.config.from_object(Config) # Register blueprints app.register_blueprint(main_routes) app.register_blueprint(auth_routes) def create_app(): return app if __name__ == "__main__": app.run() """, "src/routes/__init__.py": "", "src/routes/main.py": """ from flask import Blueprint, render_template from src.models import Post main_routes = Blueprint('main', __name__) @main_routes.route('/') def index(): posts = Post.query.all() return render_template('index.html', posts=posts) @main_routes.route('/about') def about(): return render_template('about.html') """, "src/routes/auth.py": """ from flask import Blueprint, render_template, redirect, url_for from src.models import User from src.config import Config auth_routes = Blueprint('auth', __name__) @auth_routes.route('/login') def login(): return render_template('login.html') @auth_routes.route('/register') def register(): return render_template('register.html') """, "src/models/__init__.py": "", "src/models/user.py": """ from datetime import datetime from src.config import db class User: def __init__(self, username, email): self.username = username self.email = email self.created_at = datetime.now() def save(self): # Save to database pass def authenticate(self, password): # Authentication logic return True """, "src/models/post.py": """ from datetime import datetime from src.models.user import User class Post: def __init__(self, title, content, author): self.title = title self.content = content self.author = author self.created_at = datetime.now() def save(self): # Save to database pass def publish(self): # Publishing logic self.published = True self.save() """, "src/config.py": """ import os class Config: SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-key-123') DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///app.db') DEBUG = os.environ.get('DEBUG', 'True').lower() == 'true' def get_config(): return Config """, "tests/__init__.py": "", "tests/test_models.py": """ import unittest from src.models.user import User from src.models.post import Post class TestModels(unittest.TestCase): def test_user_creation(self): user = User('testuser', 'test@example.com') self.assertEqual(user.username, 'testuser') def test_post_creation(self): user = User('testuser', 'test@example.com') post = Post('Test Post', 'Test content', user) self.assertEqual(post.title, 'Test Post') """, } for file_path, content in files.items(): full_path = base_dir / file_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(content.strip()) return str(base_dir) async def test_cross_module_dependency_analysis(self): """Test analysis of projects with complex cross-module dependencies.""" project_path = self.create_web_app_project() arguments = { "project_path": project_path, "analysis_types": ["symbols", "dependencies"], "max_workers": 2 } result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) # Check that results contain dependency information response_text = result[0]["text"] self.assertIn("dependencies", response_text.lower()) async def test_package_refactoring_scenario(self): """Test a realistic package refactoring scenario.""" project_path = self.create_web_app_project() # Rename the 'models' package to 'entities' rename_operations = [ {"old_name": "models", "new_name": "entities", "symbol_type": "module"}, {"old_name": "User", "new_name": "User", "symbol_type": "class", "scope": "models"}, {"old_name": "Post", "new_name": "Post", "symbol_type": "class", "scope": "models"}, ] arguments = { "rename_operations": rename_operations, "project_path": project_path, "dry_run": True } result = await call_tool("execute_batch_rename", arguments) self.assertIsInstance(result, list) async def test_python_package_analysis(self): """Test analysis of a complete Python package.""" project_path = self.create_web_app_project() arguments = { "project_path": project_path, "analysis_types": ["symbols", "dependencies", "quality"], "max_workers": 3 } result = await call_tool("analyze_project_batches", arguments) self.assertIsInstance(result, list) # Check that comprehensive analysis was performed response_text = result[0]["text"] self.assertIn("symbols", response_text.lower()) self.assertIn("dependencies", response_text.lower()) self.assertIn("quality", response_text.lower()) # Create a test suite that can run async tests def create_async_test_suite(): """Create a test suite that can handle async tests.""" suite = unittest.TestSuite() # Add all the test classes test_classes = [ TestBatchOperationsMCPIntegration, TestBatchOperationsErrorHandling, TestBatchOperationsPerformance, TestBatchOperationsRealWorldScenarios, ] for test_class in test_classes: tests = unittest.TestLoader().loadTestsFromTestCase(test_class) for test in tests: if asyncio.iscoroutinefunction(test._testMethodName): # Convert async test to sync test._testMethodName = test._testMethodName suite.addTest(test) return suite if __name__ == "__main__": # Run the tests unittest.main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/betmoar/FastApply-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server