Genesis MCP Server

by dustland
Verified
"""Simulation service for Genesis World.""" import inspect import logging import sys import traceback from io import StringIO from typing import Any, Dict, List, Optional, Tuple import genesis as gs from pydantic import BaseModel from src.genesis_mcp.models import SimulationResult logger = logging.getLogger(__name__) class SimulationService: """Service for running Genesis World simulations.""" def __init__(self): """Initialize the simulation service.""" self.gs = gs def run_simulation(self, code: str, parameters: Dict[str, Any] = None) -> SimulationResult: """Run a Genesis World simulation from provided code. Args: code: Python code that uses Genesis World parameters: Optional parameters to pass to the simulation Returns: SimulationResult with simulation results and logs """ logs = [] result = {} # Create a string buffer to capture stdout for logs log_capture = StringIO() # Create a safe execution environment local_vars = { "gs": self.gs, "parameters": parameters or {}, "result": result, } # Save original stdout and redirect to our capture original_stdout = sys.stdout sys.stdout = log_capture try: # Execute the simulation code exec(code, local_vars, local_vars) # Extract any results assigned to the result variable if "result" in local_vars: result = local_vars["result"] # Get the logs logs = log_capture.getvalue().splitlines() return SimulationResult( result=result, logs=logs ) except Exception as e: logs.append(f"Error: {str(e)}") logs.append(traceback.format_exc()) logger.exception("Error running simulation") return SimulationResult( result={"error": str(e)}, logs=logs ) finally: # Restore stdout sys.stdout = original_stdout def get_world_info(self) -> Dict[str, Any]: """Get information about the Genesis World API. Returns: Dictionary with information about available modules, classes, and functions """ world_info = { "version": getattr(self.gs, "__version__", "unknown"), "modules": {}, "examples": self._get_examples(), } # Collect module information for name, obj in inspect.getmembers(self.gs): if name.startswith("_"): continue if inspect.ismodule(obj): module_info = self._get_module_info(obj) if module_info: world_info["modules"][name] = module_info elif inspect.isclass(obj): world_info["classes"] = world_info.get("classes", {}) world_info["classes"][name] = self._get_class_info(obj) elif inspect.isfunction(obj): world_info["functions"] = world_info.get("functions", {}) world_info["functions"][name] = self._get_function_info(obj) return world_info def _get_module_info(self, module) -> Dict[str, Any]: """Extract information about a module.""" info = { "doc": inspect.getdoc(module), "classes": {}, "functions": {} } for name, obj in inspect.getmembers(module): if name.startswith("_"): continue if inspect.isclass(obj): info["classes"][name] = self._get_class_info(obj) elif inspect.isfunction(obj): info["functions"][name] = self._get_function_info(obj) return info def _get_class_info(self, cls) -> Dict[str, Any]: """Extract information about a class.""" return { "doc": inspect.getdoc(cls), "methods": { name: self._get_function_info(method) for name, method in inspect.getmembers(cls, inspect.isfunction) if not name.startswith("_") } } def _get_function_info(self, func) -> Dict[str, Any]: """Extract information about a function.""" sig = inspect.signature(func) return { "doc": inspect.getdoc(func), "signature": str(sig), "parameters": { name: { "default": str(param.default) if param.default is not inspect.Parameter.empty else None, "annotation": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None, } for name, param in sig.parameters.items() } } def _get_examples(self) -> List[Dict[str, str]]: """Get example simulations.""" return [ { "name": "Basic Simulation", "description": "A simple simulation of a world with moving agents", "code": """ # Create a simple world with agents world = gs.World() agent = gs.Agent(position=(0, 0)) world.add_agent(agent) # Run the simulation for 10 steps for step in range(10): agent.move(direction="north", distance=1) world.step() print(f"Step {step}: Agent at position {agent.position}") # Return final world state result = { "world_state": world.get_state(), "agent_positions": [a.position for a in world.agents] } """ }, { "name": "Resource Collection Simulation", "description": "Agents collecting resources in a world", "code": """ # Create a world with resources world = gs.World() # Add resources to the world for i in range(5): resource = gs.Resource(position=(i*5, i*5), type="food", quantity=10) world.add_resource(resource) # Add agents with different strategies collector_agent = gs.Agent(position=(0, 0), type="collector") explorer_agent = gs.Agent(position=(10, 10), type="explorer") world.add_agent(collector_agent) world.add_agent(explorer_agent) # Run simulation for step in range(20): # Agents follow their strategies for agent in world.agents: if agent.type == "collector": # Collectors move toward nearest resource nearest = world.find_nearest_resource(agent.position) if nearest: direction = world.get_direction(agent.position, nearest.position) agent.move(direction=direction, distance=1) # Collect if at resource if agent.position == nearest.position: agent.collect(nearest, quantity=1) elif agent.type == "explorer": # Explorers move randomly direction = ["north", "east", "south", "west"][step % 4] agent.move(direction=direction, distance=2) # Step the world forward world.step() print(f"Step {step}: Collector has {collector_agent.inventory}, Explorer has {explorer_agent.inventory}") # Return results result = { "steps": step + 1, "collector_inventory": collector_agent.inventory, "explorer_inventory": explorer_agent.inventory, "remaining_resources": [ {"position": r.position, "quantity": r.quantity} for r in world.resources ] } """ } ]