Skip to main content
Glama

solve_job_shop_scheduling

Optimize job shop scheduling to improve machine utilization and reduce completion times by assigning tasks to machines within specified constraints.

Instructions

Solve Job Shop Scheduling Problem to optimize machine utilization and completion times.

    Args:
        jobs: List of job dictionaries with tasks and constraints
        machines: List of available machine names
        horizon: Maximum time horizon for scheduling
        objective: Optimization objective ("makespan" or "total_completion_time")
        time_limit_seconds: Maximum solving time in seconds (default: 30.0)

    Returns:
        Optimization result with job schedule and machine assignments
    

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
jobsYes
machinesYes
horizonYes
objectiveNomakespan
time_limit_secondsNo

Implementation Reference

  • The main handler function for the 'solve_job_shop_scheduling' tool, registered with @mcp.tool(). It prepares the input dictionary and calls the core solver function.
    @mcp.tool()
    def solve_job_shop_scheduling(
        jobs: list[dict[str, Any]],
        machines: list[str],
        horizon: int,
        objective: str = "makespan",
        time_limit_seconds: float = 30.0,
    ) -> dict[str, Any]:
        """Solve Job Shop Scheduling Problem to optimize machine utilization and completion times.
    
        Args:
            jobs: List of job dictionaries with tasks and constraints
            machines: List of available machine names
            horizon: Maximum time horizon for scheduling
            objective: Optimization objective ("makespan" or "total_completion_time")
            time_limit_seconds: Maximum solving time in seconds (default: 30.0)
    
        Returns:
            Optimization result with job schedule and machine assignments
        """
        input_data = {
            "jobs": jobs,
            "machines": machines,
            "horizon": horizon,
            "objective": objective,
            "time_limit_seconds": time_limit_seconds,
        }
    
        result = solve_job_scheduling(input_data)
        result_dict: dict[str, Any] = result.model_dump()
        return result_dict
  • Pydantic schema for validating input to the job shop scheduling problem, including jobs, machines, horizon, objective, and time limit.
    class JobSchedulingInput(BaseModel):
        """Input schema for Job Shop Scheduling."""
    
        jobs: list[Job]
        machines: list[str]
        horizon: int = Field(ge=1)
        objective: str = Field(default="makespan", pattern="^(makespan|total_completion_time)$")
        time_limit_seconds: float = Field(default=30.0, ge=0)
    
        @field_validator("jobs")
        @classmethod
        def validate_jobs(cls, v: list[Job]) -> list[Job]:
            if not v:
                raise ValueError("Must have at least one job")
            return v
    
        @field_validator("machines")
        @classmethod
        def validate_machines(cls, v: list[str]) -> list[str]:
            if not v:
                raise ValueError("Must have at least one machine")
            return v
  • Core helper function that implements the Job Shop Scheduling optimization logic using OR-Tools CP-SAT solver, handling variables, constraints, objectives, and extracting the schedule.
    @with_resource_limits(timeout_seconds=120.0, estimated_memory_mb=150.0)
    def solve_job_scheduling(input_data: dict[str, Any]) -> OptimizationResult:
        """Solve Job Shop Scheduling Problem using OR-Tools CP-SAT.
    
        Args:
            input_data: Job scheduling problem specification
    
        Returns:
            OptimizationResult with job schedule and makespan
        """
        if not ORTOOLS_AVAILABLE:
            return OptimizationResult(
                status=OptimizationStatus.ERROR,
                objective_value=None,
                variables={},
                execution_time=0.0,
                error_message="OR-Tools is not available. Please install it with 'pip install ortools'",
            )
    
        start_time = time.time()
    
        try:
            # Parse and validate input
            scheduling_input = JobSchedulingInput(**input_data)
            jobs = scheduling_input.jobs
            machines = scheduling_input.machines
            horizon = scheduling_input.horizon
    
            # Create CP-SAT model
            model = cp_model.CpModel()
    
            # Variables for each task: (start_time, end_time, interval)
            task_vars: dict[tuple, tuple] = {}
            machine_intervals: dict[int, list] = {i: [] for i in range(len(machines))}
    
            # Create variables for each task
            for job in jobs:
                for task_idx, task in enumerate(job.tasks):
                    suffix = f"_{job.id}_{task_idx}"
                    start_var = model.NewIntVar(0, horizon, f"start{suffix}")
                    duration = task.duration + task.setup_time
                    end_var = model.NewIntVar(0, horizon, f"end{suffix}")
                    interval_var = model.NewIntervalVar(
                        start_var, duration, end_var, f"interval{suffix}"
                    )
    
                    task_vars[(job.id, task_idx)] = (start_var, end_var, interval_var)
                    machine_intervals[task.machine].append(interval_var)
    
            # Add precedence constraints within jobs
            for job in jobs:
                for task_idx in range(len(job.tasks) - 1):
                    _, end_var, _ = task_vars[(job.id, task_idx)]
                    start_var_next, _, _ = task_vars[(job.id, task_idx + 1)]
                    model.Add(end_var <= start_var_next)
    
            # Add machine capacity constraints (no overlap)
            for machine_idx in range(len(machines)):
                if machine_intervals[machine_idx]:
                    model.AddNoOverlap(machine_intervals[machine_idx])
    
            # Add release time constraints
            for job in jobs:
                if job.release_time > 0:
                    start_var, _, _ = task_vars[(job.id, 0)]
                    model.Add(start_var >= job.release_time)
    
            # Add deadline constraints
            for job in jobs:
                if job.deadline is not None:
                    _, end_var, _ = task_vars[(job.id, len(job.tasks) - 1)]
                    model.Add(end_var <= job.deadline)
    
            # Objective: minimize makespan or total completion time
            if scheduling_input.objective == "makespan":
                makespan = model.NewIntVar(0, horizon, "makespan")
                for job in jobs:
                    _, end_var, _ = task_vars[(job.id, len(job.tasks) - 1)]
                    model.Add(makespan >= end_var)
                model.Minimize(makespan)
            else:  # total_completion_time
                completion_times = []
                for job in jobs:
                    _, end_var, _ = task_vars[(job.id, len(job.tasks) - 1)]
                    completion_times.append(end_var)
                model.Minimize(sum(completion_times))
    
            # Solve
            solver = cp_model.CpSolver()
            solver.parameters.max_time_in_seconds = scheduling_input.time_limit_seconds
            status = solver.Solve(model)
    
            if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:  # type: ignore[comparison-overlap,unused-ignore]
                # Extract solution
                schedule = []
                job_completion_times = {}
    
                for job in jobs:
                    job_schedule = []
                    for task_idx, task in enumerate(job.tasks):
                        start_var, end_var, _ = task_vars[(job.id, task_idx)]
                        start_time_val = solver.Value(start_var)
                        end_time_val = solver.Value(end_var)
    
                        job_schedule.append(
                            {
                                "task_index": task_idx,
                                "machine": machines[task.machine],
                                "machine_index": task.machine,
                                "start_time": start_time_val,
                                "end_time": end_time_val,
                                "duration": task.duration,
                                "setup_time": task.setup_time,
                            }
                        )
    
                    completion_time = max(task["end_time"] for task in job_schedule)  # type: ignore[type-var]
                    job_completion_times[job.id] = completion_time
    
                    schedule.append(
                        {
                            "job_id": job.id,
                            "tasks": job_schedule,
                            "completion_time": completion_time,
                            "priority": job.priority,
                        }
                    )
    
                makespan = max(job_completion_times.values()) if job_completion_times else 0  # type: ignore[type-var,assignment]
                total_completion_time = sum(job_completion_times.values())  # type: ignore[arg-type]
    
                execution_time = time.time() - start_time
    
                return OptimizationResult(
                    status=OptimizationStatus.OPTIMAL
                    if status == cp_model.OPTIMAL  # type: ignore[comparison-overlap,unused-ignore]
                    else OptimizationStatus.FEASIBLE,
                    objective_value=float(
                        makespan  # type: ignore[arg-type]
                        if scheduling_input.objective == "makespan"
                        else total_completion_time
                    ),
                    variables={
                        "schedule": schedule,
                        "makespan": makespan,
                        "total_completion_time": total_completion_time,
                        "job_completion_times": job_completion_times,
                        "num_jobs": len(jobs),
                        "num_machines": len(machines),
                    },
                    execution_time=execution_time,
                    solver_info={
                        "solver_name": "OR-Tools CP-SAT",
                        "status": solver.StatusName(status),
                        "objective": scheduling_input.objective,
                    },
                )
            else:
                status_name = solver.StatusName(status)
                return OptimizationResult(
                    status=OptimizationStatus.INFEASIBLE
                    if status == cp_model.INFEASIBLE  # type: ignore[comparison-overlap,unused-ignore]
                    else OptimizationStatus.ERROR,
                    error_message=f"No solution found: {status_name}",
                    execution_time=time.time() - start_time,
                )
    
        except Exception as e:
            return OptimizationResult(
                status=OptimizationStatus.ERROR,
                error_message=f"Job scheduling error: {str(e)}",
                execution_time=time.time() - start_time,
            )
  • Registration of all tools including scheduling tools via register_scheduling_tools(mcp) in the MCP server creation function.
    register_linear_programming_tools(mcp)
    register_integer_programming_tools(mcp)
    register_assignment_tools(mcp)
    register_knapsack_tools(mcp)
    register_routing_tools(mcp)
    register_scheduling_tools(mcp)
    register_financial_tools(mcp)
    register_production_tools(mcp)
    register_validation_tools(mcp)
  • Function that registers the scheduling tools, including 'solve_job_shop_scheduling', by defining them with @mcp.tool() decorators.
    def register_scheduling_tools(mcp: FastMCP[Any]) -> None:
        """Register scheduling optimization tools with MCP server."""
    
        @mcp.tool()
        def solve_job_shop_scheduling(
            jobs: list[dict[str, Any]],
            machines: list[str],
            horizon: int,
            objective: str = "makespan",
            time_limit_seconds: float = 30.0,
        ) -> dict[str, Any]:
            """Solve Job Shop Scheduling Problem to optimize machine utilization and completion times.
    
            Args:
                jobs: List of job dictionaries with tasks and constraints
                machines: List of available machine names
                horizon: Maximum time horizon for scheduling
                objective: Optimization objective ("makespan" or "total_completion_time")
                time_limit_seconds: Maximum solving time in seconds (default: 30.0)
    
            Returns:
                Optimization result with job schedule and machine assignments
            """
            input_data = {
                "jobs": jobs,
                "machines": machines,
                "horizon": horizon,
                "objective": objective,
                "time_limit_seconds": time_limit_seconds,
            }
    
            result = solve_job_scheduling(input_data)
            result_dict: dict[str, Any] = result.model_dump()
            return result_dict
    
        @mcp.tool()
        def solve_employee_shift_scheduling(
            employees: list[str],
            shifts: list[dict[str, Any]],
            days: int,
            employee_constraints: dict[str, dict[str, Any]] | None = None,
            time_limit_seconds: float = 30.0,
        ) -> dict[str, Any]:
            """Solve Employee Shift Scheduling to assign employees to shifts optimally.
    
            Args:
                employees: List of employee names
                shifts: List of shift dictionaries with time and requirements
                days: Number of days to schedule
                employee_constraints: Optional constraints and preferences per employee
                time_limit_seconds: Maximum solving time in seconds (default: 30.0)
    
            Returns:
                Optimization result with employee schedules and coverage statistics
            """
            input_data = {
                "employees": employees,
                "shifts": shifts,
                "days": days,
                "employee_constraints": employee_constraints or {},
                "time_limit_seconds": time_limit_seconds,
            }
    
            result = solve_shift_scheduling(input_data)
            result_dict: dict[str, Any] = result.model_dump()
            return result_dict
    
        logger.info("Registered scheduling tools")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/dmitryanchikov/mcp-optimizer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server