Skip to main content
Glama
cli.py (9.89 kB)
import asyncio
import inspect
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import List, Union

from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts import base
from pymodbus.client import (
    AsyncModbusSerialClient,
    AsyncModbusTcpClient,
    AsyncModbusUdpClient,
)
from pymodbus.exceptions import ModbusException

load_dotenv()

# Modbus client configuration from environment variables
MODBUS_TYPE = os.environ.get("MODBUS_TYPE", "tcp").lower()  # tcp, udp, or serial
MODBUS_HOST = os.environ.get("MODBUS_HOST", "127.0.0.1")
MODBUS_PORT = int(os.environ.get("MODBUS_PORT", 502))
MODBUS_SERIAL_PORT = os.environ.get("MODBUS_SERIAL_PORT", "/dev/ttyUSB0")
MODBUS_BAUDRATE = int(os.environ.get("MODBUS_BAUDRATE", 9600))
MODBUS_PARITY = os.environ.get("MODBUS_PARITY", "N")
MODBUS_STOPBITS = int(os.environ.get("MODBUS_STOPBITS", 1))
MODBUS_BYTESIZE = int(os.environ.get("MODBUS_BYTESIZE", 8))
MODBUS_TIMEOUT = float(os.environ.get("MODBUS_TIMEOUT", 1))
MODBUS_DEFAULT_SLAVE_ID = int(os.environ.get("MODBUS_DEFAULT_SLAVE_ID", 1))


# Application context for dependency injection
@dataclass
class AppContext:
    """Objects shared with every tool call through the lifespan context."""

    # The connected Modbus client created by app_lifespan.
    modbus_client: Union[AsyncModbusTcpClient, AsyncModbusUdpClient, AsyncModbusSerialClient]


# Lifespan manager for Modbus client
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage Modbus client lifecycle.

    Builds the client selected by MODBUS_TYPE, connects it, yields it wrapped
    in an AppContext, and closes it when the server shuts down.

    Raises:
        ValueError: If MODBUS_TYPE is not 'tcp', 'udp', or 'serial'.
        RuntimeError: If the client reports it is not connected after connect().
    """
    # Initialize Modbus client based on MODBUS_TYPE
    # NOTE(review): MODBUS_TIMEOUT is only passed to the serial client; the
    # TCP/UDP clients use the library default - confirm this is intended.
    if MODBUS_TYPE == "tcp":
        client = AsyncModbusTcpClient(host=MODBUS_HOST, port=MODBUS_PORT)
    elif MODBUS_TYPE == "udp":
        client = AsyncModbusUdpClient(host=MODBUS_HOST, port=MODBUS_PORT)
    elif MODBUS_TYPE == "serial":
        client = AsyncModbusSerialClient(
            port=MODBUS_SERIAL_PORT,
            baudrate=MODBUS_BAUDRATE,
            parity=MODBUS_PARITY,
            stopbits=MODBUS_STOPBITS,
            bytesize=MODBUS_BYTESIZE,
            timeout=MODBUS_TIMEOUT,
        )
    else:
        raise ValueError(f"Invalid MODBUS_TYPE: {MODBUS_TYPE}. Must be 'tcp', 'udp', or 'serial'.")

    try:
        # Connect inside the try so a failed connection attempt is still
        # cleaned up by the finally clause below.
        await client.connect()
        if not client.connected:
            raise RuntimeError(f"Failed to connect to Modbus {MODBUS_TYPE} device")
        yield AppContext(modbus_client=client)
    finally:
        # Cleanup
        client.close()


# Initialize MCP server
mcp = FastMCP(
    name="Modbus MCP Server",
    dependencies=["pymodbus"],
    lifespan=app_lifespan,
)


async def _log_info(ctx: Context, message: str) -> None:
    """Send an info-level log message through the MCP context.

    ctx.info() is a coroutine in current MCP SDK releases; calling it without
    awaiting silently drops the message. The result is awaited only when it is
    awaitable so that a synchronous implementation keeps working too.
    """
    outcome = ctx.info(message)
    if inspect.isawaitable(outcome):
        await outcome


# Tools: Read and write Modbus registers
@mcp.tool()
async def read_register(address: int, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Read a single Modbus holding register.

    Parameters:
        address (int): The address of the holding register (0-65535).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: The value of the register or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        result = await client.read_holding_registers(address=address, count=1, slave=slave_id)
        if result.isError():
            return f"Error reading register {address} from slave {slave_id}: {result}"
        await _log_info(ctx, f"Read register {address} from slave {slave_id}: {result.registers[0]}")
        return f"Slave {slave_id}, Register {address} Value: {result.registers[0]}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


@mcp.tool()
async def write_register(address: int, value: int, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Write a value to a Modbus holding register.

    Parameters:
        address (int): The address of the holding register (0-65535).
        value (int): The value to write (0-65535).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: Success message or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        result = await client.write_register(address=address, value=value, slave=slave_id)
        if result.isError():
            return f"Error writing to register {address} on slave {slave_id}: {result}"
        await _log_info(ctx, f"Wrote {value} to register {address} on slave {slave_id}")
        return f"Successfully wrote {value} to register {address} on slave {slave_id}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


# Tools: Coil operations
@mcp.tool()
async def read_coils(address: int, count: int, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Read the status of multiple Modbus coils.

    Parameters:
        address (int): The starting address of the coils (0-65535).
        count (int): The number of coils to read (1-2000).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: A list of coil states (True/False) or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        if count <= 0:
            return "Error: Count must be positive"
        result = await client.read_coils(address=address, count=count, slave=slave_id)
        if result.isError():
            return f"Error reading coils starting at {address} from slave {slave_id}: {result}"
        # result.bits can be longer than `count` (presumably padded to a byte
        # boundary), so trim once and use the same list for the log and reply.
        bits = result.bits[:count]
        await _log_info(ctx, f"Read {count} coils starting at {address} from slave {slave_id}: {bits}")
        return f"Slave {slave_id}, Coils {address} to {address+count-1}: {bits}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


@mcp.tool()
async def write_coil(address: int, value: bool, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Write a value to a single Modbus coil.

    Parameters:
        address (int): The address of the coil (0-65535).
        value (bool): The value to write (True for ON, False for OFF).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: Success message or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        result = await client.write_coil(address=address, value=value, slave=slave_id)
        if result.isError():
            return f"Error writing to coil {address} on slave {slave_id}: {result}"
        await _log_info(ctx, f"Wrote {value} to coil {address} on slave {slave_id}")
        return f"Successfully wrote {value} to coil {address} on slave {slave_id}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


# Tools: Input registers
@mcp.tool()
async def read_input_registers(address: int, count: int, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Read multiple Modbus input registers.

    Parameters:
        address (int): The starting address of the input registers (0-65535).
        count (int): The number of registers to read (1-125).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: A list of register values or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        if count <= 0:
            return "Error: Count must be positive"
        result = await client.read_input_registers(address=address, count=count, slave=slave_id)
        if result.isError():
            return f"Error reading input registers starting at {address} from slave {slave_id}: {result}"
        await _log_info(
            ctx,
            f"Read {count} input registers starting at {address} from slave {slave_id}: {result.registers}",
        )
        return f"Slave {slave_id}, Input Registers {address} to {address+count-1}: {result.registers}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


# Tools: Read multiple holding registers
@mcp.tool()
async def read_multiple_holding_registers(address: int, count: int, ctx: Context, slave_id: int = MODBUS_DEFAULT_SLAVE_ID) -> str:
    """
    Read multiple Modbus holding registers.

    Parameters:
        address (int): The starting address of the holding registers (0-65535).
        count (int): The number of registers to read (1-125).
        slave_id (int): The Modbus slave ID (device ID).

    Returns:
        str: A list of register values or an error message.
    """
    client = ctx.request_context.lifespan_context.modbus_client
    try:
        if count <= 0:
            return "Error: Count must be positive"
        result = await client.read_holding_registers(address=address, count=count, slave=slave_id)
        if result.isError():
            return f"Error reading holding registers starting at {address} from slave {slave_id}: {result}"
        await _log_info(
            ctx,
            f"Read {count} holding registers starting at {address} from slave {slave_id}: {result.registers}",
        )
        return f"Slave {slave_id}, Holding Registers {address} to {address+count-1}: {result.registers}"
    except ModbusException as e:
        return f"Error communicating with slave {slave_id}: {str(e)}"


# Prompts: Templates for Modbus interactions
@mcp.prompt()
def analyze_register(value: str) -> List[base.Message]:
    """Prompt to analyze a Modbus register value."""
    return [
        base.UserMessage(f"I read this value from a Modbus register: {value}"),
        base.UserMessage("Can you help me understand what it means?"),
        base.AssistantMessage("I'll help analyze the register value. Please provide any context about the device or system."),
    ]


def main() -> None:
    """Run the MCP server."""
    mcp.run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kukapay/mudbus-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.