Skip to main content
Glama
test_integration_mock.py (12.6 kB)
"""基于Mock的集成测试脚本 本测试脚本模拟串口回环效果,无需真实硬件即可测试所有MCP工具功能。 适用于CICD环境的自动化测试。 运行方式: pytest tests/test_integration_mock.py -v """ import base64 import time import pytest # 测试配置 MOCK_PORT = "/dev/ttyMOCK0" MOCK_BAUDRATE = 115200 class TestMockIntegration: """基于Mock的集成测试类""" # ========== 阶段1:串口基础操作测试 ========== def test_list_ports(self, mock_list_ports_with_devices, reset_managers): """测试:列出所有可用串口""" from uart_mcp.tools.list_ports import list_ports ports = list_ports() assert len(ports) == 2 port_names = [p["port"] for p in ports] assert MOCK_PORT in port_names assert "/dev/ttyMOCK1" in port_names def test_open_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:打开串口""" from uart_mcp.tools.port_ops import open_port result = open_port( port=MOCK_PORT, baudrate=MOCK_BAUDRATE, bytesize=8, parity="N", stopbits=1.0, ) assert result.get("is_open") is True assert result.get("port") == MOCK_PORT def test_get_status(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:获取串口状态""" from uart_mcp.tools.port_ops import get_status, open_port # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) result = get_status(port=MOCK_PORT) assert result.get("is_open") is True config = result.get("config", {}) assert config.get("baudrate") == MOCK_BAUDRATE def test_set_config(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:修改串口配置(热更新)""" from uart_mcp.tools.port_ops import get_status, open_port, set_config # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) # 修改波特率 new_baudrate = 9600 result = set_config(port=MOCK_PORT, baudrate=new_baudrate) config = result.get("config", {}) assert config.get("baudrate") == new_baudrate # 验证配置已更新 status = get_status(port=MOCK_PORT) assert status.get("config", {}).get("baudrate") == new_baudrate def test_close_port(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:关闭串口""" from uart_mcp.tools.port_ops import close_port, open_port # 先打开串口 
open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) result = close_port(port=MOCK_PORT) assert result.get("success") is True # ========== 阶段2:数据通信测试(回环) ========== def test_send_receive_text(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送和接收文本数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data from uart_mcp.tools.port_ops import open_port # 打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) test_message = "Hello UART 你好串口!" # 发送数据 send_result = send_data(port=MOCK_PORT, data=test_message, is_binary=False) assert send_result.get("success") is True # 读取数据(回环) read_result = read_data(port=MOCK_PORT, is_binary=False) received = read_result.get("data", "") assert test_message in received def test_send_receive_binary(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送和接收二进制数据(回环测试)""" from uart_mcp.tools.data_ops import read_data, send_data from uart_mcp.tools.port_ops import open_port # 打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) # 准备二进制数据 raw_data = bytes([0x01, 0x02, 0x03, 0xFE, 0xFF]) b64_data = base64.b64encode(raw_data).decode("ascii") # 发送二进制数据 send_result = send_data(port=MOCK_PORT, data=b64_data, is_binary=True) assert send_result.get("success") is True # 读取二进制数据(回环) read_result = read_data(port=MOCK_PORT, is_binary=True) received_b64 = read_result.get("data", "") received_raw = base64.b64decode(received_b64) if received_b64 else b"" assert raw_data == received_raw # ========== 阶段3:终端会话测试 ========== def test_create_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:创建终端会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session # 先打开串口 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) result = create_session( port=MOCK_PORT, line_ending="CRLF", local_echo=False, ) assert result.get("session_id") == MOCK_PORT def test_list_sessions(self, mock_serial_loopback, 
mock_list_ports_with_devices, reset_managers): """测试:列出所有会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session, list_sessions # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) result = list_sessions() sessions = result.get("sessions", []) session_ids = [s.get("session_id") if isinstance(s, dict) else s for s in sessions] assert MOCK_PORT in session_ids def test_get_session_info(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:获取会话信息""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import create_session, get_session_info # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) result = get_session_info(session_id=MOCK_PORT) assert result.get("session_id") == MOCK_PORT def test_send_command_read_output(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:发送命令并读取输出(回环测试)""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import ( clear_buffer, create_session, read_output, send_command, ) # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) # 清空缓冲区 clear_buffer(session_id=MOCK_PORT) test_cmd = "AT" # 发送命令 send_result = send_command( session_id=MOCK_PORT, command=test_cmd, add_line_ending=True, ) assert send_result.get("success") is True # 等待后台线程读取数据 time.sleep(0.2) # 读取输出(回环模式下应该收到发送的命令) read_result = read_output(session_id=MOCK_PORT, clear=False) output = read_result.get("data", "") assert test_cmd in output def test_clear_buffer(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:清空缓冲区""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import clear_buffer, create_session, read_output # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) result = clear_buffer(session_id=MOCK_PORT) assert 
result.get("success") is True # 验证缓冲区已清空 read_result = read_output(session_id=MOCK_PORT, clear=False) assert len(read_result.get("data", "")) == 0 def test_close_session(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:关闭终端会话""" from uart_mcp.tools.port_ops import open_port from uart_mcp.tools.terminal import close_session, create_session # 打开串口并创建会话 open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) create_session(port=MOCK_PORT) result = close_session(session_id=MOCK_PORT) assert result.get("success") is True # ========== 阶段4:错误处理测试 ========== def test_open_nonexistent_port(self, mock_list_ports_with_devices, reset_managers): """测试:打开不存在的端口(端口不在列表中)""" from uart_mcp.errors import PortNotFoundError from uart_mcp.tools.port_ops import open_port # 不使用mock_serial_loopback,让它尝试打开真实(不存在)端口 # 由于端口不存在于系统中,应该抛出异常 with pytest.raises((PortNotFoundError, Exception)): open_port(port="/dev/ttyNONEXIST_CICD_TEST", baudrate=9600) def test_get_status_unopened_port(self, reset_managers): """测试:获取未打开端口的状态""" from uart_mcp.tools.port_ops import get_status with pytest.raises(Exception): get_status(port=MOCK_PORT) def test_send_data_unopened_port(self, reset_managers): """测试:向未打开的端口发送数据""" from uart_mcp.tools.data_ops import send_data with pytest.raises(Exception): send_data(port=MOCK_PORT, data="test", is_binary=False) class TestMockIntegrationWorkflow: """完整工作流程测试""" def test_full_workflow(self, mock_serial_loopback, mock_list_ports_with_devices, reset_managers): """测试:完整的串口通信工作流程""" from uart_mcp.tools.data_ops import read_data, send_data from uart_mcp.tools.port_ops import ( close_port, get_status, open_port, set_config, ) from uart_mcp.tools.terminal import ( clear_buffer, close_session, create_session, get_session_info, list_sessions, read_output, send_command, ) # 1. 打开串口 result = open_port(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) assert result.get("is_open") is True # 2. 
获取状态 status = get_status(port=MOCK_PORT) assert status.get("is_open") is True # 3. 修改配置 new_config = set_config(port=MOCK_PORT, baudrate=9600) assert new_config.get("config", {}).get("baudrate") == 9600 # 恢复配置 set_config(port=MOCK_PORT, baudrate=MOCK_BAUDRATE) # 4. 发送/接收文本 send_data(port=MOCK_PORT, data="Hello", is_binary=False) read_result = read_data(port=MOCK_PORT, is_binary=False) assert "Hello" in read_result.get("data", "") # 5. 发送/接收二进制 raw = bytes([0xAA, 0xBB]) send_data(port=MOCK_PORT, data=base64.b64encode(raw).decode(), is_binary=True) read_result = read_data(port=MOCK_PORT, is_binary=True) assert base64.b64decode(read_result.get("data", "")) == raw # 6. 创建终端会话 session = create_session(port=MOCK_PORT) assert session.get("session_id") == MOCK_PORT # 7. 列出会话 sessions = list_sessions() assert len(sessions.get("sessions", [])) > 0 # 8. 获取会话信息 info = get_session_info(session_id=MOCK_PORT) assert info.get("session_id") == MOCK_PORT # 9. 发送命令 clear_buffer(session_id=MOCK_PORT) send_command(session_id=MOCK_PORT, command="TEST") time.sleep(0.2) output = read_output(session_id=MOCK_PORT) assert "TEST" in output.get("data", "") # 10. 清空缓冲区 clear_buffer(session_id=MOCK_PORT) output = read_output(session_id=MOCK_PORT) assert output.get("data", "") == "" # 11. 关闭会话 close_session(session_id=MOCK_PORT) # 12. 关闭串口 result = close_port(port=MOCK_PORT) assert result.get("success") is True

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/donnel666/uart-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.