Skip to main content
Glama

mcp-solver

MIT License
133
  • Linux
  • Apple
model_manager.py13.3 kB
import asyncio import logging from datetime import timedelta from typing import Any from minizinc import Instance, Model, Result, Solver, Status from minizinc.error import MiniZincError, SyntaxError, TypeError from ..core.base_model_manager import BaseModelManager from ..core.constants import ( CLEANUP_TIMEOUT, MAX_SOLVE_TIMEOUT, VALIDATION_TIMEOUT, ) logger = logging.getLogger(__name__) def error_response( code: str, message: str, details: dict | None = None ) -> dict[str, Any]: """Helper function to create a standardized error response.""" return {"error": {"code": code, "message": message, "details": details or {}}} class ModelError(Exception): """Custom exception for model-related errors""" pass class MiniZincModelManager(BaseModelManager): def __init__(self, solver_name: str = "chuffed"): super().__init__() self.current_solution: Any | None = None self.solver = Solver.lookup(solver_name) self.current_process = None self.cleanup_lock = asyncio.Lock() self.solve_progress = 0.0 self.solve_status = "" self.initialized = True @property def model_string(self) -> str: return self._get_full_code() def _update_progress(self, progress: float, status: str): self.solve_progress = progress self.solve_status = status def get_solve_progress(self) -> tuple[float, str]: return self.solve_progress, self.solve_status async def _cleanup(self): async with self.cleanup_lock: if self.current_process: try: async with asyncio.timeout(CLEANUP_TIMEOUT.total_seconds()): self.current_process.terminate() await asyncio.sleep(0.1) if self.current_process.is_alive(): self.current_process.kill() except TimeoutError: logger.warning("Cleanup timeout reached, forcing process kill") if self.current_process: self.current_process.kill() except Exception as e: logger.error(f"Error cleaning up process: {e}") finally: self.current_process = None async def _validate_hypothetical_model( self, proposed_items: list[str], timeout: timedelta | None = None ) -> None: """Validates a hypothetical model state by creating a temporary instance""" timeout = timeout or VALIDATION_TIMEOUT try: async with asyncio.timeout(timeout.total_seconds()): model = Model() model_text = "\n".join(proposed_items) if model_text.strip(): if 'include "globals.mzn"' not in model_text: model_text = 'include "globals.mzn";\n' + model_text model.add_string(model_text) instance = Instance(self.solver, model) instance.analyse() except TimeoutError: raise ModelError( f"Model validation timed out after {timeout.total_seconds()} seconds" ) except MiniZincError as e: if isinstance(e, SyntaxError): raise ModelError(f"Syntax error: {e!s}") elif isinstance(e, TypeError): raise ModelError(f"Type error: {e!s}") else: raise ModelError(f"Model error: {e!s}") async def add_item( self, index: int, content: str, validation_timeout: timedelta | None = None ) -> dict[str, Any]: """ Adds a new item at the specified index (1-based). Returns a standardized error response if the index is invalid or validation fails. """ if not content.strip(): return error_response("EMPTY_CONTENT", "Content is empty") # First, let the parent handle the list operation result = await super().add_item(index, content) if not result.get("success"): # Convert parent's error format to MiniZinc's error format return error_response( "INVALID_INDEX", result.get("error", "Invalid index"), {"valid_range": f"0-{len(self.code_items)}"}, ) # Validate the model after the change try: await self._validate_hypothetical_model( self.code_items, timeout=validation_timeout ) except ModelError as e: # Rollback the change del self.code_items[index] return error_response("MODEL_VALIDATION_FAILED", str(e)) return {"message": f"Item added\nCurrent model:\n{self.model_string}"} async def delete_item( self, index: int, validation_timeout: timedelta | None = None ) -> dict[str, Any]: if not self.code_items: return error_response( "MODEL_EMPTY", "Operation 'delete_item' cannot be performed on an empty model", ) # Store the item to restore if validation fails if 0 <= index < len(self.code_items): removed_item = self.code_items[index] # Let parent handle the delete result = await super().delete_item(index) if not result.get("success"): return error_response( "INVALID_INDEX", result.get("error", "Invalid index"), {"valid_range": f"0-{len(self.code_items) - 1}"}, ) # Validate the model after deletion try: await self._validate_hypothetical_model( self.code_items, timeout=validation_timeout ) except ModelError as e: # Rollback the change self.code_items.insert(index, removed_item) return error_response("MODEL_VALIDATION_FAILED", str(e)) return {"message": f"Item deleted\nCurrent model:\n{self.model_string}"} async def replace_item( self, index: int, content: str, validation_timeout: timedelta | None = None ) -> dict[str, Any]: if not self.code_items: return error_response( "MODEL_EMPTY", "Operation 'replace_item' cannot be performed on an empty model", ) if not content.strip(): return error_response("EMPTY_CONTENT", "Content is empty") # Store old content for rollback if 0 <= index < len(self.code_items): old_content = self.code_items[index] # Let parent handle the replace result = await super().replace_item(index, content) if not result.get("success"): return error_response( "INVALID_INDEX", result.get("error", "Invalid index"), {"valid_range": f"0-{len(self.code_items) - 1}"}, ) # Validate the model after replacement try: await self._validate_hypothetical_model( self.code_items, timeout=validation_timeout ) except ModelError as e: # Rollback the change self.code_items[index] = old_content return error_response("MODEL_VALIDATION_FAILED", str(e)) return {"message": f"Item replaced\nCurrent model:\n{self.model_string}"} async def clear_model(self) -> dict[str, Any]: """Override to maintain MiniZinc-specific state""" result = await super().clear_model() self.current_solution = None self.solve_progress = 0.0 self.solve_status = "" return {"message": "Model cleared"} # Reintroduce alias: if a tool call uses "insert_item", it is mapped to add_item. insert_item = add_item async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self._cleanup() async def solve_model(self, timeout: timedelta) -> dict[str, Any]: if not self.model_string.strip(): return error_response( "MODEL_EMPTY", "Model is empty. Cannot solve an empty model." ) async with self: return await self._solve_model_impl(timeout) async def _solve_model_impl(self, timeout: timedelta) -> dict[str, Any]: # We should now always have a valid timeout from the server if timeout > MAX_SOLVE_TIMEOUT: return error_response( "TIMEOUT_EXCEEDED", f"Timeout {timeout} exceeds maximum allowed timeout {MAX_SOLVE_TIMEOUT}", ) try: model = Model() model_text = ( 'include "globals.mzn";\n' + self.model_string if 'include "globals.mzn"' not in self.model_string else self.model_string ) model.add_string(model_text) instance = Instance(self.solver, model) self._update_progress(0.0, "Starting solve") result = await asyncio.wait_for( asyncio.to_thread(instance.solve, timeout=timeout), timeout=timeout.total_seconds() + 1.0, ) self._update_progress(1.0, "Solve completed") return self._process_result(result, timeout.total_seconds()) except TimeoutError: self._update_progress(1.0, "Timeout reached") if hasattr(instance, "cancel"): await instance.cancel() return error_response( "SOLVER_TIMEOUT", "Solver reached timeout without conclusion", {"solve_time": timeout.total_seconds()}, ) except MiniZincError as e: self._update_progress(1.0, "Error occurred") logger.error("MiniZinc solve error", exc_info=True) return error_response("MINIZINC_ERROR", f"MiniZinc error: {e!s}") def _process_result(self, result: Result, timeout_seconds: float) -> dict[str, Any]: self.current_solution = result self.last_solve_time = ( result.statistics["solveTime"].total_seconds() if "solveTime" in result.statistics else None ) # Build a standardized solution format solution = {"status": str(result.status)} if ( result.status == Status.SATISFIED or result.status == Status.ALL_SOLUTIONS or result.status == Status.OPTIMAL_SOLUTION ): solution["satisfiable"] = True # Extract solution values solution_values = {} for name, value in result.solution.__dict__.items(): if not name.startswith("_"): # Skip private attributes solution_values[name] = value solution["solution"] = solution_values # Add objective value if it exists if hasattr(result, "objective") and result.objective is not None: solution["objective"] = result.objective # Add optimization status if result.status == Status.OPTIMAL_SOLUTION: solution["optimal"] = True else: solution["optimal"] = False elif result.status == Status.UNSATISFIABLE: solution["satisfiable"] = False else: solution["satisfiable"] = False solution["message"] = ( f"Solver status: {result.status} after {timeout_seconds} seconds" ) # Store in format compatible with BaseModelManager self.last_solution = { "satisfiable": solution.get("satisfiable", False), "status": str(result.status), "values": solution.get("solution", {}), } if "objective" in solution: self.last_solution["objective"] = solution["objective"] if "optimal" in solution: self.last_solution["optimal"] = solution["optimal"] return solution def get_solution(self) -> dict[str, Any]: if not self.current_solution: return error_response( "NO_SOLUTION", "No solution is available. Please solve the model first." ) return self._process_result(self.current_solution, 0) def get_variable_value(self, variable_name: str) -> dict[str, Any]: if not self.current_solution: return error_response( "NO_SOLUTION", "No solution is available. Please solve the model first." ) if variable_name not in self.current_solution.solution.__dict__: return error_response( "VARIABLE_NOT_FOUND", f"Variable '{variable_name}' not found in solution", ) return { "name": variable_name, "value": self.current_solution.solution.__dict__[variable_name], } def get_solve_time(self) -> dict[str, Any]: if self.last_solve_time is None: return error_response( "NO_SOLUTION", "No solve time available. Please solve the model first." ) return {"solve_time": self.last_solve_time}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/szeider/mcp-solver'

If you have feedback or need assistance with the MCP directory API, please join our Discord server