Skip to main content
Glama
emitter.gd8.1 kB
extends Node # Sentinel Emitter - Sends runtime events to Sentinel MCP server # This runs in the background and posts JSON events to http://127.0.0.1:8787/events var http_request: HTTPRequest var settings: Resource var enabled: bool = true var sentinel_url: String = "http://127.0.0.1:8787" var last_error_time: int = 0 var frame_counter: int = 0 func _ready(): name = "SentinelEmitter" # Load settings _load_settings() if not enabled: print("Sentinel Emitter: Disabled via settings") return # Create HTTP request node http_request = HTTPRequest.new() add_child(http_request) http_request.request_completed.connect(_on_request_completed) # Connect to error signals if not Engine.is_editor_hint(): _setup_error_monitoring() print("Sentinel Emitter: Ready (URL: %s/events)" % sentinel_url) func _load_settings(): var settings_path = "res://addons/sentinel_emitter/settings.tres" if ResourceLoader.exists(settings_path): settings = load(settings_path) enabled = settings.get("enabled", true) sentinel_url = settings.get("sentinel_url", "http://127.0.0.1:8787") else: print("Sentinel Emitter: No settings found, using defaults") func _setup_error_monitoring(): # Monitor engine errors using signals var tree = get_tree() if tree: # Connect to scene change for context updates tree.connect("current_scene_changed", _on_scene_changed) # Override push_error to capture script errors _override_error_functions() func _override_error_functions(): # This is a simplified approach - in a real implementation, # you might need to hook into Godot's internal error system pass func _physics_process(delta): if not enabled: return frame_counter += 1 # Periodically send snapshots (every 60 frames = 1 second at 60 FPS) if frame_counter % 60 == 0: _send_snapshot() func _send_snapshot(): var snapshot_data = { "type": "snapshot", "frame": frame_counter, "rng_seed": _get_current_rng_seed(), "timestamp": Time.get_datetime_string_from_system(true), "fighters": _collect_fighter_data(), "scene_data": _collect_scene_data() } _send_event(snapshot_data) func _collect_fighter_data() -> Array: var fighters = [] # Look for nodes with "Fighter" in their name or group var fighter_nodes = get_tree().get_nodes_in_group("fighters") if fighter_nodes.is_empty(): # Try to find by type/name fighter_nodes = _find_nodes_by_pattern("Fighter") for fighter in fighter_nodes: var fighter_data = { "id": fighter.name, "state": _get_fighter_state(fighter) } # Add position if available if fighter.has_method("get_global_position"): var pos = fighter.get_global_position() fighter_data.position = {"x": pos.x, "y": pos.y} # Add velocity if available if fighter.has_method("get_velocity") or fighter.has_signal("velocity_changed"): var vel = fighter.get("velocity", Vector2.ZERO) fighter_data.velocity = {"x": vel.x, "y": vel.y} # Add damage if available if fighter.has_method("get_damage_percent"): fighter_data.damage = fighter.get_damage_percent() elif fighter.has_signal("damage_changed") or "damage" in fighter: fighter_data.damage = fighter.get("damage", 0) # Add current move info if fighter.has_method("get_current_move"): fighter_data.current_move = fighter.get_current_move() if fighter.has_method("get_move_window"): fighter_data.move_window = fighter.get_move_window() fighters.append(fighter_data) return fighters func _get_fighter_state(fighter: Node) -> String: # Try different ways to get state machine state if fighter.has_method("get_current_state"): return str(fighter.get_current_state()) elif fighter.has_signal("state_changed"): return fighter.get("current_state", "unknown") elif fighter.get("state_machine"): var sm = fighter.get("state_machine") if sm and sm.has_method("get_current_state"): return str(sm.get_current_state()) return "unknown" func _find_nodes_by_pattern(pattern: String) -> Array: var found = [] _find_nodes_recursive(get_tree().current_scene, pattern, found) return found func _find_nodes_recursive(node: Node, pattern: String, found: Array): if node.name.contains(pattern) or node.get_script(): var script = node.get_script() if script and script.get_path().contains(pattern.to_lower()): found.append(node) for child in node.get_children(): _find_nodes_recursive(child, pattern, found) func _collect_scene_data() -> Dictionary: var scene_data = {} var current_scene = get_tree().current_scene if current_scene: scene_data.scene_path = current_scene.scene_file_path scene_data.scene_name = current_scene.name # Collect any nodes in important groups var combat_nodes = get_tree().get_nodes_in_group("combat") if not combat_nodes.is_empty(): scene_data.combat_active = true scene_data.combat_nodes_count = combat_nodes.size() return scene_data func _get_current_rng_seed() -> int: # Try to find the game's RNG instance var rng_node = get_tree().get_first_node_in_group("rng") if rng_node and rng_node.has_method("get_seed"): return rng_node.get_seed() # Fallback to system time-based seed return Time.get_ticks_msec() % 1000000 func send_script_error(file: String, line: int, message: String, stack: Array = []): if not enabled: return # Rate limit errors (max 1 per second) var current_time = Time.get_ticks_msec() if current_time - last_error_time < 1000: return last_error_time = current_time var error_data = { "type": "script_error", "file": file, "line": line, "message": message, "error_type": "runtime_error", "stack": stack, "timestamp": Time.get_datetime_string_from_system(true), "context": { "scene": get_tree().current_scene.scene_file_path if get_tree().current_scene else "unknown" } } _send_event(error_data) func send_test_result(suite: String, test_name: String, passed: bool, time_ms: float, error_msg: String = ""): if not enabled: return var test_data = { "type": "test_result", "suite": suite, "test_name": test_name, "passed": passed, "time_ms": time_ms, "timestamp": Time.get_datetime_string_from_system(true) } if not error_msg.is_empty(): test_data.error_message = error_msg _send_event(test_data) func _send_event(event_data: Dictionary): if not http_request or not enabled: return var json = JSON.new() var json_string = json.stringify(event_data) var headers = [ "Content-Type: application/json", "User-Agent: Godot-Sentinel-Emitter/1.0" ] var url = sentinel_url + "/events" var error = http_request.request(url, headers, HTTPClient.METHOD_POST, json_string) if error != OK: print("Sentinel Emitter: Failed to send event - HTTP error: ", error) func _on_request_completed(result: int, response_code: int, headers: PackedStringArray, body: PackedByteArray): if response_code != 200: print("Sentinel Emitter: Server responded with code: ", response_code) # If server is down, we could disable temporarily if response_code == 0: # Connection failed print("Sentinel Emitter: Server unreachable, events will be lost") func _on_scene_changed(): if not enabled: return var event_data = { "type": "game_event", "event_type": "scene_changed", "timestamp": Time.get_datetime_string_from_system(true), "data": { "new_scene": get_tree().current_scene.scene_file_path if get_tree().current_scene else "unknown" } } _send_event(event_data) func _capture_runtime_context(): # Called when errors are detected var context_data = { "type": "snapshot", "frame": frame_counter, "timestamp": Time.get_datetime_string_from_system(true), "fighters": _collect_fighter_data(), "scene_data": _collect_scene_data(), "rng_seed": _get_current_rng_seed() } _send_event(context_data) # Public API for manual error reporting func report_error(file: String, line: int, message: String): send_script_error(file, line, message) func report_test_result(suite: String, test: String, passed: bool, duration: float, error: String = ""): send_test_result(suite, test, passed, duration, error)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Snack-JPG/Godot-Sentinel-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server