Skip to main content
Glama

mcp-optimizer

test_large_scale_optimization.py • 16.7 kB
"""Stress tests for large-scale optimization problems.

This module contains performance and stress tests for optimization solvers
to verify their behavior under high computational load and large problem sizes.
"""

import gc
import logging
import multiprocessing
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import psutil
import pytest

from mcp_optimizer.solvers.ortools_solver import ORToolsSolver
from mcp_optimizer.solvers.pulp_solver import PuLPSolver

logger = logging.getLogger(__name__)


class TestLargeScaleOptimization:
    """Test suite for large-scale optimization problems."""

    @pytest.fixture
    def ortools_solver(self):
        """Create OR-Tools solver instance."""
        return ORToolsSolver()

    @pytest.fixture
    def pulp_solver(self):
        """Create PuLP solver instance."""
        return PuLPSolver()

    def monitor_memory_usage(self, test_name: str) -> float:
        """Record and log the current RSS of this process.

        Returns the resident set size in MB so callers can later compute growth.
        """
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        logger.info(f"{test_name}: Initial memory usage: {initial_memory:.2f} MB")
        return initial_memory

    def check_memory_usage(
        self, initial_memory: float, test_name: str, max_memory_mb: float = 2048
    ) -> None:
        """Assert the current process RSS is below ``max_memory_mb`` (MB).

        Logs the delta versus ``initial_memory`` for diagnostics.
        """
        process = psutil.Process()
        current_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = current_memory - initial_memory
        logger.info(
            f"{test_name}: Current memory usage: {current_memory:.2f} MB "
            f"(+{memory_increase:.2f} MB)"
        )
        assert current_memory < max_memory_mb, (
            f"Memory usage exceeded limit: {current_memory:.2f} MB > {max_memory_mb} MB"
        )

    @pytest.mark.stress
    @pytest.mark.timeout(300)  # 5 minutes timeout
    def test_large_assignment_problem_1000x1000(self, ortools_solver):
        """Test 1000x1000 assignment problem - ultimate stress test."""
        test_name = "Large Assignment 1000x1000"
        initial_memory = self.monitor_memory_usage(test_name)

        # Generate 1000 workers and 1000 tasks
        workers = [f"worker_{i}" for i in range(1000)]
        tasks = [f"task_{i}" for i in range(1000)]

        # Generate random cost matrix
        random.seed(42)  # For reproducibility
        costs = [[random.randint(1, 100) for _ in range(1000)] for _ in range(1000)]

        start_time = time.time()
        try:
            result = ortools_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs, maximize=False
            )
            execution_time = time.time() - start_time
            logger.info(f"{test_name}: Execution time: {execution_time:.2f}s")

            # Verify result structure
            assert result is not None
            assert result.get("status").value in ["optimal", "feasible"]
            assert result.get("execution_time") is not None
            assert execution_time < 180, f"Execution time too long: {execution_time:.2f}s"

            # Verify solution quality.
            # BUGFIX: the original compared the status enum against the plain
            # string "OPTIMAL", which never matched (the enum's .value is
            # lowercase), so these assertions were dead code.
            if result.get("status").value == "optimal":
                assignments = result.get("assignments", [])
                assert len(assignments) == 1000, (
                    f"Expected 1000 assignments, got {len(assignments)}"
                )
                # Check uniqueness: each worker and each task appears at most once
                assigned_workers = set()
                assigned_tasks = set()
                for assignment in assignments:
                    assert assignment["worker"] not in assigned_workers, (
                        "Duplicate worker assignment"
                    )
                    assert assignment["task"] not in assigned_tasks, "Duplicate task assignment"
                    assigned_workers.add(assignment["worker"])
                    assigned_tasks.add(assignment["task"])

            self.check_memory_usage(initial_memory, test_name, max_memory_mb=1500)
        finally:
            # Force garbage collection
            gc.collect()

    @pytest.mark.stress
    @pytest.mark.timeout(180)  # 3 minutes timeout
    def test_large_assignment_problem_500x500(self, ortools_solver):
        """Test 500x500 assignment problem - high stress test."""
        test_name = "Large Assignment 500x500"
        initial_memory = self.monitor_memory_usage(test_name)

        workers = [f"worker_{i}" for i in range(500)]
        tasks = [f"task_{i}" for i in range(500)]

        random.seed(123)
        costs = [[random.randint(1, 100) for _ in range(500)] for _ in range(500)]

        start_time = time.time()
        try:
            result = ortools_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs, maximize=False
            )
            execution_time = time.time() - start_time
            logger.info(f"{test_name}: Execution time: {execution_time:.2f}s")

            assert result is not None
            assert result.get("status").value in ["optimal", "feasible"]
            assert execution_time < 120, f"Execution time too long: {execution_time:.2f}s"

            self.check_memory_usage(initial_memory, test_name, max_memory_mb=1000)
        finally:
            gc.collect()

    @pytest.mark.stress
    @pytest.mark.timeout(300)
    def test_large_linear_programming_1000_variables(self, pulp_solver):
        """Test linear programming with 1000 variables."""
        test_name = "Large LP 1000 variables"
        initial_memory = self.monitor_memory_usage(test_name)

        # Generate large LP problem: maximize c^T * x subject to A*x <= b, x >= 0
        num_vars = 1000
        num_constraints = 500

        random.seed(456)

        # Generate objective coefficients
        objective = {f"x_{i}": random.uniform(1, 10) for i in range(num_vars)}

        # Generate constraints
        constraints = []
        for i in range(num_constraints):
            constraint = {}
            # Sparse constraint: only 10% of variables appear in each constraint
            selected_vars = random.sample(range(num_vars), k=num_vars // 10)
            for var_idx in selected_vars:
                constraint[f"x_{var_idx}"] = random.uniform(-5, 5)
            constraints.append(
                {
                    "name": f"constraint_{i}",
                    "coefficients": constraint,
                    "operator": "<=",
                    "rhs": random.uniform(10, 100),
                }
            )

        start_time = time.time()
        try:
            # Convert to PuLP format (local import keeps schema dependency
            # scoped to the tests that need it)
            from mcp_optimizer.schemas.base import (
                Constraint,
                ConstraintOperator,
                Objective,
                ObjectiveSense,
                Variable,
                VariableType,
            )

            # Create variables dict
            variables = {}
            for var_name in objective.keys():
                variables[var_name] = Variable(
                    name=var_name, type=VariableType.CONTINUOUS, lower=0.0, upper=None
                )

            # Create objective
            obj = Objective(sense=ObjectiveSense.MAXIMIZE, coefficients=objective)

            # Convert constraints
            pulp_constraints = []
            for constraint in constraints:
                pulp_constraints.append(
                    Constraint(
                        name=constraint["name"],
                        expression=constraint["coefficients"],
                        operator=ConstraintOperator.LE,
                        rhs=constraint["rhs"],
                    )
                )

            result = pulp_solver.solve_linear_program(
                objective=obj, variables=variables, constraints=pulp_constraints
            )
            execution_time = time.time() - start_time
            logger.info(f"{test_name}: Execution time: {execution_time:.2f}s")

            assert result is not None
            assert result.get("status").value in ["optimal", "feasible", "unbounded"]
            assert execution_time < 240, f"Execution time too long: {execution_time:.2f}s"

            # Verify solution structure.
            # BUGFIX: original compared the status enum to "OPTIMAL" (plain
            # string, wrong case), making this branch unreachable.
            if result.get("status").value == "optimal":
                variables_result = result.get("variables", {})
                assert len(variables_result) <= num_vars

            self.check_memory_usage(initial_memory, test_name, max_memory_mb=1200)
        finally:
            gc.collect()

    @pytest.mark.stress
    @pytest.mark.timeout(120)
    def test_concurrent_solver_requests(self, ortools_solver):
        """Test concurrent solver requests to verify thread safety."""
        test_name = "Concurrent Solver Requests"
        initial_memory = self.monitor_memory_usage(test_name)

        # Generate smaller problems for concurrent execution
        num_workers = 4
        problem_size = 50

        def solve_assignment_problem():
            workers = [f"w_{i}" for i in range(problem_size)]
            tasks = [f"t_{i}" for i in range(problem_size)]
            costs = [
                [random.randint(1, 50) for _ in range(problem_size)]
                for _ in range(problem_size)
            ]
            return ortools_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs
            )

        start_time = time.time()
        try:
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                # Submit 20 concurrent problems
                futures = [executor.submit(solve_assignment_problem) for _ in range(20)]

                results = []
                for future in as_completed(futures):
                    result = future.result()
                    results.append(result)
                    assert result is not None
                    assert result.get("status").value in ["optimal", "feasible", "infeasible"]

            execution_time = time.time() - start_time
            logger.info(f"{test_name}: Total execution time: {execution_time:.2f}s")

            assert len(results) == 20
            assert execution_time < 60, f"Concurrent execution too slow: {execution_time:.2f}s"

            self.check_memory_usage(initial_memory, test_name, max_memory_mb=800)
        finally:
            gc.collect()

    @pytest.mark.stress
    @pytest.mark.timeout(180)
    def test_memory_constrained_optimization(self, ortools_solver):
        """Test optimization under memory constraints."""
        test_name = "Memory Constrained Optimization"
        initial_memory = self.monitor_memory_usage(test_name)

        # Generate multiple medium-sized problems sequentially
        for iteration in range(10):
            problem_size = 200
            workers = [f"worker_{i}_{iteration}" for i in range(problem_size)]
            tasks = [f"task_{i}_{iteration}" for i in range(problem_size)]

            random.seed(iteration)
            costs = [
                [random.randint(1, 100) for _ in range(problem_size)]
                for _ in range(problem_size)
            ]

            result = ortools_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs
            )

            assert result is not None
            assert result.get("status").value in ["optimal", "feasible"]

            # Force garbage collection after each iteration
            gc.collect()

            # Check memory doesn't grow excessively
            current_memory = psutil.Process().memory_info().rss / 1024 / 1024
            memory_growth = current_memory - initial_memory
            assert memory_growth < 500, f"Memory growth too large: {memory_growth:.2f} MB"

        logger.info(f"{test_name}: Successfully completed 10 iterations")

    @pytest.mark.stress
    @pytest.mark.timeout(240)
    def test_worst_case_assignment_scenarios(self, ortools_solver):
        """Test worst-case scenarios for assignment problems."""
        test_name = "Worst Case Assignment"
        initial_memory = self.monitor_memory_usage(test_name)

        scenarios = [
            # Scenario 1: All costs are identical (many optimal solutions)
            {"name": "identical_costs", "size": 100, "cost_generator": lambda i, j: 10},
            # Scenario 2: Costs follow extreme distribution
            {
                "name": "extreme_costs",
                "size": 100,
                "cost_generator": lambda i, j: 1 if i == j else 1000,
            },
            # Scenario 3: Random sparse costs (many zeros)
            {
                "name": "sparse_costs",
                "size": 100,
                "cost_generator": lambda i, j: random.choice([0, 0, 0, random.randint(1, 100)]),
            },
        ]

        for scenario in scenarios:
            logger.info(f"Testing scenario: {scenario['name']}")
            size = scenario["size"]
            workers = [f"worker_{i}" for i in range(size)]
            tasks = [f"task_{i}" for i in range(size)]

            random.seed(789)
            costs = [[scenario["cost_generator"](i, j) for j in range(size)] for i in range(size)]

            start_time = time.time()
            result = ortools_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs
            )
            execution_time = time.time() - start_time

            assert result is not None
            assert result.get("status").value in ["optimal", "feasible"]
            assert execution_time < 30, (
                f"Scenario {scenario['name']} too slow: {execution_time:.2f}s"
            )

            logger.info(f"Scenario {scenario['name']}: {execution_time:.2f}s")

        self.check_memory_usage(initial_memory, test_name, max_memory_mb=600)

    @pytest.mark.stress
    def test_solver_performance_comparison(self, ortools_solver, pulp_solver):
        """Compare performance between different solvers."""
        test_name = "Solver Performance Comparison"
        initial_memory = self.monitor_memory_usage(test_name)

        # Generate test problem
        size = 100
        workers = [f"worker_{i}" for i in range(size)]
        tasks = [f"task_{i}" for i in range(size)]

        random.seed(999)
        costs = [[random.randint(1, 100) for _ in range(size)] for _ in range(size)]

        # Test OR-Tools
        start_time = time.time()
        ortools_result = ortools_solver.solve_assignment_problem(
            workers=workers, tasks=tasks, costs=costs
        )
        ortools_time = time.time() - start_time

        # Test PuLP (if it supports assignment problems)
        pulp_time = None
        try:
            start_time = time.time()
            pulp_result = pulp_solver.solve_assignment_problem(
                workers=workers, tasks=tasks, costs=costs
            )
            pulp_time = time.time() - start_time
        except AttributeError:
            logger.info("PuLP solver doesn't support assignment problems directly")
            pulp_result = None

        # Log performance comparison.
        # BUGFIX: use an explicit None check — a measured time of 0.0 is
        # falsy and would previously have been skipped as "no result".
        logger.info(f"OR-Tools execution time: {ortools_time:.3f}s")
        if pulp_time is not None:
            logger.info(f"PuLP execution time: {pulp_time:.3f}s")
            logger.info(f"Performance ratio (PuLP/OR-Tools): {pulp_time / ortools_time:.2f}")

        # Verify results
        assert ortools_result is not None
        assert ortools_result.get("status").value in ["optimal", "feasible"]

        if pulp_result:
            assert pulp_result.get("status").value in ["optimal", "feasible"]

        self.check_memory_usage(initial_memory, test_name, max_memory_mb=400)


@pytest.mark.stress
class TestStressTestConfiguration:
    """Configuration and utilities for stress testing."""

    def test_system_requirements_check(self):
        """Verify system has sufficient resources for stress testing."""
        # Check available memory
        memory = psutil.virtual_memory()
        available_gb = memory.available / (1024**3)
        assert available_gb >= 2, (
            f"Insufficient memory for stress tests: {available_gb:.1f} GB available"
        )

        # Check CPU cores
        cpu_count = multiprocessing.cpu_count()
        assert cpu_count >= 2, f"Insufficient CPU cores for stress tests: {cpu_count} cores"

        logger.info(f"System check passed: {available_gb:.1f} GB memory, {cpu_count} CPU cores")

    def test_stress_test_markers(self):
        """Verify stress test markers are properly configured."""
        # This test ensures stress tests can be run selectively
        # Run with: pytest -m stress
        assert True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dmitryanchikov/mcp-optimizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server