Skip to main content
Glama

FreeRTOS Embedded Development MCP

by aitersmam888
server.py16.7 kB
#!/usr/bin/env python3 """MCP服务器主文件 - 文心快码嵌入式FreeRTOS扩展""" import asyncio import logging from typing import Any, Dict, List, Optional try: from mcp.server import Server from mcp.server.models import InitializationOptions import mcp.server.stdio import mcp.types as types MCP_AVAILABLE = True except ImportError: MCP_AVAILABLE = False # 创建模拟类用于开发测试 class Server: def __init__(self, name): pass def list_tools(self): def decorator(func): return func return decorator def call_tool(self): def decorator(func): return func return decorator def get_capabilities(self, **kwargs): return {} def run(self, *args, **kwargs): pass class types: Tool = type('Tool', (), {}) TextContent = type('TextContent', (), {}) class InitializationOptions: pass from .knowledge.knowledge_loader import KnowledgeLoader from .memory.memory_manager import MemoryManager from .generators.project_generator import ProjectGenerator logger = logging.getLogger(__name__) class FreeRTOSMCPServer: """嵌入式FreeRTOS MCP服务器""" def __init__(self): self.server = Server("embedded-freertos-mcp") self.knowledge_loader: Optional[KnowledgeLoader] = None self.memory_manager: Optional[MemoryManager] = None self.project_generator: Optional[ProjectGenerator] = None # 注册工具处理函数 self.server.list_tools()(self._list_tools) self.server.call_tool()(self._call_tool) async def start(self): """启动MCP服务器""" async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.server.run( read_stream, write_stream, InitializationOptions( server_name="embedded-freertos-mcp", server_version="1.0.0", capabilities=self.server.get_capabilities( notification_options=None, experimental_capabilities=None ) ) ) def _initialize_components(self, config: Dict[str, Any]): """初始化组件""" knowledge_base_path = config.get("knowledge_base_path", "./knowledge_base") memory_db_path = config.get("memory_db_path", "./memory/memory.db") self.knowledge_loader = KnowledgeLoader(knowledge_base_path) self.memory_manager = MemoryManager(memory_db_path) self.project_generator = ProjectGenerator(knowledge_base_path, memory_db_path) async def _list_tools(self) -> List[types.Tool]: """列出可用工具""" return [ types.Tool( name="create_embedded_project", description="创建完整的嵌入式FreeRTOS项目,自动读取芯片知识库", inputSchema={ "type": "object", "properties": { "project_name": { "type": "string", "description": "项目名称" }, "chip_family": { "type": "string", "description": "芯片系列(如:bk7252, stm32f4)", "enum": self._get_available_chips() }, "output_dir": { "type": "string", "description": "输出目录路径", "default": "./projects" } }, "required": ["project_name", "chip_family"] } ), types.Tool( name="get_chip_info", description="获取芯片详细信息", inputSchema={ "type": "object", "properties": { "chip_family": { "type": "string", "description": "芯片系列", "enum": self._get_available_chips() } }, "required": ["chip_family"] } ), types.Tool( name="search_knowledge_base", description="搜索芯片规格书和文档", inputSchema={ "type": "object", "properties": { "chip_family": { "type": "string", "description": "芯片系列", "enum": self._get_available_chips() }, "search_query": { "type": "string", "description": "搜索关键词" } }, "required": ["chip_family", "search_query"] } ), types.Tool( name="generate_driver_code", description="生成外设驱动代码", inputSchema={ "type": "object", "properties": { "chip_family": { "type": "string", "description": "芯片系列", "enum": self._get_available_chips() }, "peripheral": { "type": "string", "description": "外设类型", "enum": ["gpio", "uart", "spi", "i2c", "adc", "pwm"] } }, "required": ["chip_family", "peripheral"] } ), types.Tool( name="save_project_memory", description="保存项目配置到记忆库", inputSchema={ "type": "object", "properties": { "project_name": { "type": "string", "description": "项目名称" }, "chip_family": { "type": "string", "description": "芯片系列" }, "config_data": { "type": "object", "description": "配置数据" } }, "required": ["project_name", "chip_family", "config_data"] } ), types.Tool( name="load_project_memory", description="从记忆库加载项目配置", inputSchema={ "type": "object", "properties": { "project_name": { "type": "string", "description": "项目名称" } }, "required": ["project_name"] } ) ] def _get_available_chips(self) -> List[str]: """获取可用芯片列表""" if self.knowledge_loader: return self.knowledge_loader.get_available_chips() return ["bk7252", "stm32f4", "esp32"] async def _call_tool(self, name: str, arguments: Dict[str, Any]) -> List[Any]: """调用工具""" try: if not self.knowledge_loader: # 使用默认配置初始化 self._initialize_components({}) if name == "create_embedded_project": return await self._create_project(arguments) elif name == "get_chip_info": return await self._get_chip_info(arguments) elif name == "search_knowledge_base": return await self._search_knowledge_base(arguments) elif name == "generate_driver_code": return await self._generate_driver_code(arguments) elif name == "save_project_memory": return await self._save_project_memory(arguments) elif name == "load_project_memory": return await self._load_project_memory(arguments) else: raise ValueError(f"未知工具: {name}") except Exception as e: logger.error(f"工具调用失败: {e}") if MCP_AVAILABLE: return [types.TextContent( type="text", text=f"错误: {str(e)}" )] else: return [{"type": "text", "text": f"错误: {str(e)}"}] async def _create_project(self, arguments: Dict[str, Any]) -> List[Any]: """创建项目工具""" project_name = arguments["project_name"] chip_family = arguments["chip_family"] output_dir = arguments.get("output_dir", "./projects") if not self.project_generator: raise RuntimeError("项目生成器未初始化") result = self.project_generator.generate_project(project_name, chip_family, output_dir) if result["success"]: text = f"""项目创建成功! 项目名称: {project_name} 芯片型号: {chip_family} 输出目录: {result['project_path']} 生成的文件: {chr(10).join(f"- {file}" for file in result['generated_files'][:10])} ... 共 {len(result['generated_files'])} 个文件 芯片信息: - 核心: {result['chip_info']['chip']['core']} - 频率: {result['chip_info']['chip']['frequency']} - Flash: {result['chip_info']['chip']['flash_size']} - RAM: {result['chip_info']['chip']['ram_size']} 项目已保存到记忆库,可以使用 load_project_memory 工具重新加载。""" else: text = f"项目创建失败: {result['error']}" if MCP_AVAILABLE: return [types.TextContent(type="text", text=text)] else: return [{"type": "text", "text": text}] async def _get_chip_info(self, arguments: Dict[str, Any]) -> List[types.TextContent]: """获取芯片信息工具""" chip_family = arguments["chip_family"] chip_config = self.knowledge_loader.load_chip_config(chip_family) datasheet_info = self.knowledge_loader.load_datasheet_info(chip_family) sdk_info = self.knowledge_loader.load_sdk_info(chip_family) if not chip_config: return [types.TextContent(type="text", text=f"未找到芯片 {chip_family} 的信息")] text = f"""芯片信息: {chip_config['chip']['name']} 基本参数: - 系列: {chip_config['chip']['family']} - 核心: {chip_config['chip']['core']} - 频率: {chip_config['chip']['frequency']} - Flash: {chip_config['chip']['flash_size']} - RAM: {chip_config['chip']['ram_size']} 外设资源: {chr(10).join(f"- {periph}: {details}" for periph, details in chip_config['peripherals'].items())} 内存映射: - Flash: {chip_config['memory_map']['flash_start']} - {chip_config['memory_map']['flash_end']} - RAM: {chip_config['memory_map']['ram_start']} - {chip_config['memory_map']['ram_end']} 编译配置: 工具链: {chip_config['compiler']['toolchain']} 编译选项: {', '.join(chip_config['compiler']['flags'][:3])}...""" if datasheet_info: text += f"\n\n规格书: {', '.join(datasheet_info['datasheet_files'])}" if sdk_info: text += f"\n\nSDK驱动: {', '.join(sdk_info['available_drivers'][:5])}..." return [types.TextContent(type="text", text=text)] async def _search_knowledge_base(self, arguments: Dict[str, Any]) -> List[types.TextContent]: """搜索知识库工具""" chip_family = arguments["chip_family"] search_query = arguments["search_query"] # 这里可以实现更复杂的搜索逻辑 chip_config = self.knowledge_loader.load_chip_config(chip_family) if not chip_config: return [types.TextContent(type="text", text=f"未找到芯片 {chip_family} 的信息")] # 简单的关键词匹配搜索 results = [] search_lower = search_query.lower() # 搜索芯片配置 chip_text = f"{chip_config['chip']['name']} {chip_config['chip']['description']}".lower() if search_lower in chip_text: results.append("芯片基本信息") # 搜索外设 for periph, details in chip_config['peripherals'].items(): if search_lower in periph.lower(): results.append(f"外设: {periph} - {details}") if results: text = f"搜索 '{search_query}' 在 {chip_family} 中的结果:\n\n" + "\n".join(f"- {result}" for result in results[:10]) else: text = f"未找到与 '{search_query}' 相关的内容" return [types.TextContent(type="text", text=text)] async def _generate_driver_code(self, arguments: Dict[str, Any]) -> List[types.TextContent]: """生成驱动代码工具""" chip_family = arguments["chip_family"] peripheral = arguments["peripheral"] chip_config = self.knowledge_loader.load_chip_config(chip_family) if not chip_config: return [types.TextContent(type="text", text=f"未找到芯片 {chip_family} 的信息")] # 生成简单的驱动代码模板 driver_code = self._generate_driver_template(chip_family, peripheral, chip_config) return [types.TextContent(type="text", text=driver_code)] def _generate_driver_template(self, chip_family: str, peripheral: str, chip_config: Dict[str, Any]) -> str: """生成驱动代码模板""" if peripheral == "gpio": return f"""// {chip_config['chip']['name']} GPIO驱动模板 #include "gpio.h" void GPIO_Init(GPIO_TypeDef *GPIOx, GPIO_InitTypeDef *init) {{ // GPIO初始化代码 // 芯片: {chip_family} // 支持 {chip_config['peripherals']['gpio']['ports']} 端口 }} void GPIO_WritePin(GPIO_TypeDef *GPIOx, uint16_t pin, GPIO_PinState state) {{ // 写GPIO引脚 }} GPIO_PinState GPIO_ReadPin(GPIO_TypeDef *GPIOx, uint16_t pin) {{ // 读GPIO引脚 return GPIO_PIN_RESET; }}""" elif peripheral == "uart": return f"""// {chip_config['chip']['name']} UART驱动模板 #include "uart.h" void UART_Init(UART_TypeDef *UARTx, UART_InitTypeDef *init) {{ // UART初始化 // 波特率范围: {chip_config['peripherals']['uart']['baud_rate']} // 支持 {chip_config['peripherals']['uart']['count']} 个UART接口 }} void UART_SendData(UART_TypeDef *UARTx, uint8_t data) {{ // 发送数据 }} uint8_t UART_ReceiveData(UART_TypeDef *UARTx) {{ // 接收数据 return 0; }}""" else: return f"// {chip_config['chip']['name']} {peripheral.upper()} 驱动模板\n// 驱动代码生成功能待完善" async def _save_project_memory(self, arguments: Dict[str, Any]) -> List[types.TextContent]: """保存项目记忆工具""" project_name = arguments["project_name"] chip_family = arguments["chip_family"] config_data = arguments["config_data"] success = self.memory_manager.save_project_config(project_name, chip_family, config_data) if success: text = f"项目 '{project_name}' 配置已保存到记忆库" else: text = f"保存项目配置失败" return [types.TextContent(type="text", text=text)] async def _load_project_memory(self, arguments: Dict[str, Any]) -> List[types.TextContent]: """加载项目记忆工具""" project_name = arguments["project_name"] config = self.memory_manager.load_project_config(project_name) if config: text = f"""项目 '{project_name}' 配置: 芯片: {config.get('chip_family', '未知')} 生成文件数: {len(config.get('generated_files', []))} 配置详情已加载,可以使用相关工具继续开发。""" else: text = f"未找到项目 '{project_name}' 的配置" return [types.TextContent(type="text", text=text)]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aitersmam888/freertos_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server