YetAnotherUnityMcp
by Azreal42
- YetAnotherUnityMcp
- server
"""
TCP client for connecting to Unity TCP server.
Despite the filename, this is a TCP client implementation (not WebSocket).
The name is kept for backward compatibility reasons.
This module provides:
- Low-level TCP communication with the Unity server
- Custom binary framing protocol for message boundaries
- Reconnection and keep-alive mechanisms
- Asynchronous request-response pattern
"""
import json
import logging
import uuid
import asyncio
import struct
import socket
import time
from typing import Dict, Any, Optional, List, Union, Callable
logger = logging.getLogger("low_level_tcp_client")
# Protocol constants
START_MARKER = 0x02 # STX (Start of Text)
END_MARKER = 0x03 # ETX (End of Text)
PING_MESSAGE = "PING"
PONG_RESPONSE = "PONG"
HANDSHAKE_REQUEST = "YAUM_HANDSHAKE_REQUEST"
HANDSHAKE_RESPONSE = "YAUM_HANDSHAKE_RESPONSE"
RECONNECT_DELAY = 2 # seconds
class LowLevelTcpClient:
"""
TCP client for connecting to the Unity MCP TCP server.
Named WebSocketClient for backward compatibility.
"""
def __init__(self, url: str = "tcp://localhost:8080/"):
"""
Initialize the TCP client.
Args:
url: TCP server URL (tcp://host:port/)
"""
self.url = url
self.reader = None
self.writer = None
self.connected = False
self.pending_requests: Dict[str, asyncio.Future] = {}
self.receive_task = None
self.callbacks: Dict[str, List[Callable]] = {
"connected": [],
"disconnected": [],
"message": [],
"error": []
}
if url.startswith("tcp://"):
self.host = url.replace("tcp://", "").split("/")[0].split(":")[0]
port_str = url.replace("tcp://", "").split("/")[0].split(":")
self.port = int(port_str[1]) if len(port_str) > 1 else 8080
else:
# Assume basic host:port format
parts = url.split(":")
self.host = parts[0]
self.port = int(parts[1]) if len(parts) > 1 else 8080
async def connect(self, max_attempts: int = 5) -> bool:
"""
Connect to the Unity TCP server with retry mechanism.
Args:
max_attempts: Maximum number of connection attempts
Returns:
True if connected successfully, False otherwise
"""
if self.connected:
logger.warning("Already connected to Unity TCP server")
return True
attempts = 0
connected = False
last_error = None
while attempts < max_attempts and not connected:
attempts += 1
try:
logger.info(f"Connecting to Unity TCP server at {self.host}:{self.port} (attempt {attempts}/{max_attempts})")
# Create a TCP connection
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
# Perform handshake
if await self._perform_handshake():
self.connected = True
connected = True
logger.info("Connected to Unity TCP server")
# Start the message receive loop
self.receive_task = asyncio.create_task(self._receive_messages())
# Trigger connected callbacks
await self._trigger_callbacks("connected")
return True
else:
logger.error("Handshake failed")
# Close connection and retry
if self.writer:
self.writer.close()
await self.writer.wait_closed()
self.reader = None
self.writer = None
except Exception as e:
last_error = str(e)
logger.error(f"Error connecting to Unity TCP server: {last_error}")
# Wait before retrying
if attempts < max_attempts:
wait_time = RECONNECT_DELAY * attempts # Progressive backoff
logger.info(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
# All attempts failed
await self._trigger_callbacks("error", f"Connection error: {last_error}")
return False
async def disconnect(self) -> None:
"""
Disconnect from the Unity TCP server.
"""
if not self.connected:
logger.warning("Not connected to Unity TCP server")
return
try:
logger.info("Disconnecting from Unity TCP server")
# Cancel the receive task
if self.receive_task:
self.receive_task.cancel()
self.receive_task = None
# Close the TCP connection
if self.writer:
self.writer.close()
await self.writer.wait_closed()
self.writer = None
self.reader = None
self.connected = False
logger.info("Disconnected from Unity TCP server")
# Trigger disconnected callbacks
await self._trigger_callbacks("disconnected")
# Cancel all pending requests
for request_id, future in self.pending_requests.items():
if not future.done():
future.set_exception(Exception("Disconnected from server"))
self.pending_requests.clear()
except Exception as e:
logger.error(f"Error disconnecting from Unity TCP server: {str(e)}")
await self._trigger_callbacks("error", f"Disconnection error: {str(e)}")
async def _perform_handshake(self) -> bool:
"""
Perform handshake with the TCP server.
Returns:
True if handshake was successful, False otherwise
"""
try:
# Send handshake request - ensure it's exactly the expected string with no CR/LF
handshake_bytes = HANDSHAKE_REQUEST.encode('utf-8')
logger.info(f"Sending handshake request: {HANDSHAKE_REQUEST} ({len(handshake_bytes)} bytes)")
# Log hex representation for debugging
hex_bytes = ' '.join(f'{b:02x}' for b in handshake_bytes)
logger.info(f"Handshake bytes: {hex_bytes}")
self.writer.write(handshake_bytes)
await self.writer.drain()
# Read handshake response with timeout
logger.info("Waiting for handshake response...")
response_bytes = await asyncio.wait_for(self.reader.read(1024), timeout=5.0)
# Log hex representation for debugging
hex_bytes = ' '.join(f'{b:02x}' for b in response_bytes)
logger.info(f"Response bytes: {hex_bytes}")
response = response_bytes.decode('utf-8').strip()
logger.info(f"Received handshake response: '{response}' ({len(response_bytes)} bytes)")
if response == HANDSHAKE_RESPONSE:
logger.info("Handshake successful")
# Add a delay to ensure server is ready
await asyncio.sleep(0.5)
logger.info("Handshake completed, now ready for framed communication")
return True
else:
logger.error(f"Invalid handshake response: {response}")
return False
except asyncio.TimeoutError:
logger.error("Handshake timeout")
return False
except Exception as e:
logger.error(f"Handshake error: {str(e)}")
return False
async def _send_frame(self, message: str) -> None:
"""
Send a framed message to the TCP server.
Args:
message: Message to send
"""
if not self.connected or not self.writer:
raise Exception("Not connected to Unity TCP server")
# Convert message to bytes
message_bytes = message.encode('utf-8')
# Create frame: STX + [LENGTH:4] + [MESSAGE] + ETX
frame = bytearray()
frame.append(START_MARKER)
frame.extend(struct.pack("<I", len(message_bytes))) # Length as 4-byte little-endian
frame.extend(message_bytes)
frame.append(END_MARKER)
# Log frame details for debugging
logger.debug(f"Sending frame: STX + {len(message_bytes)} bytes + ETX (total: {len(frame)} bytes)")
if len(message) > 200:
logger.debug(f"Message content (truncated): {message[:100]}... (total: {len(message)} bytes)")
else:
logger.debug(f"Message content: {message}")
# Send the frame in a single operation
self.writer.write(frame)
await self.writer.drain()
logger.debug("Frame sent successfully")
async def _receive_frame(self) -> Optional[str]:
"""
Receive a framed message from the TCP server.
Returns:
Received message as string, or None if connection closed
"""
if not self.connected or not self.reader:
raise Exception("Not connected to Unity TCP server")
try:
# Read until start marker (STX)
logger.debug("Waiting for start marker (STX)...")
bytes_checked = 0
start_marker_found = False
# Store initial bytes for debugging
initial_bytes = bytearray()
while bytes_checked < 1000: # Reasonable limit to avoid infinite loop
try:
b = await asyncio.wait_for(self.reader.readexactly(1), timeout=5)
bytes_checked += 1
# Store initial bytes for debugging (up to 16 bytes)
if len(initial_bytes) < 16:
initial_bytes.append(b[0])
if b[0] == START_MARKER:
logger.debug(f"Found start marker (STX) after {bytes_checked} bytes")
start_marker_found = True
break
# Log occasionally
if bytes_checked % 10 == 0:
hex_initial = ' '.join(f'{b:02x}' for b in initial_bytes)
logger.debug(f"Checked {bytes_checked} bytes, no start marker yet. Initial bytes: {hex_initial}")
except asyncio.TimeoutError:
# Timeout reading, try again
continue
except asyncio.IncompleteReadError:
logger.error("Connection closed while waiting for start marker")
return None
if not start_marker_found:
hex_initial = ' '.join(f'{b:02x}' for b in initial_bytes)
logger.error(f"No start marker found after {bytes_checked} bytes. Initial bytes: {hex_initial}")
return None
# Read message length (4 bytes)
logger.debug("Reading message length (4 bytes)...")
try:
length_bytes = await self.reader.readexactly(4)
message_length = struct.unpack("<I", length_bytes)[0]
# Sanity check for message length
if message_length <= 0 or message_length > 10 * 1024 * 1024: # Max 10 MB
logger.error(f"Invalid message length: {message_length}")
return None
logger.debug(f"Message length: {message_length} bytes")
# Read message data
logger.debug(f"Reading message data ({message_length} bytes)...")
message_bytes = await self.reader.readexactly(message_length)
# Prepare to read end marker (ETX)
logger.debug("Reading end marker (ETX)...")
# Log the last few bytes of the message for debugging
last_bytes = message_bytes[-min(10, len(message_bytes)):]
logger.debug(f"Last {len(last_bytes)} bytes of message: {' '.join(f'{b:02x}' for b in last_bytes)} (ASCII: {''.join(chr(b) if 32 <= b < 127 else '.' for b in last_bytes)})")
# Try to read the end marker with more debug info
try:
end_marker = await self.reader.readexactly(1)
logger.debug(f"End marker byte: 0x{end_marker[0]:02x} (expected: 0x{END_MARKER:02x})")
if end_marker[0] != END_MARKER:
# Special case: if the byte we got is '}' (0x7D), this might be the end of a JSON message
# Let's try to be resilient and accept it anyway
if end_marker[0] == 0x7D: # ASCII '}'
logger.warning("Got '}' (0x7D) instead of ETX marker - potential JSON end, trying to recover")
# Try to decode and validate the message
try:
message_text = message_bytes.decode('utf-8')
if message_text.strip().endswith('}'):
# Seems like valid JSON, let's try to parse it
try:
json.loads(message_text)
logger.warning("Message is valid JSON despite incorrect end marker - accepting anyway")
return message_text
except json.JSONDecodeError:
logger.warning("Message ends with '}' but is not valid JSON, rejecting")
except:
logger.warning("Failed to decode message as UTF-8, rejecting")
# Try to read a few more bytes to see what follows
try:
extra_bytes = await asyncio.wait_for(self.reader.read(10), timeout=0.5)
if extra_bytes:
logger.error(f"Missing end marker, got: 0x{end_marker[0]:02x} followed by: {' '.join(f'{b:02x}' for b in extra_bytes)}")
else:
logger.error(f"Missing end marker, got: 0x{end_marker[0]:02x} (no additional bytes available)")
except:
logger.error(f"Missing end marker, got: 0x{end_marker[0]:02x}")
return None
except Exception as e:
logger.error(f"Error reading end marker: {str(e)}")
return None
# We already handled the error case above
# If we get here, the end marker was correct
# Convert to string
message = message_bytes.decode('utf-8')
logger.debug(f"Successfully received framed message: {message[:100]}...")
return message
except asyncio.IncompleteReadError:
logger.error("Connection closed while reading message")
return None
except Exception as e:
logger.error(f"Error receiving frame: {str(e)}")
return None
def _extract_text_from_content(self, content_array: List[Dict]) -> str:
"""
Extract text content from MCP content array.
Args:
content_array: List of content items
Returns:
Extracted text or empty string if no text found
"""
if not content_array:
return ""
text_parts = []
for item in content_array:
if not isinstance(item, dict):
continue
content_type = item.get("type")
if content_type == "text" and "text" in item:
text_parts.append(item["text"])
return "\n".join(text_parts)
async def send_command(self, command: str, parameters: Optional[Dict[str, Any]] = None) -> Any:
"""
Send a command to the Unity TCP server.
Args:
command: Command to execute
parameters: Command parameters
Returns:
Command result
"""
if not self.connected:
raise Exception("Not connected to Unity TCP server")
# Generate a unique request ID
request_id = f"req_{uuid.uuid4().hex}"
# Create a future for the response
future = asyncio.get_running_loop().create_future()
self.pending_requests[request_id] = future
# Create the request message
request = {
"id": request_id,
"command": command,
"client_timestamp": int(time.time() * 1000)
}
if parameters:
request["parameters"] = parameters
# Send the request
try:
await self._send_frame(json.dumps(request))
logger.debug(f"Sent request {request_id}: {command}")
# Wait for the response with a timeout
try:
response = await asyncio.wait_for(future, timeout=60.0)
logger.debug(f"Received response for request {request_id}")
# Process the response
if response.get("status") == "error":
error_message = response.get("error", "Unknown error")
raise Exception(f"Error executing command {command}: {error_message}")
# Extract result, handling MCP response format
result = response.get("result")
# Check if result is in the new MCPResponse format with content array
if isinstance(result, dict) and "content" in result:
# Handle new MCP response format
if result.get("isError", False):
# This is an error response via the content array
error_text = self._extract_text_from_content(result["content"])
raise Exception(f"Error in MCP response: {error_text}")
# Return just the content for now - advanced clients can interpret the content types
return result
return result
except asyncio.TimeoutError:
self.pending_requests.pop(request_id, None)
raise Exception(f"Timeout waiting for response to command {command}")
except Exception as e:
self.pending_requests.pop(request_id, None)
logger.error(f"Error sending command {command}: {str(e)}")
raise
async def _receive_messages(self) -> None:
"""
Receive and process messages from the Unity TCP server.
"""
if not self.connected:
logger.error("TCP not connected")
return
logger.info("Starting message receive loop")
# Send a ping every 30 seconds
ping_interval = 30
last_ping_time = time.time()
# Send an initial ping right away to make sure the framing works
try:
logger.info("Sending initial PING to test framing...")
await self._send_frame(PING_MESSAGE)
logger.info("Initial PING sent successfully")
except Exception as e:
logger.error(f"Error sending initial ping: {str(e)}")
# Continue anyway, we'll try to recover
try:
while self.connected:
# Check if it's time to send a ping
current_time = time.time()
if current_time - last_ping_time >= ping_interval:
try:
logger.info("Sending periodic PING...")
await self._send_frame(PING_MESSAGE)
last_ping_time = current_time
logger.info("PING sent successfully")
except Exception as e:
logger.error(f"Error sending ping: {str(e)}")
break # Connection likely broken
# Try to read with a short timeout
try:
# Use wait_for with a short timeout to allow for ping checks
logger.debug("Waiting to receive frame from server...")
message = await asyncio.wait_for(self._receive_frame(), timeout=1.0)
# Check if connection closed
if message is None:
logger.error("Connection closed by server")
break
# Handle special messages
if message == PONG_RESPONSE:
logger.debug("Received PONG")
continue
elif message == PING_MESSAGE:
# Respond to PING with PONG
logger.debug("Received PING, responding with PONG")
await self._send_frame(PONG_RESPONSE)
continue
try:
# Parse the message
data = json.loads(message)
# Log the message (truncated if large)
message_str = message
if len(message_str) > 500:
message_str = message_str[:500] + "... (truncated)"
logger.debug(f"Received message: {message_str}")
# Trigger message callbacks
await self._trigger_callbacks("message", data)
# Check if this is a response to a pending request
request_id = data.get("id")
if request_id in self.pending_requests:
future = self.pending_requests.pop(request_id)
if not future.done():
future.set_result(data)
except json.JSONDecodeError:
logger.error(f"Invalid JSON received: {message}")
except Exception as e:
logger.exception(f"Error processing message: {str(e)}")
except asyncio.TimeoutError:
# This is expected - just continue to next iteration
continue
except asyncio.CancelledError:
logger.info("TCP receive task cancelled")
except Exception as e:
logger.error(f"TCP receive error: {str(e)}")
await self._trigger_callbacks("error", f"Receive error: {str(e)}")
# Close the connection on error
if self.connected:
await self.disconnect()
def on(self, event: str, callback: Callable) -> None:
"""
Register a callback for an event.
Args:
event: Event name (connected, disconnected, message, error)
callback: Callback function
"""
if event not in self.callbacks:
logger.warning(f"Unknown event: {event}")
return
self.callbacks[event].append(callback)
def off(self, event: str, callback: Callable) -> None:
"""
Unregister a callback for an event.
Args:
event: Event name (connected, disconnected, message, error)
callback: Callback function
"""
if event not in self.callbacks:
logger.warning(f"Unknown event: {event}")
return
if callback in self.callbacks[event]:
self.callbacks[event].remove(callback)
async def _trigger_callbacks(self, event: str, data: Any = None) -> None:
"""
Trigger callbacks for an event.
Args:
event: Event name
data: Event data
"""
if event not in self.callbacks:
return
for callback in self.callbacks[event]:
try:
if data is not None:
await callback(data)
else:
await callback()
except Exception as e:
logger.error(f"Error in {event} callback: {str(e)}")