MCP2Serial
- src
- mcp2serial
# ====================================================
# Project: MCP2Serial
# Description: A protocol conversion tool that enables
# hardware devices to communicate with
# large language models (LLM) through serial ports.
# Repository: https://github.com/mcp2everything/mcp2serial.git
# License: MIT License
# Author: mcp2everything
# Copyright (c) 2024 mcp2everything
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# ====================================================
from typing import Any, Optional, Tuple, Dict, List
import asyncio
import serial
import serial.tools.list_ports
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
import logging
import yaml
import os
from dataclasses import dataclass, field
import time
# 设置日志级别为 DEBUG
logging.basicConfig(
level=logging.DEBUG, # 改为 DEBUG 级别以显示更多信息
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 添加版本号常量
VERSION = "0.1.0" # 添加了自动\r\n和更详细的错误信息
server = Server("mcp2serial")
@dataclass
class Command:
"""Configuration for a serial command."""
command: str
need_parse: bool
prompts: List[str]
@dataclass
class Config:
"""Configuration for MCP2Serial service."""
port: Optional[str] = None
baud_rate: int = 115200
timeout: float = 1.0
read_timeout: float = 1.0
response_start_string: str = "OK" # 新增:可配置的应答开始字符串
commands: Dict[str, Command] = field(default_factory=dict)
@staticmethod
def load(config_path: str = "config.yaml") -> 'Config':
"""Load configuration from YAML file."""
# 获取配置文件名
config_name = os.path.basename(config_path)
# 定义可能的配置文件位置
config_paths = [
config_path, # 首先检查指定的路径
os.path.join(os.getcwd(), config_name), # 当前工作目录
os.path.expanduser(f"~/.mcp2serial/{config_name}"), # 用户主目录
]
# 添加系统级目录
if os.name == 'nt': # Windows
config_paths.append(os.path.join(os.environ.get("ProgramData", "C:\\ProgramData"),
"mcp2serial", config_name))
else: # Linux/Mac
config_paths.append(f"/etc/mcp2serial/{config_name}")
# 尝试从每个位置加载配置
for path in config_paths:
if os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
logger.info(f"Loading configuration from {path}")
# Load serial configuration
serial_config = config_data.get('serial', {})
config = Config(
port=serial_config.get('port'),
baud_rate=serial_config.get('baud_rate', 115200),
timeout=serial_config.get('timeout', 1.0),
read_timeout=serial_config.get('read_timeout', 1.0),
response_start_string=serial_config.get('response_start_string', 'OK') # 新增:加载应答开始字符串
)
# Load commands
commands_data = config_data.get('commands', {})
for cmd_id, cmd_data in commands_data.items():
raw_command = cmd_data.get('command', '')
logger.debug(f"Loading command {cmd_id}: {repr(raw_command)}")
config.commands[cmd_id] = Command(
command=raw_command,
need_parse=cmd_data.get('need_parse', False),
prompts=cmd_data.get('prompts', [])
)
logger.debug(f"Loaded command {cmd_id}: {repr(config.commands[cmd_id].command)}")
return config
except Exception as e:
logger.warning(f"Error loading config from {path}: {e}")
continue
logger.info("No valid config file found, using defaults")
return Config()
config = Config.load()
class SerialConnection:
"""Serial port connection manager."""
def __init__(self):
self.serial_port: Optional[serial.Serial] = None
self.baud_rate: int = config.baud_rate
self.timeout: float = 2.0 # 最大超时2秒
self.read_timeout: float = 1.0
self.is_loopback: bool = False # 新增:标记是否为回环模式
def connect(self) -> bool:
"""Attempt to connect to an available serial port."""
try:
# 检查是否为回环模式
if config.port == "LOOP_BACK":
logger.info("Using LOOP_BACK mode")
self.is_loopback = True
return True
# 如果已经连接,直接返回
if self.serial_port and self.serial_port.is_open:
logger.debug("Using existing serial connection")
return True
# 关闭可能存在的连接
if self.serial_port:
try:
self.serial_port.close()
except:
pass
self.serial_port = None
# 尝试连接指定端口
if config.port:
logger.info(f"Attempting to connect to configured port: {config.port}")
try:
self.serial_port = serial.Serial(
port=config.port,
baudrate=self.baud_rate,
timeout=self.timeout
)
logger.info(f"Connected to configured port: {config.port}")
return True
except serial.SerialException as e:
logger.error(f"Failed to connect to configured port {config.port}: {str(e)}")
raise ValueError(f"Serial port {config.port} not available: {str(e)}")
# 搜索可用端口
logger.info("No port configured, searching for available ports...")
ports = list(serial.tools.list_ports.comports())
if not ports:
logger.error("No serial ports found")
raise ValueError("No serial ports available")
logger.info(f"Found ports: {', '.join(p.device for p in ports)}")
for port in ports:
try:
self.serial_port = serial.Serial(
port=port.device,
baudrate=self.baud_rate,
timeout=self.timeout
)
logger.info(f"Connected to port: {port.device}")
return True
except serial.SerialException:
continue
raise ValueError("Failed to connect to any available serial port")
except Exception as e:
logger.error(f"Unexpected error in connect: {str(e)}")
raise ValueError(f"Connection error: {str(e)}")
def send_command(self, command: Command, arguments: Dict[str, Any]) -> list[types.TextContent]:
"""Send a command to the serial port and return result according to MCP protocol."""
try:
# 确保连接
if not self.is_loopback and (not self.serial_port or not self.serial_port.is_open):
logger.info("No active connection, attempting to connect...")
if not self.connect():
error_msg = f"[MCP2Serial v{VERSION}] Failed to establish serial connection.\n"
error_msg += "Please check:\n"
error_msg += "1. Serial port is correctly configured in config.yaml\n"
error_msg += "2. Device is properly connected\n"
error_msg += "3. No other program is using the port"
return [types.TextContent(
type="text",
text=error_msg
)]
# 准备命令
cmd_str = command.command.format(**arguments)
# 确保命令以\r\n结尾
cmd_str = cmd_str.rstrip() + '\r\n' # 移除可能的空白字符,强制添加\r\n
cmd_bytes = cmd_str.encode()
logger.info(f"Sending command: {cmd_str.strip()}")
logger.info(f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}")
if self.is_loopback:
# 回环模式:直接返回发送的命令和OK响应
responses = [
cmd_str.encode(), # 命令回显
f"{config.response_start_string}\r\n".encode() # OK响应
]
else:
# 清空缓冲区
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
# 发送命令
bytes_written = self.serial_port.write(cmd_bytes)
logger.info(f"Wrote {bytes_written} bytes")
self.serial_port.flush()
# 等待一段时间确保命令被处理
time.sleep(0.1)
# 读取所有响应
responses = []
while self.serial_port.in_waiting:
response = self.serial_port.readline()
logger.info(f"Raw response: {response}")
if response:
responses.append(response)
if not responses:
logger.error("No response received within timeout")
error_msg = f"[MCP2Serial v{VERSION}] Command timeout - no response within {self.read_timeout} second(s)\n"
error_msg += f"Command sent: {cmd_str.strip()}\n"
error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n"
error_msg += "Please check:\n"
error_msg += "1. Device is powered and responding\n"
error_msg += "2. Baud rate matches device settings\n"
error_msg += "3. Serial connection is stable\n"
return [types.TextContent(
type="text",
text=error_msg
)]
# 解码第一行响应
first_response = responses[0]
first_line = first_response.decode().strip()
logger.info(f"Decoded first response: {first_line}")
# 检查是否有第二行响应
if len(responses) > 1:
second_response = responses[1]
if second_response.startswith(config.response_start_string.encode()): # 使用配置的应答开始字符串
if command.need_parse:
return [types.TextContent(
type="text",
text=second_response.decode().strip()
)]
return []
# 如果响应不是预期的格式,返回详细的错误信息
error_msg = f"[MCP2Serial v{VERSION}] Command execution failed.\n"
error_msg += f"Command sent: {cmd_str.strip()}\n"
error_msg += f"Command bytes ({len(cmd_bytes)} bytes): {' '.join([f'0x{b:02X}' for b in cmd_bytes])}\n"
error_msg += "Responses received:\n"
for i, resp in enumerate(responses, 1):
error_msg += f"{i}. Raw: {resp!r}\n Decoded: {resp.decode().strip()}\n"
error_msg += "\nPossible reasons:\n"
error_msg += f"- Device echoed the command but did not send {config.response_start_string} response\n"
error_msg += "- Command format may be incorrect\n"
error_msg += "- Device may be in wrong mode\n"
return [types.TextContent(
type="text",
text=error_msg
)]
except serial.SerialTimeoutException as e:
logger.error(f"Serial timeout: {str(e)}")
error_msg = f"[MCP2Serial v{VERSION}] Command timeout - {str(e)}\n"
error_msg += "Please check:\n"
error_msg += "1. Device is powered and responding\n"
error_msg += "2. Baud rate matches device settings\n"
error_msg += "3. Device is not busy with other operations"
return [types.TextContent(
type="text",
text=error_msg
)]
except serial.SerialException as e:
logger.error(f"Serial error: {str(e)}")
error_msg = f"[MCP2Serial v{VERSION}] Serial communication failed - {str(e)}\n"
error_msg += "Please check:\n"
error_msg += "1. Serial port is correctly configured in config.yaml\n"
error_msg += "2. Device is properly connected\n"
error_msg += "3. No other program is using the port"
return [types.TextContent(
type="text",
text=error_msg
)]
def close(self) -> None:
"""Close the serial port connection if open."""
if self.serial_port and self.serial_port.is_open:
try:
self.serial_port.close()
logger.info(f"Closed serial port connection: {self.serial_port.port}")
except Exception as e:
logger.error(f"Error closing port: {str(e)}")
self.serial_port = None
serial_connection = SerialConnection()
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools for the MCP service."""
logger.info("Listing available tools")
tools = []
for cmd_id, command in config.commands.items():
# 从命令字符串中提取参数名
import re
param_names = re.findall(r'\{(\w+)\}', command.command)
properties = {name: {"type": "string"} for name in param_names}
tools.append(types.Tool(
name=cmd_id,
description=f"Execute {cmd_id} command",
inputSchema={
"type": "object",
"properties": properties,
"required": param_names
},
prompts=command.prompts
))
return tools
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
"""Handle tool execution requests according to MCP protocol."""
logger.info(f"Tool call received - Name: {name}, Arguments: {arguments}")
try:
if name not in config.commands:
error_msg = f"[MCP2Serial v{VERSION}] Error: Unknown tool '{name}'\n"
error_msg += "Please check:\n"
error_msg += "1. Tool name is correct\n"
error_msg += "2. Tool is configured in config.yaml"
return [types.TextContent(
type="text",
text=error_msg
)]
command = config.commands[name]
if arguments is None:
arguments = {}
# 发送命令并返回 MCP 格式的响应
return serial_connection.send_command(command, arguments)
except Exception as e:
logger.error(f"Error handling tool call: {str(e)}")
error_msg = f"[MCP2Serial v{VERSION}] Error: {str(e)}\n"
error_msg += "Please check:\n"
error_msg += "1. Configuration is correct\n"
error_msg += "2. Device is functioning properly"
return [types.TextContent(
type="text",
text=error_msg
)]
async def main(config_name: str = None) -> None:
"""Run the MCP server.
Args:
config_name: Optional configuration name. If not provided, uses default config.yaml
"""
logger.info("Starting MCP2Serial server")
# 处理配置文件名
if config_name and config_name != "default":
if not config_name.endswith("_config.yaml"):
config_name = f"{config_name}_config.yaml"
else:
config_name = "config.yaml"
# 加载配置
global config
config = Config.load(config_name)
try:
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="mcp2serial",
server_version=VERSION,
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
except Exception as e:
logger.error(f"Server error: {e}")
finally:
serial_connection.close()
if __name__ == "__main__":
import sys
config_name = sys.argv[1] if len(sys.argv) > 1 else None
asyncio.run(main(config_name))