Skip to main content
Glama
server.py (8.62 kB)
""" 优化版文件夹文档生成MCP服务器主文件 这是MCP服务器的核心实现,集成了所有中间件、服务和工具。 """ import asyncio import logging logger = logging.getLogger(__name__) import sys from pathlib import Path from typing import Optional, Dict, Any from mcp.server.fastmcp import FastMCP import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from data_access import ( FileSystemInterface, CacheInterface, ConfigInterface, SecurityValidator ) from middleware import ( auth_middleware, logging_middleware, error_handler_middleware ) from services import ( DocumentService, AnalysisService, TemplateService, CacheService ) from tools import register_all_tools from utils import ( ConfigManager, create_default_config_manager, InteractionManager, set_interaction_level, InteractionLevel, create_task, start_task, complete_task, fail_task, get_progress_summary, default_logger, LogLevel, LoggerCategory, performance_monitor, memory_usage_monitor, save_performance_report, get_current_performance_status ) class OptimizedFolderDocMCPServer: """优化版文件夹文档生成MCP服务器""" def __init__(self, config_path: Optional[str] = None, interaction_level: InteractionLevel = InteractionLevel.NORMAL): """ 初始化MCP服务器 Args: config_path: 配置文件路径 interaction_level: 交互级别 """ # 设置交互级别 self.interaction = InteractionManager() set_interaction_level(interaction_level) # 初始化配置管理器 self.config_manager = create_default_config_manager(config_path or "config/user_config.yaml") # 加载配置 self.config_interface = ConfigInterface() self.config = self.config_interface.load_config(config_path) # 初始化数据访问层 self.security_validator = SecurityValidator() self.file_system = FileSystemInterface(self.security_validator) self.cache = CacheInterface( memory_size=self.config.performance.cache_memory_size, disk_cache_dir=self.config.performance.cache_disk_dir, disk_max_size=self.config.performance.cache_disk_max_size, enable_disk_cache=self.config.performance.cache_disk_enabled ) # 初始化服务层 self.cache_service = CacheService(self.cache) 
self.template_service = TemplateService( template_dir=self.config_dir / "config/templates" ) self.document_service = DocumentService( self.file_system, self.template_service, self.cache_service ) self.analysis_service = AnalysisService( self.file_system, self.cache_service ) # 初始化MCP服务器 self.mcp = FastMCP("optimized_folder_docs_mcp") # 设置中间件 self._setup_middleware() # 注册工具 self._register_tools() # 配置认证中间件的允许路径 if self.config.security.allowed_paths: auth_middleware.allowed_paths = set(self.config.security.allowed_paths) @property def config_dir(self) -> Path: """获取配置目录路径""" return Path(__file__).parent.parent def _setup_middleware(self): """设置中间件""" # 中间件逻辑已集成到各服务中 pass def _register_tools(self): """注册所有MCP工具""" register_all_tools( self.mcp, self.document_service, self.analysis_service, self.template_service, self.file_system, self.cache_service ) async def start(self): """启动MCP服务器""" task_id = "server_startup" try: # 创建启动进度跟踪 create_task(task_id, "MCP服务器启动") start_task(task_id) logging_middleware.logger.info( "MCP服务器启动", version="2.0.0", config=str(self.config_dir / "config" / "default.yaml") ) complete_task(task_id, "服务器初始化完成") # 启动FastMCP服务器 await self.mcp.run() except KeyboardInterrupt: fail_task(task_id, "收到停止信号") logging_middleware.logger.info("MCP服务器收到停止信号") except (RuntimeError, ValueError) as e: fail_task(task_id, f"启动失败: {str(e)}") logging_middleware.logger.error( "MCP服务器启动失败", error=str(e), error_type=type(e).__name__ ) raise async def stop(self): """停止MCP服务器""" try: # 清理缓存 self.cache.clear() # 清理会话 auth_middleware.cleanup_expired_sessions() logging_middleware.logger.info("MCP服务器已停止") except (RuntimeError, ValueError) as e: logging_middleware.logger.error( "MCP服务器停止时出错", error=str(e) ) async def health_check(self) -> Dict[str, Any]: """健康检查""" try: # 检查各组件状态 cache_stats = self.cache.get_stats() health = { "status": "healthy", "version": "2.0.0", "components": { "cache": { "status": "healthy", "stats": cache_stats }, "config": { "status": "healthy", 
"config_loaded": self.config is not None }, "file_system": { "status": "healthy" } }, "timestamp": logging_middleware.logger.bind().get("timestamp", "unknown") } return health except (RuntimeError, ValueError) as e: return { "status": "unhealthy", "error": str(e), "timestamp": "unknown" } async def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description="优化版文件夹文档生成MCP服务器") parser.add_argument( "--config", type=str, help="配置文件路径", default=None ) parser.add_argument( "--health-check", action="store_true", help="执行健康检查并退出" ) parser.add_argument( "--interactive-config", action="store_true", help="运行交互式配置" ) parser.add_argument( "--interaction-level", type=str, choices=["silent", "basic", "normal", "verbose", "debug"], default="normal", help="交互级别" ) args = parser.parse_args() # 设置交互级别 interaction_level = InteractionLevel(args.interaction_level) # 创建服务器实例 server = OptimizedFolderDocMCPServer(args.config, interaction_level) # 如果需要交互式配置 if args.interactive_config: server.config_manager.interactive_config() return try: if args.health_check: # 执行健康检查 health = await server.health_check() print("健康检查结果:") print(f"状态: {health['status']}") if health['status'] == 'unhealthy': print(f"错误: {health['error']}") sys.exit(1) else: print("所有组件运行正常") sys.exit(0) else: # 启动服务器 await server.start() except KeyboardInterrupt: print("\n收到停止信号,正在关闭服务器...") except (RuntimeError, ValueError) as e: print(f"服务器运行出错: {e}") sys.exit(1) finally: await server.stop() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.