Skip to main content
Glama

mcp-optimizer

ortools_solver.py•29.6 kB
"""OR-Tools solver implementation for various optimization problems.""" import logging import time from typing import Any try: from ortools.graph.python import linear_sum_assignment from ortools.linear_solver import pywraplp ORTOOLS_AVAILABLE = True except ImportError as e: logging.warning(f"OR-Tools not available: {e}") linear_sum_assignment = None pywraplp = None ORTOOLS_AVAILABLE = False from mcp_optimizer.config import settings from mcp_optimizer.schemas.base import ( OptimizationStatus, SolverInfo, ) from mcp_optimizer.schemas.problem_schemas import ( Assignment, AssignmentResult, TransportationFlow, TransportationResult, ) logger = logging.getLogger(__name__) class AdvancedConstraints: """Advanced constraint types for optimization problems.""" def __init__(self) -> None: self.precedence_constraints: list[tuple[str, str]] = [] self.skill_requirements: dict[str, list[str]] = {} self.resource_limits: dict[str, int] = {} self.exclusion_constraints: list[list[str]] = [] self.grouping_constraints: list[tuple[list[str], bool]] = [] def add_precedence_constraint(self, task_before: str, task_after: str) -> None: """Add precedence constraint: task_before must be completed before task_after.""" self.precedence_constraints.append((task_before, task_after)) def add_skill_requirement(self, task: str, required_skills: list[str]) -> None: """Add skill requirement for a task.""" self.skill_requirements[task] = required_skills def add_resource_limit(self, resource: str, limit: int) -> None: """Add resource limit constraint.""" self.resource_limits[resource] = limit def add_exclusion_constraint(self, tasks: list[str]) -> None: """Add exclusion constraint: these tasks cannot be assigned to the same worker.""" self.exclusion_constraints.append(tasks) def add_grouping_constraint(self, tasks: list[str], same_worker: bool = True) -> None: """Add grouping constraint: these tasks must/must not be assigned to the same worker.""" self.grouping_constraints.append((tasks, same_worker)) class ORToolsSolver: """Solver for various optimization problems using OR-Tools.""" def __init__(self) -> None: """Initialize OR-Tools solver.""" self.solver_name = "OR-Tools" def solve_assignment_problem( self, workers: list[str], tasks: list[str], costs: list[list[float]], maximize: bool = False, max_tasks_per_worker: int | None = None, min_tasks_per_worker: int | None = None, worker_skills: dict[str, list[str]] | None = None, task_requirements: dict[str, list[str]] | None = None, advanced_constraints: AdvancedConstraints | None = None, ) -> dict[str, Any]: """Solve assignment problem using OR-Tools with advanced constraints support. Args: workers: List of worker names tasks: List of task names costs: Cost matrix (workers x tasks) maximize: Whether to maximize instead of minimize max_tasks_per_worker: Maximum tasks per worker min_tasks_per_worker: Minimum tasks per worker worker_skills: Dictionary mapping workers to their skills task_requirements: Dictionary mapping tasks to required skills advanced_constraints: Advanced constraint object Returns: Assignment optimization result """ if not ORTOOLS_AVAILABLE: return { "status": "error", "error_message": "OR-Tools is not available. Please install it with 'pip install ortools'", "total_cost": None, "assignments": [], "execution_time": 0.0, } # Validate input dimensions first (outside try-catch) if len(costs) != len(workers): raise ValueError( f"Cost matrix rows ({len(costs)}) must match workers count ({len(workers)})" ) for i, row in enumerate(costs): if len(row) != len(tasks): raise ValueError( f"Cost matrix row {i} length ({len(row)}) must match tasks count ({len(tasks)})" ) # Validate constraints feasibility (outside try-catch) self._validate_assignment_constraints( workers, tasks, max_tasks_per_worker, min_tasks_per_worker, worker_skills, task_requirements, advanced_constraints, ) start_time = time.time() try: # For maximize problems, negate costs if maximize: costs = [[-cost for cost in row] for row in costs] # Use LinearSumAssignment for simple 1:1 assignment without advanced constraints if ( max_tasks_per_worker is None and min_tasks_per_worker is None and worker_skills is None and task_requirements is None and advanced_constraints is None and len(workers) == len(tasks) ): assignment = linear_sum_assignment.SimpleLinearSumAssignment() # Add costs for worker_idx in range(len(workers)): for task_idx in range(len(tasks)): assignment.add_arc_with_cost( worker_idx, task_idx, int(costs[worker_idx][task_idx] * 1000), ) # Solve status = assignment.solve() execution_time = time.time() - start_time if status == assignment.OPTIMAL: # Extract solution assignments = [] total_cost = 0.0 for worker_idx in range(assignment.num_nodes()): if assignment.right_mate(worker_idx) >= 0: task_idx = assignment.right_mate(worker_idx) original_cost = ( -costs[worker_idx][task_idx] if maximize else costs[worker_idx][task_idx] ) assignments.append( Assignment( worker=workers[worker_idx], task=tasks[task_idx], cost=original_cost, ) ) total_cost += original_cost result = AssignmentResult( status=OptimizationStatus.OPTIMAL, total_cost=total_cost, assignments=assignments, execution_time=execution_time, solver_info=SolverInfo( solver_name="OR-Tools LinearSumAssignment", iterations=None, gap=None, ), ) logger.info(f"Assignment problem solved optimally in {execution_time:.3f}s") return result.model_dump() else: return AssignmentResult( status=OptimizationStatus.INFEASIBLE, total_cost=None, assignments=[], execution_time=execution_time, error_message="Assignment problem is infeasible", ).model_dump() else: # Use linear programming for complex constraints return self._solve_assignment_with_advanced_constraints( workers, tasks, costs, maximize, max_tasks_per_worker, min_tasks_per_worker, worker_skills, task_requirements, advanced_constraints, start_time, ) except Exception as e: execution_time = time.time() - start_time logger.error(f"Error solving assignment problem: {e}") return AssignmentResult( status=OptimizationStatus.ERROR, total_cost=None, assignments=[], execution_time=execution_time, error_message=f"Solver error: {str(e)}", ).model_dump() def _validate_assignment_constraints( self, workers: list[str], tasks: list[str], max_tasks_per_worker: int | None, min_tasks_per_worker: int | None, worker_skills: dict[str, list[str]] | None, task_requirements: dict[str, list[str]] | None, advanced_constraints: AdvancedConstraints | None, ) -> None: """Validate assignment constraints for feasibility.""" # Check basic capacity constraints if max_tasks_per_worker is not None and max_tasks_per_worker <= 0: if max_tasks_per_worker == 0 and len(tasks) > 0: raise ValueError("All workers have max_tasks_per_worker=0 but tasks exist") if max_tasks_per_worker < 0: raise ValueError("max_tasks_per_worker cannot be negative") if min_tasks_per_worker is not None and min_tasks_per_worker < 0: raise ValueError("min_tasks_per_worker cannot be negative") if ( max_tasks_per_worker is not None and min_tasks_per_worker is not None and min_tasks_per_worker > max_tasks_per_worker ): raise ValueError("min_tasks_per_worker cannot exceed max_tasks_per_worker") # Check if enough capacity exists if max_tasks_per_worker is not None: total_capacity = len(workers) * max_tasks_per_worker if total_capacity < len(tasks): raise ValueError( f"Insufficient total capacity: {total_capacity} < {len(tasks)} tasks" ) # Check minimum capacity requirements if min_tasks_per_worker is not None: min_total_assignments = len(workers) * min_tasks_per_worker if min_total_assignments > len(tasks): raise ValueError( f"Minimum assignments exceed available tasks: {min_total_assignments} > {len(tasks)}" ) # Validate skill constraints if worker_skills and task_requirements: for task, required_skills in task_requirements.items(): if task not in tasks: raise ValueError(f"Task '{task}' in requirements not found in task list") # Check if at least one worker has all required skills capable_workers = [] for worker in workers: worker_skill_set = set(worker_skills.get(worker, [])) required_skill_set = set(required_skills) if required_skill_set.issubset(worker_skill_set): capable_workers.append(worker) if not capable_workers: raise ValueError( f"No worker has all required skills for task '{task}': {required_skills}" ) # Validate advanced constraints if advanced_constraints: # Check precedence constraints task_set = set(tasks) for task_before, task_after in advanced_constraints.precedence_constraints: if task_before not in task_set: raise ValueError( f"Precedence constraint references unknown task: {task_before}" ) if task_after not in task_set: raise ValueError(f"Precedence constraint references unknown task: {task_after}") # Check exclusion constraints for excluded_tasks in advanced_constraints.exclusion_constraints: for task in excluded_tasks: if task not in task_set: raise ValueError(f"Exclusion constraint references unknown task: {task}") def _solve_assignment_with_advanced_constraints( self, workers: list[str], tasks: list[str], costs: list[list[float]], maximize: bool, max_tasks_per_worker: int | None, min_tasks_per_worker: int | None, worker_skills: dict[str, list[str]] | None, task_requirements: dict[str, list[str]] | None, advanced_constraints: AdvancedConstraints | None, start_time: float, ) -> dict[str, Any]: """Solve assignment problem with advanced constraints using linear programming.""" try: # Create linear programming solver solver = pywraplp.Solver.CreateSolver("SCIP") if not solver: raise RuntimeError("Could not create OR-Tools linear solver") # Create binary variables for each worker-task pair x = {} for i, worker in enumerate(workers): for j, task in enumerate(tasks): x[i, j] = solver.BoolVar(f"x_{worker}_{task}") # Objective function objective = solver.Objective() for i in range(len(workers)): for j in range(len(tasks)): coeff = -costs[i][j] if maximize else costs[i][j] objective.SetCoefficient(x[i, j], coeff) if maximize: objective.SetMaximization() else: objective.SetMinimization() # Constraints: each task assigned to exactly one worker for j in range(len(tasks)): constraint = solver.Constraint(1, 1) # Changed from (0,1) to (1,1) for i in range(len(workers)): constraint.SetCoefficient(x[i, j], 1) # Worker capacity constraints for i in range(len(workers)): if max_tasks_per_worker is not None: constraint = solver.Constraint(0, max_tasks_per_worker) for j in range(len(tasks)): constraint.SetCoefficient(x[i, j], 1) if min_tasks_per_worker is not None: constraint = solver.Constraint(min_tasks_per_worker, len(tasks)) for j in range(len(tasks)): constraint.SetCoefficient(x[i, j], 1) # Skill-based constraints if worker_skills and task_requirements: for j, task in enumerate(tasks): if task in task_requirements: required_skills = set(task_requirements[task]) # Only allow assignment to workers with required skills for i, worker in enumerate(workers): worker_skill_set = set(worker_skills.get(worker, [])) if not required_skills.issubset(worker_skill_set): # Force this assignment to be 0 constraint = solver.Constraint(0, 0) constraint.SetCoefficient(x[i, j], 1) # Advanced constraints if advanced_constraints: # Precedence constraints (simplified: tasks must be assigned to different workers) for task_before, task_after in advanced_constraints.precedence_constraints: try: j_before = tasks.index(task_before) j_after = tasks.index(task_after) # If both tasks are assigned to the same worker, ensure precedence for i in range(len(workers)): # This is a simplification - in reality, precedence needs scheduling # For now, we prevent same worker assignment constraint = solver.Constraint(0, 1) constraint.SetCoefficient(x[i, j_before], 1) constraint.SetCoefficient(x[i, j_after], 1) except ValueError: continue # Task not found, skip this constraint # Exclusion constraints for excluded_tasks in advanced_constraints.exclusion_constraints: task_indices = [] for task in excluded_tasks: try: task_indices.append(tasks.index(task)) except ValueError: continue # These tasks cannot be assigned to the same worker for i in range(len(workers)): constraint = solver.Constraint(0, 1) for j in task_indices: constraint.SetCoefficient(x[i, j], 1) # Grouping constraints for grouped_tasks, same_worker in advanced_constraints.grouping_constraints: task_indices = [] for task in grouped_tasks: try: task_indices.append(tasks.index(task)) except ValueError: continue if len(task_indices) < 2: continue if same_worker: # All tasks must be assigned to the same worker # Create auxiliary variable for worker selection worker_vars = [] for i in range(len(workers)): worker_var = solver.BoolVar(f"group_worker_{i}") worker_vars.append(worker_var) # If this worker is selected, all tasks go to this worker for j in task_indices: constraint = solver.Constraint(-solver.infinity(), 0) constraint.SetCoefficient(x[i, j], 1) constraint.SetCoefficient(worker_var, -1) # Exactly one worker must be selected constraint = solver.Constraint(1, 1) for worker_var in worker_vars: constraint.SetCoefficient(worker_var, 1) # Set time limit solver.SetTimeLimit(int(settings.max_solve_time * 1000)) # Solve status = solver.Solve() execution_time = time.time() - start_time if status == pywraplp.Solver.OPTIMAL: # Extract solution assignments = [] total_cost = 0.0 for i in range(len(workers)): for j in range(len(tasks)): if x[i, j].solution_value() > 0.5: original_cost = -costs[i][j] if maximize else costs[i][j] assignments.append( Assignment( worker=workers[i], task=tasks[j], cost=original_cost, ) ) total_cost += original_cost result = AssignmentResult( status=OptimizationStatus.OPTIMAL, total_cost=total_cost, assignments=assignments, execution_time=execution_time, solver_info=SolverInfo( solver_name="OR-Tools SCIP with Advanced Constraints", iterations=solver.iterations(), gap=None, ), ) logger.info( f"Assignment problem with advanced constraints solved in {execution_time:.3f}s" ) return result.model_dump() elif status == pywraplp.Solver.INFEASIBLE: return AssignmentResult( status=OptimizationStatus.INFEASIBLE, total_cost=None, assignments=[], execution_time=execution_time, error_message="Assignment problem with constraints is infeasible", ).model_dump() elif status == pywraplp.Solver.TIME_LIMIT: # Extract best solution found so far assignments = [] total_cost = 0.0 for i in range(len(workers)): for j in range(len(tasks)): if x[i, j].solution_value() > 0.5: original_cost = -costs[i][j] if maximize else costs[i][j] assignments.append( Assignment( worker=workers[i], task=tasks[j], cost=original_cost, ) ) total_cost += original_cost return AssignmentResult( status=OptimizationStatus.FEASIBLE, total_cost=total_cost, assignments=assignments, execution_time=execution_time, error_message="Time limit reached, returning best solution found", solver_info=SolverInfo( solver_name="OR-Tools SCIP with Advanced Constraints", iterations=solver.iterations(), gap=None, ), ).model_dump() else: return AssignmentResult( status=OptimizationStatus.ERROR, total_cost=None, assignments=[], execution_time=execution_time, error_message=f"Solver returned status: {status}", ).model_dump() except Exception as e: execution_time = time.time() - start_time logger.error(f"Error in advanced constrained assignment solver: {e}") return AssignmentResult( status=OptimizationStatus.ERROR, total_cost=None, assignments=[], execution_time=execution_time, error_message=f"Solver error: {str(e)}", ).model_dump() # Legacy method for backward compatibility def _solve_assignment_with_constraints( self, workers: list[str], tasks: list[str], costs: list[list[float]], maximize: bool, max_tasks_per_worker: int | None, min_tasks_per_worker: int | None, start_time: float, ) -> dict[str, Any]: """Legacy method for backward compatibility.""" return self._solve_assignment_with_advanced_constraints( workers, tasks, costs, maximize, max_tasks_per_worker, min_tasks_per_worker, None, # worker_skills None, # task_requirements None, # advanced_constraints start_time, ) def solve_transportation_problem( self, suppliers: list[dict[str, Any]], consumers: list[dict[str, Any]], costs: list[list[float]], ) -> dict[str, Any]: """Solve transportation problem using OR-Tools linear programming. Args: suppliers: List of suppliers with supply amounts consumers: List of consumers with demand amounts costs: Transportation cost matrix (suppliers x consumers) Returns: Transportation optimization result """ if not ORTOOLS_AVAILABLE: return { "status": "error", "error_message": "OR-Tools is not available. Please install it with 'pip install ortools'", "total_cost": None, "flows": [], "execution_time": 0.0, } start_time = time.time() try: # Validate input if len(costs) != len(suppliers): raise ValueError( f"Cost matrix rows ({len(costs)}) must match suppliers count ({len(suppliers)})" ) for i, row in enumerate(costs): if len(row) != len(consumers): raise ValueError( f"Cost matrix row {i} length ({len(row)}) must match consumers count ({len(consumers)})" ) # Create linear programming solver solver = pywraplp.Solver.CreateSolver("SCIP") if not solver: raise RuntimeError("Could not create OR-Tools linear solver") # Create variables for transportation amounts x = {} for i in range(len(suppliers)): for j in range(len(consumers)): x[i, j] = solver.NumVar( 0, solver.infinity(), f"x_{suppliers[i]['name']}_{consumers[j]['name']}", ) # Objective: minimize total transportation cost objective = solver.Objective() for i in range(len(suppliers)): for j in range(len(consumers)): objective.SetCoefficient(x[i, j], costs[i][j]) objective.SetMinimization() # Supply constraints for i, supplier in enumerate(suppliers): constraint = solver.Constraint(0, supplier["supply"]) for j in range(len(consumers)): constraint.SetCoefficient(x[i, j], 1) # Demand constraints for j, consumer in enumerate(consumers): constraint = solver.Constraint(consumer["demand"], consumer["demand"]) for i in range(len(suppliers)): constraint.SetCoefficient(x[i, j], 1) # Set time limit solver.SetTimeLimit(int(settings.max_solve_time * 1000)) # Solve status = solver.Solve() execution_time = time.time() - start_time if status == pywraplp.Solver.OPTIMAL: # Extract solution flows = [] total_cost = 0.0 for i in range(len(suppliers)): for j in range(len(consumers)): amount = x[i, j].solution_value() if amount > 1e-6: # Only include non-zero flows flow_cost = amount * costs[i][j] flows.append( TransportationFlow( supplier=suppliers[i]["name"], consumer=consumers[j]["name"], amount=amount, cost=flow_cost, ) ) total_cost += flow_cost result = TransportationResult( status=OptimizationStatus.OPTIMAL, total_cost=total_cost, flows=flows, execution_time=execution_time, solver_info=SolverInfo( solver_name="OR-Tools SCIP", iterations=solver.iterations(), gap=None, ), ) logger.info(f"Transportation problem solved optimally in {execution_time:.3f}s") return result.model_dump() elif status == pywraplp.Solver.INFEASIBLE: return TransportationResult( status=OptimizationStatus.INFEASIBLE, total_cost=None, flows=[], execution_time=execution_time, error_message="Transportation problem is infeasible", ).model_dump() else: return TransportationResult( status=OptimizationStatus.ERROR, total_cost=None, flows=[], execution_time=execution_time, error_message=f"Solver returned status: {status}", ).model_dump() except Exception as e: execution_time = time.time() - start_time logger.error(f"Error solving transportation problem: {e}") return TransportationResult( status=OptimizationStatus.ERROR, total_cost=None, flows=[], execution_time=execution_time, error_message=f"Solver error: {str(e)}", ).model_dump()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dmitryanchikov/mcp-optimizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server