from __future__ import annotations
import asyncio
import logging
import os
import time
from collections.abc import Awaitable, Callable
from typing import Annotated, Any, Literal
from fastmcp import Context
from mcp.types import ToolAnnotations
from models import MCPResponse
from services.registry import mcp_for_unity_tool
from services.tools import get_unity_instance_from_context
import transport.unity_transport as unity_transport
import transport.legacy.unity_connection as _legacy_conn
from transport.legacy.unity_connection import _extract_response_reason
from services.state.external_changes_scanner import external_changes_scanner
import services.resources.editor_state as editor_state
logger = logging.getLogger(__name__)
# Blocking reasons that indicate Unity is actually busy (not just stale status).
# Must match activityPhase values from EditorStateCache.cs
_REAL_BLOCKING_REASONS = {"compiling", "domain_reload", "running_tests", "asset_import"}
def _in_pytest() -> bool:
"""Return True when running inside pytest to avoid polling unmocked resources."""
return "PYTEST_CURRENT_TEST" in os.environ
async def wait_for_editor_ready(ctx: Context, timeout_s: float = 30.0) -> tuple[bool, float]:
"""Poll editor_state until Unity is ready for tool calls.
Returns (ready, elapsed_seconds). Treats exceptions from
get_editor_state as "not ready yet" so the loop survives transient
connection errors during domain reload.
"""
if _in_pytest():
return (True, 0.0)
start = time.monotonic()
while time.monotonic() - start < timeout_s:
try:
state_resp = await editor_state.get_editor_state(ctx)
state = state_resp.model_dump() if hasattr(state_resp, "model_dump") else state_resp
data = (state or {}).get("data") if isinstance(state, dict) else None
advice = (data or {}).get("advice") if isinstance(data, dict) else None
if isinstance(advice, dict):
if advice.get("ready_for_tools") is True:
return (True, time.monotonic() - start)
blocking = set(advice.get("blocking_reasons") or [])
if not (blocking & _REAL_BLOCKING_REASONS):
return (True, time.monotonic() - start)
except Exception:
pass # not ready yet — keep polling
await asyncio.sleep(0.25)
return (False, time.monotonic() - start)
def is_reloading_rejection(resp: Any) -> bool:
"""True when Unity rejected a command because it thinks it is reloading.
The command was never executed, so retrying is safe.
"""
if not isinstance(resp, dict) or resp.get("success"):
return False
data = resp.get("data") or {}
return data.get("reason") == "reloading" and resp.get("hint") == "retry"
def is_connection_lost_after_send(resp: Any) -> bool:
"""True when a mutation's response indicates TCP was lost after command was sent.
Script mutations trigger domain reload which kills the TCP connection.
The mutation was likely executed but the response was lost.
"""
if isinstance(resp, dict):
if resp.get("success"):
return False
err = (resp.get("error") or resp.get("message") or "").lower()
else:
if getattr(resp, "success", None):
return False
err = (getattr(resp, "error", "") or "").lower()
return "connection closed" in err or "disconnected" in err or "aborted" in err
async def send_mutation(
ctx: Context,
unity_instance: str | None,
command: str,
params: dict[str, Any],
*,
verify_after_disconnect: Callable[[], Awaitable[dict | None]] | None = None,
) -> dict | Any:
"""Send a non-idempotent mutation with reload recovery.
Handles the full retry/recovery pattern for script mutations:
1. Send with retry_on_reload=False (don't re-send if Unity is reloading)
2. If reloading rejection (command never executed) → wait + retry once
3. If connection lost after send → wait + verify via callback
4. Wait for editor readiness before returning
Args:
verify_after_disconnect: async callable returning a replacement response
dict if the mutation was verified after connection loss, or None to
keep the original error response.
"""
resp = await unity_transport.send_with_unity_instance(
_legacy_conn.async_send_command_with_retry,
unity_instance,
command,
params,
retry_on_reload=False,
)
if is_reloading_rejection(resp):
await wait_for_editor_ready(ctx)
resp = await unity_transport.send_with_unity_instance(
_legacy_conn.async_send_command_with_retry,
unity_instance,
command,
params,
retry_on_reload=False,
)
if is_connection_lost_after_send(resp) and verify_after_disconnect:
await wait_for_editor_ready(ctx)
verified = await verify_after_disconnect()
if verified is not None:
resp = verified
await wait_for_editor_ready(ctx)
return resp
async def verify_edit_by_sha(
unity_instance: str | None,
name: str,
path: str,
pre_sha: str | None,
) -> bool:
"""Verify a script edit was applied by comparing SHA before and after.
Returns True if the file's SHA changed (edit likely applied).
"""
if not pre_sha:
return False
try:
verify = await unity_transport.send_with_unity_instance(
_legacy_conn.async_send_command_with_retry,
unity_instance,
"manage_script",
{"action": "get_sha", "name": name, "path": path},
)
if isinstance(verify, dict) and verify.get("success"):
new_sha = (verify.get("data") or {}).get("sha256")
return bool(new_sha and new_sha != pre_sha)
except Exception as exc:
logger.debug(
"Failed to verify edit after disconnect for %s at %s: %r",
name, path, exc,
)
return False
@mcp_for_unity_tool(
description="Request a Unity asset database refresh and optionally a script compilation. Can optionally wait for readiness.",
annotations=ToolAnnotations(
title="Refresh Unity",
destructiveHint=True,
),
)
async def refresh_unity(
ctx: Context,
mode: Annotated[Literal["if_dirty", "force"], "Refresh mode"] = "if_dirty",
scope: Annotated[Literal["assets", "scripts", "all"],
"Refresh scope"] = "all",
compile: Annotated[Literal["none", "request"],
"Whether to request compilation"] = "none",
wait_for_ready: Annotated[bool,
"If true, wait until editor_state.advice.ready_for_tools is true"] = True,
) -> MCPResponse | dict[str, Any]:
unity_instance = get_unity_instance_from_context(ctx)
params: dict[str, Any] = {
"mode": mode,
"scope": scope,
"compile": compile,
"wait_for_ready": bool(wait_for_ready),
}
recovered_from_disconnect = False
# Don't retry on reload - refresh_unity triggers compilation/reload,
# so retrying would cause multiple reloads (issue #577)
response = await unity_transport.send_with_unity_instance(
_legacy_conn.async_send_command_with_retry,
unity_instance,
"refresh_unity",
params,
retry_on_reload=False,
)
# Handle connection errors during refresh/compile gracefully.
# Unity disconnects during domain reload, which is expected behavior - not a failure.
# If we sent the command and connection closed, the refresh was likely triggered successfully.
# Convert MCPResponse to dict if needed
response_dict = response if isinstance(response, dict) else (response.model_dump() if hasattr(response, "model_dump") else response.__dict__)
if not response_dict.get("success", True):
hint = response_dict.get("hint")
err = (response_dict.get("error") or response_dict.get("message") or "").lower()
reason = _extract_response_reason(response_dict)
# Connection closed/timeout during compile = refresh was triggered, Unity is reloading
# This is SUCCESS, not failure - don't return error to prevent Claude Code from retrying
is_connection_lost = (
"connection closed" in err
or "disconnected" in err
or "aborted" in err # WinError 10053: connection aborted
or "timeout" in err
or reason == "reloading"
)
if is_connection_lost and compile == "request":
# EXPECTED BEHAVIOR: When compile="request", Unity triggers domain reload which
# causes connection to close mid-command. This is NOT a failure - the refresh
# was successfully triggered. Treating this as success prevents Claude Code from
# retrying unnecessarily (which would cause multiple domain reloads - issue #577).
# The subsequent wait_for_ready loop (below) will verify Unity becomes ready.
logger.info("refresh_unity: Connection lost during compile (expected - domain reload triggered)")
recovered_from_disconnect = True
elif hint == "retry" or "could not connect" in err:
# Retryable error - proceed to wait loop if wait_for_ready
if not wait_for_ready:
return MCPResponse(**response_dict)
recovered_from_disconnect = True
else:
# Non-recoverable error - connection issue unrelated to domain reload
logger.warning(f"refresh_unity: Non-recoverable error (compile={compile}): {err[:100]}")
return MCPResponse(**response_dict)
# Optional server-side wait loop (defensive): if Unity tool doesn't wait or returns quickly,
# poll the canonical editor_state resource until ready or timeout.
ready_confirmed = False
if wait_for_ready:
ready_confirmed, _ = await wait_for_editor_ready(ctx, timeout_s=60.0)
# If we timed out without confirming readiness, log and return failure
if not ready_confirmed:
logger.warning("refresh_unity: Timed out after 60s waiting for editor to become ready")
return MCPResponse(
success=False,
message="Refresh triggered but timed out after 60s waiting for editor readiness.",
data={"timeout": True, "wait_seconds": 60.0},
)
# After readiness is restored, clear any external-dirty flag for this instance so future tools can proceed cleanly.
try:
inst = unity_instance or await editor_state.infer_single_instance_id(ctx)
if inst:
external_changes_scanner.clear_dirty(inst)
except Exception:
pass
if recovered_from_disconnect:
return MCPResponse(
success=True,
message="Refresh recovered after Unity disconnect/retry; editor is ready.",
data={"recovered_from_disconnect": True},
)
return MCPResponse(**response_dict) if isinstance(response, dict) else response