Skip to main content
Glama
test_integration_real_port.py12.5 kB
"""集成测试脚本 - 使用真实串口设备进行回环测试 本脚本测试所有MCP工具功能,需要物理连接串口设备并将TX/RX接成回环。 默认测试端口: /dev/ttyUSB0 使用方法: pytest tests/test_integration_real_port.py -v 或者直接运行: python tests/test_integration_real_port.py """ import base64 import sys import time from typing import Any # 测试配置 TEST_PORT = "/dev/ttyUSB0" TEST_BAUDRATE = 115200 def print_result(test_name: str, success: bool, result: Any = None) -> None: """打印测试结果""" status = "✓ 通过" if success else "✗ 失败" print(f" {status}: {test_name}") if result and not success: print(f" 详情: {result}") def test_list_ports() -> None: """测试1: 列出所有可用串口""" from uart_mcp.tools.list_ports import list_ports try: ports = list_ports() found = any(p["port"] == TEST_PORT for p in ports) print_result("list_ports - 列出串口", found, ports) if found: print(f" 发现目标端口: {TEST_PORT}") assert found, f"未找到目标端口 {TEST_PORT}" except Exception as e: print_result("list_ports - 列出串口", False, str(e)) assert False, str(e) def test_open_port() -> None: """测试2: 打开串口""" from uart_mcp.tools.port_ops import open_port try: result = open_port( port=TEST_PORT, baudrate=TEST_BAUDRATE, bytesize=8, parity="N", stopbits=1.0, ) success = result.get("is_open", False) print_result("open_port - 打开串口", success, result) assert success, "无法打开串口" except Exception as e: print_result("open_port - 打开串口", False, str(e)) assert False, str(e) def test_get_status() -> None: """测试3: 获取串口状态""" from uart_mcp.tools.port_ops import get_status try: result = get_status(port=TEST_PORT) config = result.get("config", {}) success = result.get("is_open", False) and config.get("baudrate") == TEST_BAUDRATE print_result("get_status - 获取状态", success, result) assert success, "获取状态失败" except Exception as e: print_result("get_status - 获取状态", False, str(e)) assert False, str(e) def test_set_config() -> None: """测试4: 修改串口配置(热更新)""" from uart_mcp.tools.port_ops import set_config try: # 修改波特率 new_baudrate = 9600 result = set_config(port=TEST_PORT, baudrate=new_baudrate) config = result.get("config", {}) success = config.get("baudrate") == new_baudrate print_result("set_config - 修改配置", success, result) # 恢复原配置 set_config(port=TEST_PORT, baudrate=TEST_BAUDRATE) assert success, "修改配置失败" except Exception as e: print_result("set_config - 修改配置", False, str(e)) assert False, str(e) def test_send_receive_text() -> None: """测试5: 发送和接收文本数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data try: test_message = "Hello UART 你好串口!" # 发送数据 send_result = send_data(port=TEST_PORT, data=test_message, is_binary=False) if not send_result.get("success"): print_result("send_data - 发送文本", False, send_result) assert False, "发送文本失败" # 等待数据回环 time.sleep(0.1) # 读取数据 read_result = read_data(port=TEST_PORT, is_binary=False) received = read_result.get("data", "") success = test_message in received print_result( "send/read_data - 文本回环", success, f"发送: {test_message}, 接收: {received}" ) assert success, "文本回环测试失败" except Exception as e: print_result("send/read_data - 文本回环", False, str(e)) assert False, str(e) def test_send_receive_binary() -> None: """测试6: 发送和接收二进制数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data try: # 准备二进制数据 raw_data = bytes([0x01, 0x02, 0x03, 0xFE, 0xFF]) b64_data = base64.b64encode(raw_data).decode("ascii") # 发送二进制数据 send_result = send_data(port=TEST_PORT, data=b64_data, is_binary=True) if not send_result.get("success"): print_result("send_data - 发送二进制", False, send_result) assert False, "发送二进制失败" # 等待数据回环 time.sleep(0.1) # 读取二进制数据 read_result = read_data(port=TEST_PORT, is_binary=True) received_b64 = read_result.get("data", "") received_raw = base64.b64decode(received_b64) if received_b64 else b"" success = raw_data == received_raw print_result( "send/read_data - 二进制回环", success, f"发送: {raw_data.hex()}, 接收: {received_raw.hex()}" ) assert success, "二进制回环测试失败" except Exception as e: print_result("send/read_data - 二进制回环", False, str(e)) assert False, str(e) def test_create_session() -> None: """测试7: 创建终端会话""" from uart_mcp.tools.terminal import create_session try: result = create_session( port=TEST_PORT, line_ending="CRLF", local_echo=False, ) success = result.get("session_id") == TEST_PORT print_result("create_session - 创建会话", success, result) assert success, "创建会话失败" except Exception as e: print_result("create_session - 创建会话", False, str(e)) assert False, str(e) def test_list_sessions() -> None: """测试8: 列出所有会话""" from uart_mcp.tools.terminal import list_sessions try: result = list_sessions() sessions = result.get("sessions", []) # sessions 是对象列表,需要检查 session_id 字段 session_ids = [ s.get("session_id") if isinstance(s, dict) else s for s in sessions ] success = TEST_PORT in session_ids print_result("list_sessions - 列出会话", success, result) assert success, "列出会话失败" except Exception as e: print_result("list_sessions - 列出会话", False, str(e)) assert False, str(e) def test_get_session_info() -> None: """测试9: 获取会话信息""" from uart_mcp.tools.terminal import get_session_info try: result = get_session_info(session_id=TEST_PORT) success = result.get("session_id") == TEST_PORT print_result("get_session_info - 会话信息", success, result) assert success, "获取会话信息失败" except Exception as e: print_result("get_session_info - 会话信息", False, str(e)) assert False, str(e) def test_send_command_read_output() -> None: """测试10: 发送命令并读取输出(回环测试)""" from uart_mcp.tools.terminal import read_output, send_command try: test_cmd = "AT" # 先清空缓冲区 from uart_mcp.tools.terminal import clear_buffer clear_buffer(session_id=TEST_PORT) time.sleep(0.1) # 等待缓冲区清空 # 发送命令 send_result = send_command( session_id=TEST_PORT, command=test_cmd, add_line_ending=True, ) if not send_result.get("success"): print_result("send_command - 发送命令", False, send_result) assert False, "发送命令失败" # 等待数据回环(终端会话后台线程读取需要时间) # 尝试多次读取,最多等待2秒 output = "" for _ in range(20): # 20次 * 100ms = 2秒 time.sleep(0.1) read_result = read_output(session_id=TEST_PORT, clear=False) output = read_result.get("data", "") # 注意:键名是 "data" 而不是 "output" if test_cmd in output: break # 清空缓冲区 clear_buffer(session_id=TEST_PORT) # 回环模式下应该收到发送的命令 success = test_cmd in output print_result( "send/read_output - 命令回环", success, f"发送: {test_cmd}, 输出: {repr(output)}" ) assert success, "命令回环测试失败" except Exception as e: print_result("send/read_output - 命令回环", False, str(e)) assert False, str(e) def test_clear_buffer() -> None: """测试11: 清空缓冲区""" from uart_mcp.tools.terminal import clear_buffer, read_output try: result = clear_buffer(session_id=TEST_PORT) success = result.get("success", False) print_result("clear_buffer - 清空缓冲区", success, result) # 验证缓冲区已清空 read_result = read_output(session_id=TEST_PORT, clear=False) empty = len(read_result.get("data", "")) == 0 # 键名是 "data" print_result("clear_buffer - 验证已清空", empty, read_result) assert success and empty, "清空缓冲区失败" except Exception as e: print_result("clear_buffer - 清空缓冲区", False, str(e)) assert False, str(e) def test_close_session() -> None: """测试12: 关闭终端会话""" from uart_mcp.tools.terminal import close_session try: result = close_session(session_id=TEST_PORT) success = result.get("success", False) print_result("close_session - 关闭会话", success, result) assert success except Exception as e: print_result("close_session - 关闭会话", False, str(e)) assert False, str(e) def test_close_port() -> None: """测试13: 关闭串口""" from uart_mcp.tools.port_ops import close_port try: result = close_port(port=TEST_PORT) success = result.get("success", False) print_result("close_port - 关闭串口", success, result) assert success except Exception as e: print_result("close_port - 关闭串口", False, str(e)) assert False, str(e) def run_all_tests() -> None: """运行所有集成测试(独立脚本模式)""" print("=" * 60) print("UART MCP 集成测试 - 真实串口回环测试") print(f"测试端口: {TEST_PORT}") print(f"波特率: {TEST_BAUDRATE}") print("=" * 60) tests = [ ("list_ports", test_list_ports), ("open_port", test_open_port), ("get_status", test_get_status), ("set_config", test_set_config), ("send_receive_text", test_send_receive_text), ("send_receive_binary", test_send_receive_binary), ("create_session", test_create_session), ("list_sessions", test_list_sessions), ("get_session_info", test_get_session_info), ("send_command_read_output", test_send_command_read_output), ("clear_buffer", test_clear_buffer), ("close_session", test_close_session), ("close_port", test_close_port), ] results: dict[str, bool] = {} for name, test_func in tests: try: test_func() results[name] = True except AssertionError: results[name] = False except Exception as e: print(f" 测试 {name} 异常: {e}") results[name] = False # 测试统计 print("\n" + "=" * 60) print("测试结果统计") print("=" * 60) passed = sum(1 for v in results.values() if v) total = len(results) for name, success in results.items(): status = "✓" if success else "✗" print(f" {status} {name}") print("-" * 40) print(f"通过: {passed}/{total}") print(f"失败: {total - passed}/{total}") if passed == total: print("\n🎉 所有测试通过!") else: print("\n⚠️ 部分测试失败,请检查上述详情") if __name__ == "__main__": # 支持命令行参数指定端口 if len(sys.argv) > 1: TEST_PORT = sys.argv[1] if len(sys.argv) > 2: TEST_BAUDRATE = int(sys.argv[2]) run_all_tests()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server