"""
B&R Automation Studio MCP Server
An MCP (Model Context Protocol) server that exposes B&R Automation Studio tools
to AI assistants like Claude. Enables building projects, running simulators,
and interacting with OPC UA variables.
This server is designed to run on Windows and be invoked from Claude Code
running in WSL2 via PowerShell interop.
"""
import subprocess
import os
import tempfile
from mcp.server.fastmcp import FastMCP
from asyncua.sync import Client, ua
# B&R tool paths - adjust these to match your Automation Studio installation
AS_BUILD_PATH = "C:\\BrAutomation\\AS412\\bin-en\\BR.AS.Build.exe"
PVI_TRANSFER_PATH = "C:\\BrAutomation\\PVI\\V4.12\\PVI\\Tools\\PVITransfer\\PVITransfer.exe"
# OPC UA connection defaults for ARsim
OPCUA_HOST = "localhost"
OPCUA_PORT = 4840
OPCUA_USERNAME = "Admin"
OPCUA_PASSWORD = "password"
# OPC UA type conversion map
OPCUA_TYPE_MAP = {
"Boolean": (ua.VariantType.Boolean, lambda v: v.lower() in ("true", "1", "yes")),
"Int16": (ua.VariantType.Int16, int),
"Int32": (ua.VariantType.Int32, int),
"Float": (ua.VariantType.Float, float),
"Double": (ua.VariantType.Double, float),
"String": (ua.VariantType.String, str),
}
# Cached OPC UA client for connection reuse
_opcua_client = None
def _get_opcua_client():
"""Get or create a cached OPC UA client connection."""
global _opcua_client
if _opcua_client is None:
url = f"opc.tcp://{OPCUA_USERNAME}:{OPCUA_PASSWORD}@{OPCUA_HOST}:{OPCUA_PORT}/"
_opcua_client = Client(url=url)
_opcua_client.connect()
return _opcua_client
def _reset_opcua_client():
"""Reset the cached client on connection failure."""
global _opcua_client
if _opcua_client is not None:
try:
_opcua_client.disconnect()
except:
pass
_opcua_client = None
mcp = FastMCP("br-automation-mcp")
@mcp.tool()
async def build_automation_studio_project(project_file: str, configuration: str) -> str:
"""
Compile an Automation Studio project for a specific configuration.
Args:
project_file: Path to the .apj file (Windows path format).
configuration: Name of the configuration to build.
Returns:
Compiler output. Look for "RUC package created." to confirm success.
The RUC package is created at: <project>/Binaries/<config>/<cpu>/RUCPackage/RUCPackage.zip
"""
result = subprocess.run(
[AS_BUILD_PATH, project_file, "-c", configuration, "-buildMode", "Rebuild", "-simulation", "-buildRUCPackage"],
capture_output=True,
text=True,
timeout=60
)
return result.stdout + result.stderr
@mcp.tool()
async def run_automation_studio_simulator(ruc_package: str, start_simulator: bool = True) -> str:
"""
Generate ARsim structure and optionally launch the simulator.
Args:
ruc_package: Path to RUCPackage.zip (Windows path). Generated by build with -buildRUCPackage.
start_simulator: Whether to start the simulator after creation. Defaults to True.
Returns:
Execution output including ARsim destination path.
"""
destination = tempfile.mkdtemp(prefix="ARSim_")
pil_file = os.path.join(tempfile.gettempdir(), 'CreateARSim.pil')
pil_content = f'CreateARsimStructure "{ruc_package}", "{destination}", "Start={int(start_simulator)}"\n'
if start_simulator:
pil_content += 'Connection "/IF=TCPIP /SA=1", "/DA=2 /DAIP=127.0.0.1 /REPO=11160", "WT=120"'
with open(pil_file, 'w') as f:
f.write(pil_content)
result = subprocess.run(
[PVI_TRANSFER_PATH, "-silent", pil_file],
capture_output=True,
text=True,
timeout=180
)
output = f"ARSim destination: {destination}\n"
output += f"Return code: {result.returncode}\n"
output += f"stdout: {result.stdout}\n"
output += f"stderr: {result.stderr}"
if result.returncode == 0 and start_simulator:
loader_path = os.path.join(destination, 'ar000loader.exe')
if os.path.exists(loader_path):
subprocess.Popen(
loader_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
creationflags=0x00000008 # DETACHED_PROCESS
)
output += f"\nStarted simulator: {loader_path}"
else:
output += f"\nWarning: Loader not found at {loader_path}"
return output
@mcp.tool()
async def opcua_read_write(node_id: str, operation: str = "read", value: str = None, value_type: str = "Boolean") -> str:
"""
Read or write an OPC UA node on the simulator's OPC UA server.
Args:
node_id: OPC UA node identifier. Format: "ns=6;s=::<scope>:<variable>"
- Global variables: "ns=6;s=::AsGlobalPV:<varpath>"
- Task-local variables: "ns=6;s=::<TaskName>:<varname>"
- Example: "ns=6;s=::TempCtrl:heatOn"
operation: "read" or "write". Defaults to "read".
value: Value to write (required for write operations).
value_type: Data type for writes: Boolean, Int16, Int32, Float, Double, String.
Returns:
For read: Current value. For write: Confirmation message.
"""
try:
client = _get_opcua_client()
node = client.get_node(node_id)
if operation == "read":
result = node.read_value()
return f"Value: {result}"
elif operation == "write":
if value is None:
return "Error: value is required for write operations"
if value_type not in OPCUA_TYPE_MAP:
return f"Error: Unsupported value_type '{value_type}'. Use one of: {', '.join(OPCUA_TYPE_MAP.keys())}"
variant_type, converter = OPCUA_TYPE_MAP[value_type]
converted_value = converter(value)
data_value = ua.DataValue(ua.Variant(converted_value, variant_type))
node.write_value(data_value)
return f"Written {node_id} = {converted_value} ({value_type})"
else:
return f"Error: Invalid operation '{operation}'. Use 'read' or 'write'."
except Exception as e:
_reset_opcua_client()
return f"Error: {e}"
if __name__ == "__main__":
mcp.run()