Skip to main content
Glama
mcp_test_harness_gui.py15 kB
import socket import json import time import tkinter as tk from tkinter import filedialog, messagebox, scrolledtext import threading import os import copy # Needed for deep copying commands SERVER_HOST = "localhost" SERVER_PORT = 5555 class MCPTestHarnessGUI: def __init__(self, root): self.root = root self.root.title("MCP Test Harness") # --- GUI Elements --- self.file_label = tk.Label(root, text="Test File:") self.file_label.grid(row=0, column=0, sticky="w") self.file_entry = tk.Entry(root, width=50) self.file_entry.grid(row=0, column=1) self.browse_button = tk.Button(root, text="Browse", command=self.browse_file) self.browse_button.grid(row=0, column=2) self.run_button = tk.Button(root, text="Run Test", command=self.run_test_thread) self.run_button.grid(row=1, column=1, pady=10) self.log_text = scrolledtext.ScrolledText( root, wrap=tk.WORD, width=80, height=30 ) self.log_text.grid(row=2, column=0, columnspan=3) # --- ADDED: Storage for GUIDs --- self.guid_map = {} # Maps requested_name -> actual_guid def browse_file(self): filename = filedialog.askopenfilename( filetypes=[("JSON Lines", "*.jsonl"), ("All Files", "*.*")] ) if filename: self.file_entry.delete(0, tk.END) self.file_entry.insert(0, filename) def log(self, message): # --- Modified to handle updates from thread --- def update_log(): self.log_text.insert(tk.END, message + "\n") self.log_text.see(tk.END) # Schedule the update in the main Tkinter thread self.root.after(0, update_log) print(message) # Also print to console def run_test_thread(self): # Disable run button during test self.run_button.config(state=tk.DISABLED) # Clear log and GUID map for new run self.log_text.delete(1.0, tk.END) self.guid_map = {} # Start the test in a separate thread threading.Thread(target=self.run_test, daemon=True).start() # --- ADDED: Recursive substitution function --- def substitute_placeholders(self, data_structure): """Recursively substitutes known names with GUIDs in dicts and lists.""" if isinstance(data_structure, dict): new_dict = {} for key, value in data_structure.items(): # Substitute the value if it's a string matching a known name if isinstance(value, str) and value in self.guid_map: new_dict[key] = self.guid_map[value] self.log( f" Substituted '{key}': '{value}' -> '{self.guid_map[value]}'" ) # Recursively process nested structures elif isinstance(value, (dict, list)): new_dict[key] = self.substitute_placeholders(value) else: new_dict[key] = value return new_dict elif isinstance(data_structure, list): new_list = [] for item in data_structure: # Substitute the item if it's a string matching a known name if isinstance(item, str) and item in self.guid_map: new_list.append(self.guid_map[item]) self.log( f" Substituted item in list: '{item}' -> '{self.guid_map[item]}'" ) # Recursively process nested structures elif isinstance(item, (dict, list)): new_list.append(self.substitute_placeholders(item)) else: new_list.append(item) return new_list else: # If it's not a dict or list, check if the value itself needs substitution if isinstance(data_structure, str) and data_structure in self.guid_map: substituted_value = self.guid_map[data_structure] self.log( f" Substituted value: '{data_structure}' -> '{substituted_value}'" ) return substituted_value return data_structure # Return other types unchanged def run_test(self): test_file = self.file_entry.get() if not os.path.exists(test_file): messagebox.showerror("Error", "Test file does not exist.") self.run_button.config(state=tk.NORMAL) # Re-enable button return try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: self.log(f"Connecting to MCP server at {SERVER_HOST}:{SERVER_PORT}...") sock.connect((SERVER_HOST, SERVER_PORT)) self.log("Connected ✅\n--- Test Start ---") with open(test_file, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): if not line.strip(): continue # Skip empty lines try: # Load original command from file original_command = json.loads(line.strip()) self.log( f"\n▶️ Command {line_num} (Original): {json.dumps(original_command)}" ) # --- MODIFIED: Substitute placeholders before sending --- # Deep copy to avoid modifying the original dict before logging command_to_send = copy.deepcopy(original_command) command_to_send = self.substitute_placeholders( command_to_send ) if command_to_send != original_command: self.log( f" Command {line_num} (Substituted): {json.dumps(command_to_send)}" ) # Send the potentially modified command sock.sendall( (json.dumps(command_to_send) + "\n").encode("utf-8") ) # Receive response (increase buffer size significantly for base64 previews) response_data = b"" sock.settimeout(120.0) # Set generous timeout for receiving while True: try: chunk = sock.recv(32768) # Read larger chunks if not chunk: # Connection closed prematurely? if not response_data: raise ConnectionAbortedError( "Server closed connection unexpectedly before sending response." ) break # No more data response_data += chunk # Basic check for newline delimiter, might need refinement for large data if b"\n" in chunk: break except socket.timeout: # Check if we received *any* data before timeout if not response_data: raise TimeoutError( f"Timeout waiting for response to Command {line_num}" ) else: self.log( f" Warning: Socket timeout, but received partial data ({len(response_data)} bytes). Assuming complete." ) break # Process what we got sock.settimeout(None) # Reset timeout # Decode and parse response response_text = response_data.decode("utf-8").strip() if not response_text: raise ValueError("Received empty response from server.") decoded_response = json.loads(response_text) # Log the response (truncate potentially huge base64 data) loggable_response = copy.deepcopy(decoded_response) if isinstance(loggable_response, dict): # Check common keys for base64 data and truncate for key in ["image_base64", "image_data"]: if ( key in loggable_response and isinstance(loggable_response[key], str) and len(loggable_response[key]) > 100 ): loggable_response[key] = ( loggable_response[key][:50] + "... [truncated]" ) # Also check within nested 'render' dict for snapshot if "render" in loggable_response and isinstance( loggable_response["render"], dict ): for key in ["image_base64", "image_data"]: render_dict = loggable_response["render"] if ( key in render_dict and isinstance(render_dict[key], str) and len(render_dict[key]) > 100 ): render_dict[key] = ( render_dict[key][:50] + "... [truncated]" ) self.log( f"✅ Response {line_num}: {json.dumps(loggable_response, indent=2)}" ) # --- ADDED: Capture GUID from response --- if isinstance(decoded_response, dict): # Check common patterns for created objects context_keys = [ "object", "light", "camera", "material", "cloner", "effector", "field", "shape", "group", ] for key in context_keys: if key in decoded_response and isinstance( decoded_response[key], dict ): obj_info = decoded_response[key] req_name = obj_info.get("requested_name") guid = obj_info.get("guid") act_name = obj_info.get("actual_name") if req_name and guid: self.guid_map[req_name] = guid self.log( f" Captured GUID: '{req_name}' -> {guid} (Actual name: '{act_name}')" ) # Also map actual name if different, preferring requested name if collision if ( act_name and act_name != req_name and act_name not in self.guid_map ): self.guid_map[act_name] = guid self.log( f" Mapped actual name: '{act_name}' -> {guid}" ) break # Assume only one primary object context per response # Brief pause between commands time.sleep(0.1) except json.JSONDecodeError as e: self.log(f"❌ Error decoding JSON for line {line_num}: {e}") self.log(f" Raw line: {line.strip()}") break # Stop test on error except Exception as cmd_e: self.log(f"❌ Error processing command {line_num}: {cmd_e}") import traceback self.log(traceback.format_exc()) break # Stop test on error self.log("--- Test End ---") except ConnectionRefusedError: self.log( f"❌ Connection Refused: Ensure C4D plugin server is running on {SERVER_HOST}:{SERVER_PORT}." ) messagebox.showerror( "Connection Error", "Connection Refused. Is the Cinema 4D plugin server running?", ) except socket.timeout: self.log( f"❌ Connection Timeout: Could not connect to {SERVER_HOST}:{SERVER_PORT}." ) messagebox.showerror("Connection Error", "Connection Timeout.") except Exception as e: self.log(f"❌ Unexpected Error: {str(e)}") import traceback self.log(traceback.format_exc()) messagebox.showerror("Error", f"An unexpected error occurred:\n{str(e)}") finally: # Re-enable run button after test finishes or errors out self.root.after(0, lambda: self.run_button.config(state=tk.NORMAL)) if __name__ == "__main__": root = tk.Tk() app = MCPTestHarnessGUI(root) root.mainloop()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ttiimmaacc/cinema4d-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server