Skip to main content
Glama

mcp-solver

MIT License
133
  • Linux
  • Apple
model_manager.py29.7 kB
""" PySAT model manager implementation using BaseModelManager. This module implements the SolverManager abstract base class for PySAT, providing methods for managing PySAT models. """ import logging import re import time from datetime import timedelta from typing import Any from ..core.base_model_manager import BaseModelManager from ..core.constants import MAX_SOLVE_TIMEOUT, MIN_SOLVE_TIMEOUT from ..core.validation import ( DictionaryMisuseValidator, ValidationError, get_standardized_response, validate_content, validate_python_code_safety, validate_timeout, ) from .environment import execute_pysat_code class PySATModelManager(BaseModelManager): """ PySAT model manager implementation. This class manages PySAT models, including adding, removing, and modifying model items, as well as solving models and extracting solutions. """ def __init__(self): """ Initialize a new PySAT model manager. """ super().__init__() self.initialized = True self.logger = logging.getLogger(__name__) self.dict_validator = DictionaryMisuseValidator() self.logger.info("PySAT model manager initialized") async def clear_model(self) -> dict[str, Any]: """ Clear the current model. Returns: A dictionary with a message indicating the model was cleared """ result = await super().clear_model() # Reset the dictionary misuse validator self.dict_validator = DictionaryMisuseValidator() self.logger.info("Model cleared") return {"message": "Model cleared successfully"} async def add_item(self, index: int, content: str) -> dict[str, Any]: """ Add an item to the model at the specified index. Args: index: The index at which to add the item content: The content of the item Returns: A dictionary with the result of the operation Raises: ValidationError: If the input is invalid """ try: # Validate content and code safety validate_content(content) validate_python_code_safety(content) # First call parent's add_item to handle list operations result = await super().add_item(index, content) if not result.get("success"): return result # Add the code fragment to the dictionary misuse validator self.dict_validator.add_fragment(content, index) self.logger.debug(f"Added fragment to dictionary validator: item {index}") return result except ValidationError as e: error_msg = str(e) self.logger.error(f"Validation error in add_item: {error_msg}") return get_standardized_response( success=False, message=f"Failed to add item: {error_msg}", error=error_msg, ) except Exception as e: error_msg = f"Unexpected error in add_item: {e!s}" self.logger.error(error_msg, exc_info=True) return get_standardized_response( success=False, message="Failed to add item due to an internal error", error=error_msg, ) async def replace_item(self, index: int, content: str) -> dict[str, Any]: """ Replace an item in the model at the specified index. Args: index: The index of the item to replace content: The new content of the item Returns: A dictionary with the result of the operation """ try: # Validate content and code safety validate_content(content) validate_python_code_safety(content) # First call parent's replace_item result = await super().replace_item(index, content) if not result.get("success"): return result # Add the code fragment to the dictionary misuse validator self.dict_validator.add_fragment(content, index) self.logger.debug(f"Added fragment to dictionary validator: item {index}") return result except ValidationError as e: error_msg = str(e) self.logger.error(f"Validation error in replace_item: {error_msg}") return get_standardized_response( success=False, message=f"Failed to replace item: {error_msg}", error=error_msg, ) except Exception as e: error_msg = f"Unexpected error in replace_item: {e!s}" self.logger.error(error_msg, exc_info=True) return get_standardized_response( success=False, message="Failed to replace item due to an internal error", error=error_msg, ) async def solve_model(self, timeout: timedelta) -> dict[str, Any]: """ Solve the current model with a timeout. Args: timeout: Maximum time to spend on solving Returns: A dictionary with the solving result """ try: if not self.initialized: return get_standardized_response( success=False, message="Model manager not initialized", error="Not initialized", ) if not self.code_items: return get_standardized_response( success=False, message="No model items to solve", error="Empty model", ) # Validate timeout validate_timeout(timeout, MIN_SOLVE_TIMEOUT, MAX_SOLVE_TIMEOUT) # Validate code fragments for dictionary misuse dict_validation_result = self.dict_validator.validate() if dict_validation_result["has_errors"]: # Dictionary misuse detected, format errors for user error_messages = [] for error in dict_validation_result["errors"]: item_index = error.get("item", "?") line_num = error.get("fragment_line", error.get("line", "?")) error_messages.append( f"Item {item_index}, Line {line_num}: {error['message']} {error['suggestion']}" ) self.logger.warning(f"Dictionary misuse detected: {error_messages}") return get_standardized_response( success=False, message="Your code contains a common dictionary usage error that will cause unexpected behavior.", error="Dictionary misuse detected", error_details={ "errors": error_messages, "suggestion": "Check how you're updating dictionary variables. Use var_dict[key] = value instead of var_dict = value.", }, ) # Get full code using parent's method code_string = self._get_full_code() # Perform static analysis on the code before executing it try: import ast ast_tree = ast.parse(code_string) # Check for solver.solve() patterns solve_calls = 0 for node in ast.walk(ast_tree): if isinstance(node, ast.Call): if ( isinstance(node.func, ast.Attribute) and node.func.attr == "solve" and isinstance(node.func.value, ast.Name) ): solve_calls += 1 if solve_calls == 0: self.logger.warning("No solver.solve() call found in the code") return get_standardized_response( success=False, message="No solver.solve() call found in the code. Make sure to create a solver and call its solve() method.", error="Missing solve call", code_analysis="Missing solver.solve() call", ) # Check for export_solution calls export_calls = 0 for node in ast.walk(ast_tree): if isinstance(node, ast.Call): if ( isinstance(node.func, ast.Name) and node.func.id == "export_solution" ): export_calls += 1 if export_calls == 0: self.logger.warning("No export_solution() call found in the code") return get_standardized_response( success=False, message="No export_solution() call found in the code. Make sure to call export_solution() with your result.", error="Missing export_solution call", code_analysis="Missing export_solution() call", ) # AST-based validation has replaced these regex checks line_issues = [] except SyntaxError as e: line_num = e.lineno if hasattr(e, "lineno") else "?" col_num = e.offset if hasattr(e, "offset") else "?" error_text = ( e.text.strip() if hasattr(e, "text") and e.text else "unknown" ) self.logger.error( f"Syntax error in code at line {line_num}, column {col_num}: {e!s}" ) return get_standardized_response( success=False, message=f"Syntax error at line {line_num}, column {col_num}: {e!s}", error="Syntax error", error_details={ "line": line_num, "column": col_num, "code": error_text, "message": str(e), }, ) except Exception as e: self.logger.error(f"Error analyzing code: {e!s}") # Continue despite analysis error # Modify the code to enhance debugging modified_code = self._enhance_code_for_debugging(code_string) # Set timeout timeout_seconds = timeout.total_seconds() # Execute code with timeout start_time = time.time() self.last_result = execute_pysat_code( modified_code, timeout=timeout_seconds ) self.last_solve_time = time.time() - start_time # Check if there were execution errors if self.last_result.get("error"): error_msg = self.last_result["error"] self.logger.error(f"Error executing code: {error_msg}") # If we captured line issues during analysis, include them in error details if line_issues: return get_standardized_response( success=False, message=f"Error executing code: {error_msg}", error="Execution error", error_details={ "execution_error": error_msg, "code_issues": line_issues, }, ) return get_standardized_response( success=False, message=f"Error executing code: {error_msg}", error="Execution error", ) # Extract solver output to check for satisfiability output = self.last_result.get("output", "") satisfiable = False # Parse output for explicit satisfiability result sat_match = re.search( r"PYSAT_DEBUG_OUTPUT: model_is_satisfiable=(\w+)", output ) if sat_match: satisfiable = sat_match.group(1).lower() == "true" self.logger.debug( f"Found explicit satisfiability result: {satisfiable}" ) else: # Also try to find standard output messages if "Is satisfiable: True" in output: satisfiable = True self.logger.debug("Found 'Is satisfiable: True' in output") # Extract solution if available if self.last_result.get("solution"): self.last_solution = self.last_result["solution"] # Use our parsed satisfiability result to override the solution # This ensures consistency between satisfiable flag and status if sat_match or "Is satisfiable: True" in output: self.last_solution["satisfiable"] = satisfiable else: # Create a minimal solution with just the satisfiability flag self.last_solution = { "satisfiable": satisfiable, "status": "sat" if satisfiable else "unsat", "values": {}, } # Ensure there's a 'values' dictionary for standardized access if "values" not in self.last_solution: self.last_solution["values"] = {} # Extract solution data from the debug output if available # Look for the _LAST_SOLUTION debug output which contains the complete solution data last_solution_pattern = re.compile(r"DEBUG - _LAST_SOLUTION set to: (.*)") last_solution_match = last_solution_pattern.search(output) if last_solution_match: try: last_solution_str = last_solution_match.group(1) # Preprocess string to improve JSON compatibility last_solution_str = self._prepare_solution_string_for_json( last_solution_str ) # Try to parse as JSON import json try: last_solution_data = json.loads(last_solution_str) if isinstance(last_solution_data, dict): # Enhanced copy mechanism for all solution data self._merge_solution_data(last_solution_data) except json.JSONDecodeError as e: self.logger.error(f"Error parsing solution JSON: {e!s}") self.logger.debug( f"Problematic JSON string: {last_solution_str[:100]}..." ) # Attempt alternative parsing using ast.literal_eval which is more forgiving self._try_alternative_parsing(last_solution_str) # Even with parsing error, we'll keep the solution data we have self.last_solution["warning"] = ( f"Solution parsing warning: {e!s}" ) except Exception as e: self.logger.error(f"Error extracting solution: {e!s}") self.last_solution["warning"] = f"Solution extraction error: {e!s}" # Add solve time to solution self.last_solution["solve_time"] = f"{self.last_solve_time:.6f} seconds" # Add any warnings from static analysis if line_issues: self.last_solution["warnings"] = [ f"Line {issue['line']}: {issue['issue']}" for issue in line_issues ] # Standard message for SAT problems message = "Model solved successfully" + ( " (satisfiable)" if satisfiable else " (unsatisfiable)" ) response = { "message": message, "success": True, "solve_time": f"{self.last_solve_time:.6f} seconds", "output": output, "satisfiable": satisfiable, } # Include status and values - ensure consistency if satisfiable: response["status"] = "sat" response["satisfiable"] = True # Ensure this is explicitly set else: response["status"] = "unsat" response["satisfiable"] = False # Ensure this is explicitly set # Override with solution-specific status if available if self.last_solution.get("status"): response["status"] = self.last_solution["status"] # Include values dictionary if self.last_solution.get("values"): response["values"] = self.last_solution["values"] # Include warnings if any if self.last_solution.get("warnings"): response["warnings"] = self.last_solution["warnings"] return response except Exception as e: # Log the error self.logger.error(f"Error in solve_model: {e!s}", exc_info=True) # Return a structured error response return get_standardized_response( success=False, message=f"Error solving model: {e!s}", error="Internal error", ) def _prepare_solution_string_for_json(self, solution_str): """ Prepare a solution string for JSON parsing by handling common formatting issues. Args: solution_str: The solution string extracted from output Returns: A cleaned string ready for JSON parsing """ # Replace Python syntax with JSON syntax clean_str = solution_str.replace("'", '"') clean_str = clean_str.replace("True", "true").replace("False", "false") clean_str = clean_str.replace("None", "null") # Fix common tuple formatting issues (convert Python tuples to JSON arrays) # Handle simple tuples with two numbers clean_str = re.sub(r"\((\d+),\s*(\d+)\)", r"[\1, \2]", clean_str) # Handle tuples with three numbers (e.g., in cut_edges) clean_str = re.sub(r"\((\d+),\s*(\d+),\s*(\d+)\)", r"[\1, \2, \3]", clean_str) # Handle nested tuples in lists clean_str = re.sub(r"\[\(", "[[", clean_str) clean_str = re.sub(r"\)\]", "]]", clean_str) clean_str = re.sub(r"\),\s*\(", "], [", clean_str) # Remove trailing commas which are valid in Python but not in JSON clean_str = re.sub(r",\s*([}\]])", r"\1", clean_str) # Replace NaN and Infinity with null (which is JSON compatible) clean_str = re.sub(r"NaN", "null", clean_str) clean_str = re.sub(r"Infinity", "null", clean_str) return clean_str def _merge_solution_data(self, solution_data): """ Merge solution data from extracted output into the last_solution dictionary. Args: solution_data: Dictionary containing solution data """ # Copy all fields, not just dictionaries for key, value in solution_data.items(): # Special handling for dictionaries if isinstance(value, dict): if key not in self.last_solution or not isinstance( self.last_solution[key], dict ): self.last_solution[key] = {} # Merge dictionaries rather than replace for inner_key, inner_value in value.items(): self.last_solution[key][inner_key] = inner_value self.logger.debug(f"Merged dictionary '{key}' into solution") # Special handling for list fields (like 'queens' or 'knights') elif isinstance(value, list): self.last_solution[key] = value self.logger.debug(f"Copied list '{key}' to solution") # For primitive values else: self.last_solution[key] = value self.logger.debug(f"Copied value '{key}' to solution") # Ensure values are properly formatted if "values" in solution_data: self.logger.debug( f"Found values in solution_data: {solution_data['values']}" ) # Convert JSON booleans back to Python booleans for key, value in solution_data["values"].items(): if value is True or value == "true": self.last_solution["values"][key] = True elif value is False or value == "false": self.last_solution["values"][key] = False else: self.last_solution["values"][key] = value def _try_alternative_parsing(self, solution_str): """ Try alternative parsing methods when JSON parsing fails. Args: solution_str: The solution string that failed JSON parsing Returns: Boolean indicating whether parsing succeeded """ try: # Try using Python's literal_eval which can handle more Python-like syntax import ast solution_data = ast.literal_eval(solution_str) if isinstance(solution_data, dict): self.logger.info( "Successfully parsed solution using ast.literal_eval fallback" ) self._merge_solution_data(solution_data) return True except Exception as e: self.logger.debug(f"Alternative parsing also failed: {e!s}") # Even if both parsing methods fail, try to extract any useful data using regex if self._extract_data_with_regex(solution_str): return True return False def _extract_data_with_regex(self, solution_str): """ Extract critical data using regex patterns when all parsing fails. Args: solution_str: The solution string that failed parsing Returns: Boolean indicating whether any useful data was extracted """ extracted_something = False # Try to extract satisfiability sat_match = re.search( r"'satisfiable':\s*(true|false)", solution_str, re.IGNORECASE ) if sat_match: is_sat = sat_match.group(1).lower() == "true" self.last_solution["satisfiable"] = is_sat self.last_solution["status"] = "sat" if is_sat else "unsat" extracted_something = True self.logger.debug(f"Extracted satisfiability from regex: {is_sat}") # Try to extract lists like 'queens' or 'knights' positions for list_type in [ "queens", "knights", "board_representation", "set_s", "set_complement", "cut_edges", ]: list_match = re.search( f"'{list_type}':\\s*(\\[.*?\\])", solution_str, re.DOTALL ) if list_match: try: import ast # Clean up the list string and convert Python to JSON syntax list_str = list_match.group(1).replace("'", '"') list_data = ast.literal_eval(list_str) self.last_solution[list_type] = list_data self.logger.debug(f"Extracted {list_type} list using regex") extracted_something = True except Exception: pass # If this fails, we still continue with other extractions return extracted_something def _enhance_code_for_debugging(self, code_string: str) -> str: """ Enhances the code with debug statements to aid in debugging. Args: code_string: The original code string Returns: Enhanced code string with debug information """ # Add debug headers and imports if not present debug_header = ( "# === DEBUG INSTRUMENTATION ===\n" "import traceback\n" "# === END DEBUG HEADER ===\n\n" ) modified_code = debug_header + code_string # Modify the code to add debug info around solver.solve() calls modified_lines = [] lines = modified_code.split("\n") for _, line in enumerate(lines): modified_lines.append(line) # Add debugging for if solver.solve(): if re.search(r"if\s+\w+\.solve\(\)", line): # Capture the solver variable name solver_var = re.search(r"if\s+(\w+)\.solve\(\)", line) if solver_var: solver_name = solver_var.group(1) # Indent level of the original line indent = re.match(r"^(\s*)", line).group(1) next_indent = indent + " " # Assume 4-space indentation # Add debug prints after the conditional modified_lines.append( f'{next_indent}print(f"PYSAT_DEBUG_OUTPUT: model_is_satisfiable=True")' ) modified_lines.append( f'{next_indent}print(f"PYSAT_DEBUG_OUTPUT: solver={solver_name!r}")' ) # Add exception handling around export_solution calls if "export_solution(" in line: # Find indentation indent = re.match(r"^(\s*)", line).group(1) # Find the start of the function call call_start = line.find("export_solution(") # If the call is not at the beginning of the line, we need to preserve what comes before it prefix = line[:call_start] # Extract the call and arguments call_match = re.search(r"export_solution\((.*)\)", line) if call_match: args = call_match.group(1) # Replace the line with a try-except block modified_lines[-1] = f"{indent}try:" modified_lines.append( f"{indent} {prefix}export_solution({args})" ) modified_lines.append(f"{indent}except Exception as e:") modified_lines.append( f'{indent} print(f"PYSAT_DEBUG_OUTPUT: export_solution_error={{str(e)}}")' ) modified_lines.append( f'{indent} print(f"PYSAT_DEBUG_OUTPUT: traceback={{traceback.format_exc()}}")' ) modified_lines.append( f"{indent} # Re-raise to ensure proper error handling" ) modified_lines.append(f"{indent} raise") return "\n".join(modified_lines) def get_solution(self) -> dict[str, Any]: """ Get the current solution. Returns: A dictionary with the current solution """ if not self.last_solution: return {"message": "No solution available", "success": False} return { "message": "Solution retrieved", "success": True, "solution": self.last_solution, } def get_variable_value(self, variable_name: str) -> dict[str, Any]: """ Get the value of a variable from the current solution. Args: variable_name: The name of the variable Returns: A dictionary with the value of the variable """ if not self.last_solution: return {"message": "No solution available", "success": False} # First, check if the variable is directly available in the solution # This handles custom dictionaries like 'casting' if variable_name in self.last_solution: return { "message": f"Value of dictionary '{variable_name}'", "success": True, "value": self.last_solution[variable_name], } # Then check in the values dictionary for individual variables if "values" not in self.last_solution: return { "message": "Solution does not contain variable values", "success": False, } values = self.last_solution["values"] if variable_name not in values: return { "message": f"Variable '{variable_name}' not found in solution", "success": False, } return { "message": f"Value of variable '{variable_name}'", "success": True, "value": values[variable_name], } def get_solve_time(self) -> dict[str, Any]: """ Get the time taken for the last solve operation. Returns: A dictionary with the solve time information """ if self.last_solve_time is None: return {"message": "No solve time available", "success": False} return { "message": "Solve time retrieved", "success": True, "solve_time": f"{self.last_solve_time:.6f} seconds", }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/szeider/mcp-solver'

If you have feedback or need assistance with the MCP directory API, please join our Discord server