import sys
import subprocess
import json
import threading
import time
# Default launch command: run the server module directly through uv.
_DEFAULT_COMMAND = ("uv", "run", "python", "-m", "robodk_mcp.server")
# Default working directory used when launching the server.
_DEFAULT_CWD = "/Users/lil/workspace/mcp_robodk"
# Seconds to wait for the server to exit after terminate() before killing it.
_SHUTDOWN_TIMEOUT_S = 5.0


def _stderr_target():
    """Return the value to pass as ``stderr=`` so server logs reach the console.

    ``sys.stderr`` is used when it is backed by a real file descriptor.
    Otherwise (it was replaced by an in-memory stream, or is ``None``) the
    child simply inherits the parent's stderr, since ``Popen`` would fail.
    """
    try:
        sys.stderr.fileno()
    except (AttributeError, OSError, ValueError):
        return None
    return sys.stderr


def _first_text(response, field, default):
    """Return the ``text`` of the first item in ``response["result"][field]``.

    ``default`` is returned when the result, the list, its first item or the
    ``text`` key is missing, so an error response or an empty list does not
    raise.
    """
    items = (response.get("result") or {}).get(field) or [{}]
    first = items[0]
    if not isinstance(first, dict):
        return default
    return first.get("text", default)


def _shutdown(process):
    """Terminate the server process, reap it and close its pipes."""
    process.terminate()
    try:
        process.wait(timeout=_SHUTDOWN_TIMEOUT_S)
    except subprocess.TimeoutExpired:
        # The server ignored SIGTERM; force it down so no child is leaked.
        process.kill()
        process.wait()
    for stream in (process.stdin, process.stdout):
        if stream is None:
            continue
        try:
            stream.close()
        except OSError:
            # The pipe may already be broken if the server died early.
            pass


def run_verification(command=None, cwd=_DEFAULT_CWD):
    """Drive the MCP server over stdio and print the outcome of each step.

    The server is started as a child process and sent newline-delimited
    JSON-RPC messages: initialize, list tools, start RoboDK, open an empty
    station, add a robot and read the station tree resource. Each sent
    method and each received summary is printed to stdout.

    Args:
        command: Argument list used to launch the server. Defaults to
            ``uv run python -m robodk_mcp.server``; the console script
            (``["uv", "run", "robodk-mcp"]``) can be passed instead.
        cwd: Working directory for the server process; ``None`` inherits
            the current one.

    Raises:
        RuntimeError: If the server closes stdout before answering a request.
    """
    argv = list(_DEFAULT_COMMAND if command is None else command)
    process = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=_stderr_target(),  # direct stderr to console
        text=True,
        encoding="utf-8",  # do not depend on the locale for the JSON stream
        cwd=cwd,
    )

    def send_request(method, params=None, req_id=1):
        """Write one JSON-RPC message; ``req_id=None`` sends a notification."""
        msg = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        # A notification must not carry an "id" member at all; sending
        # "id": null would turn it into a request the server may answer.
        if req_id is not None:
            msg["id"] = req_id
        print(f"-> {method}")
        process.stdin.write(json.dumps(msg) + "\n")
        process.stdin.flush()

    def read_response(expected_id):
        """Return the response whose id is ``expected_id``.

        Non-JSON lines are echoed as raw output and skipped; messages with
        a different or missing id (e.g. server notifications) are skipped.
        """
        while True:
            line = process.stdout.readline()
            if not line:
                raise RuntimeError(
                    f"server closed stdout before answering request {expected_id}"
                )
            if not line.strip():
                continue
            try:
                msg = json.loads(line)
            except json.JSONDecodeError:
                print(f"Raw output: {line.strip()}")
                continue
            if isinstance(msg, dict) and msg.get("id") == expected_id:
                return msg

    def call(method, params, req_id):
        """Send a request and block until its matching response arrives."""
        send_request(method, params, req_id)
        return read_response(req_id)

    try:
        # 1. Initialize
        resp = call("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0"},
        }, 1)
        server_info = (resp.get("result") or {}).get("serverInfo", "Error")
        print(f"<- Init: {server_info}")
        send_request("notifications/initialized", {}, None)

        # 2. List Tools
        resp = call("tools/list", {}, 2)
        tools = (resp.get("result") or {}).get("tools", [])
        print(f"<- Tools: {[t['name'] for t in tools]}")

        # 3. Start RoboDK
        # Note: This might launch UI. It might take time, so we block until
        # the response arrives.
        resp = call("tools/call", {"name": "robodk.start", "arguments": {}}, 3)
        print(f"<- Start: {_first_text(resp, 'content', 'Error')}")

        # 4. Open Empty Station
        resp = call(
            "tools/call",
            {"name": "robodk.open_station", "arguments": {"empty": True}},
            4,
        )
        print(f"<- Open Station: {_first_text(resp, 'content', 'Error')}")

        # 5. Add Robot
        # If UR10 isn't in the default library this might fail; the error is
        # reported rather than treated as fatal, although the success
        # criteria say "A robot can be added".
        resp = call(
            "tools/call",
            {"name": "station.add_robot", "arguments": {"robot_model": "UR10"}},
            5,
        )
        error = resp.get("error")
        if error:
            print(f"<- Add Robot Error: {error['message']}")
        else:
            print(f"<- Add Robot: {_first_text(resp, 'content', 'ok')}")

        # 6. Read Resource
        resp = call("resources/read", {"uri": "station://current/tree"}, 6)
        tree = json.loads(_first_text(resp, "contents", "[]"))
        print(f"<- Tree Nodes: {len(tree)}")
        # Print first few nodes
        for node in tree[:3]:
            print(f" - {node['name']} ({node['type']})")
    finally:
        # Always stop the server, even when a step above raised.
        _shutdown(process)
# Run the end-to-end check only when this file is executed as a script,
# so importing the module does not spawn the server process.
if __name__ == "__main__":
    run_verification()